diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a48c2c..6896990 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] ### Changed +- Raise an error when trying to use a consumer after it has been closed +- Add `Consumer#stop` method to stop an `each` loop - Add crystal versions 1.15.1 and 1.16.3 to test matrix ### Fixed diff --git a/spec/kafka/consumer_spec.cr b/spec/kafka/consumer_spec.cr index 314e63a..062ea05 100644 --- a/spec/kafka/consumer_spec.cr +++ b/spec/kafka/consumer_spec.cr @@ -11,19 +11,80 @@ describe Kafka::Consumer do describe "#subscribe" do it "raises an exception when given no topics" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) - + consumer = create_consumer expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do consumer.subscribe("") end end it "raises an exception when given duplicate topics" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) - + consumer = create_consumer expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do consumer.subscribe("foo", "foo") end end + + it "raises an exception when called after the consumer is closed" do + consumer = create_consumer + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.subscribe("foo") + end + end + end + + describe "#poll" do + it "raises an exception when called after the consumer is closed" do + consumer = create_consumer + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.poll(250) + end + end + end + + describe "#each" do + it "returns when stopped" do + consumer = create_consumer + + spawn do + until consumer.running? + sleep(30.milliseconds) + end + consumer.stop + end + + timeout(5.seconds) do + consumer.each(timeout: 10) { } + end + end + + it "raises an exception when called after the consumer is closed" do + consumer = create_consumer + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.each { } + end + end + end + + describe "#open?" do + it "returns true after creation" do + consumer = create_consumer + consumer.open?.should be_true + end + + it "returns false after closing" do + consumer = create_consumer + consumer.close + consumer.open?.should be_false + end + end + + describe "#close" do + it "does not raise an exception when called multiple times" do + consumer = create_consumer + 2.times { consumer.close } + end end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 23b1df4..e3c78bb 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,3 +1,25 @@ require "spec" require "json" require "../src/*" + +def create_consumer + Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", + "group.id" => "foo", + "broker.address.family" => "v4"}) +end + +def timeout(time : Time::Span, &blk) + done = Channel(Nil).new + + spawn do + blk.call + done.send(nil) + end + + select + when done.receive + return + when timeout(time) + raise "Timeout" + end +end diff --git a/src/kafka/consumer.cr b/src/kafka/consumer.cr index 77cd032..a113a5d 100644 --- a/src/kafka/consumer.cr +++ b/src/kafka/consumer.cr @@ -3,6 +3,11 @@ require "./consumer/*" module Kafka class Consumer + getter? running : Bool = false + + @handle : LibRdKafka::KafkaHandle + @stop_requested = false + def initialize(config : Hash(String, String)) conf = Kafka::Config.build(config) LibRdKafka.set_rebalance_cb(conf, Rebalance.callback) @@ -12,7 +17,6 @@ module Kafka @handle = LibRdKafka.kafka_new(LibRdKafka::TYPE_CONSUMER, conf, errstr, error_buffer.size) raise ConsumerException.new(String.new(errstr)) if @handle.null? - @running = true LibRdKafka.poll_set_consumer(@handle) end @@ -27,6 +31,7 @@ module Kafka # # Calls the `rd_kafka_subscribe` C function. def subscribe(*topics) + verify_handle_open! tpl = LibRdKafka.topic_partition_list_new(topics.size) topics.each do |topic| LibRdKafka.topic_partition_list_add(tpl, topic, -1) @@ -41,6 +46,7 @@ module Kafka # # Calls the `rd_kafka_consumer_poll` C function. def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message? + verify_handle_open! message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms) return if message_ptr.null? @@ -57,22 +63,47 @@ module Kafka # # At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run. def each(timeout = 250, raise_on_error = true, &) - loop do + verify_handle_open! + @running = true + @stop_requested = false + until @stop_requested Fiber.yield resp = poll(timeout, raise_on_error) next if resp.nil? yield resp - break unless @running end + ensure + @running = false + end + + # Stops the each loop at the next poll interval. + def stop(wait = false) + @stop_requested = true + return unless wait + + while @running + sleep(10.milliseconds) + end + end + + # Returns whether the consumer is open. + def open? + !@handle.null? end # Close the consumer and destroy the Kafka handle. # # Calls the `rd_kafka_consumer_close` and `rd_kafka_destroy` C functions. def close - @running = false + return if @handle.null? + LibRdKafka.consumer_close(@handle) LibRdKafka.kafka_destroy(@handle) + @handle = LibRdKafka::KafkaHandle.null + end + + private def verify_handle_open! + raise ConsumerException.new("Consumer closed") if @handle.null? end end end