From be62e53a02767f21368ba83edc0513b06ab70b51 Mon Sep 17 00:00:00 2001 From: David Bull Date: Wed, 25 Jun 2025 16:17:08 +0100 Subject: [PATCH 1/3] feat: Add `Consumer#stop` method to stop an `each` loop Previously, if the `Consumer#each` method was called, it would loop indefinitely with no way of gracefully stopping it. Calling `Consumer#close` would close and destroy the Kafka handle, but would then crash the app with an "Invalid memory access" inside the `each` loop the next time it called `poll`. This commit adds an internal flag to keep track of whether the `each` loop is running, and adds a `Consumer#stop` method to stop it. This can be called from, for example, a signal handler to gracefully stop the consumer. E.g.: ``` require "crafka" consumer = Kafka::Consumer.new({ "bootstrap.servers" => "localhost:9092", "client.id" => "test", "group.id" => "test", "broker.address.family" => "v4", }) Signal::INT.trap do Log.info { "Received SIGINT" } consumer.stop end Log.info { "Subscribing to topic..." } consumer.subscribe("test-topic") Log.info { "Starting consumer loop..." } consumer.each do |kafka_event| payload = String.new(kafka_event.payload) Log.info { "Consumed event: #{payload}" } end Log.info { "Exited consumer loop" } Log.info { "Closing consumer..." 
} consumer.close Log.info { "Bye" } ``` --- CHANGELOG.md | 1 + spec/kafka/consumer_spec.cr | 17 +++++++++++++++++ spec/spec_helper.cr | 16 ++++++++++++++++ src/kafka/consumer.cr | 25 +++++++++++++++++++++---- 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a48c2c..0abea0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] ### Changed +- Add `Consumer#stop` method to stop an `each` loop - Add crystal versions 1.15.1 and 1.16.3 to test matrix ### Fixed diff --git a/spec/kafka/consumer_spec.cr b/spec/kafka/consumer_spec.cr index 314e63a..92b5b06 100644 --- a/spec/kafka/consumer_spec.cr +++ b/spec/kafka/consumer_spec.cr @@ -26,4 +26,21 @@ describe Kafka::Consumer do end end end + + describe "#each" do + it "returns when stopped" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + + spawn do + until consumer.running? + sleep(30.milliseconds) + end + consumer.stop + end + + timeout(5.seconds) do + consumer.each(timeout: 10) { } + end + end + end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 23b1df4..d93d601 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,3 +1,19 @@ require "spec" require "json" require "../src/*" + +def timeout(time : Time::Span, &blk) + done = Channel(Nil).new + + spawn do + blk.call + done.send(nil) + end + + select + when done.receive + return + when timeout(time) + raise "Timeout" + end +end diff --git a/src/kafka/consumer.cr b/src/kafka/consumer.cr index 77cd032..90853ba 100644 --- a/src/kafka/consumer.cr +++ b/src/kafka/consumer.cr @@ -3,6 +3,11 @@ require "./consumer/*" module Kafka class Consumer + getter? 
running : Bool = false + + @handle : LibRdKafka::KafkaHandle + @stop_requested = false + def initialize(config : Hash(String, String)) conf = Kafka::Config.build(config) LibRdKafka.set_rebalance_cb(conf, Rebalance.callback) @@ -12,7 +17,6 @@ module Kafka @handle = LibRdKafka.kafka_new(LibRdKafka::TYPE_CONSUMER, conf, errstr, error_buffer.size) raise ConsumerException.new(String.new(errstr)) if @handle.null? - @running = true LibRdKafka.poll_set_consumer(@handle) end @@ -57,12 +61,25 @@ module Kafka # # At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run. def each(timeout = 250, raise_on_error = true, &) - loop do + @running = true + @stop_requested = false + until @stop_requested Fiber.yield resp = poll(timeout, raise_on_error) next if resp.nil? yield resp - break unless @running + end + ensure + @running = false + end + + # Stops the each loop at the next poll interval. + def stop(wait = false) + @stop_requested = true + return unless wait + + while @running + sleep(10.milliseconds) end end @@ -70,9 +87,9 @@ module Kafka # # Calls the `rd_kafka_consumer_close` and `rd_kafka_destroy` C functions. 
def close - @running = false LibRdKafka.consumer_close(@handle) LibRdKafka.kafka_destroy(@handle) + @handle = LibRdKafka::KafkaHandle.null end end end From 1263778a49c3bf9b94b127d983e9a712383d40c2 Mon Sep 17 00:00:00 2001 From: David Bull Date: Wed, 25 Jun 2025 16:36:08 +0100 Subject: [PATCH 2/3] feat: Raise an error when trying to use a consumer after it has been closed --- CHANGELOG.md | 1 + spec/kafka/consumer_spec.cr | 46 +++++++++++++++++++++++++++++++++++++ src/kafka/consumer.cr | 14 +++++++++++ 3 files changed, 61 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0abea0d..6896990 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] ### Changed +- Raise an error when trying to use a consumer after it has been closed - Add `Consumer#stop` method to stop an `each` loop - Add crystal versions 1.15.1 and 1.16.3 to test matrix diff --git a/spec/kafka/consumer_spec.cr b/spec/kafka/consumer_spec.cr index 92b5b06..f2c1581 100644 --- a/spec/kafka/consumer_spec.cr +++ b/spec/kafka/consumer_spec.cr @@ -25,6 +25,24 @@ describe Kafka::Consumer do consumer.subscribe("foo", "foo") end end + + it "raises an exception when called after the consumer is closed" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.subscribe("foo") + end + end + end + + describe "#poll" do + it "raises an exception when called after the consumer is closed" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.poll(250) + end + end end describe "#each" do @@ -42,5 +60,33 @@ describe Kafka::Consumer do consumer.each(timeout: 10) { } end end + + it "raises an exception when called after the 
consumer is closed" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer.close + expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do + consumer.each { } + end + end + end + + describe "#open?" do + it "returns true after creation" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer.open?.should be_true + end + + it "returns false after closing" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer.close + consumer.open?.should be_false + end + end + + describe "#close" do + it "does not raise an exception when called multiple times" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + 2.times { consumer.close } + end end end diff --git a/src/kafka/consumer.cr b/src/kafka/consumer.cr index 90853ba..a113a5d 100644 --- a/src/kafka/consumer.cr +++ b/src/kafka/consumer.cr @@ -31,6 +31,7 @@ module Kafka # # Calls the `rd_kafka_subscribe` C function. def subscribe(*topics) + verify_handle_open! tpl = LibRdKafka.topic_partition_list_new(topics.size) topics.each do |topic| LibRdKafka.topic_partition_list_add(tpl, topic, -1) @@ -45,6 +46,7 @@ module Kafka # # Calls the `rd_kafka_consumer_poll` C function. def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message? + verify_handle_open! message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms) return if message_ptr.null? @@ -61,6 +63,7 @@ module Kafka # # At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run. def each(timeout = 250, raise_on_error = true, &) + verify_handle_open! 
@running = true @stop_requested = false until @stop_requested @@ -83,13 +86,24 @@ module Kafka end end + # Returns whether the consumer is open. + def open? + !@handle.null? + end + # Close the consumer and destroy the Kafka handle. # # Calls the `rd_kafka_consumer_close` and `rd_kafka_destroy` C functions. def close + return if @handle.null? + LibRdKafka.consumer_close(@handle) LibRdKafka.kafka_destroy(@handle) @handle = LibRdKafka::KafkaHandle.null end + + private def verify_handle_open! + raise ConsumerException.new("Consumer closed") if @handle.null? + end end end From 5bd1221e085811545e8160d87dd2e72909a66f02 Mon Sep 17 00:00:00 2001 From: David Bull Date: Thu, 26 Jun 2025 09:18:48 +0100 Subject: [PATCH 3/3] test: Refactor consumer create logic into a spec helper method to remove duplication --- spec/kafka/consumer_spec.cr | 20 +++++++++----------- spec/spec_helper.cr | 6 ++++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/spec/kafka/consumer_spec.cr b/spec/kafka/consumer_spec.cr index f2c1581..062ea05 100644 --- a/spec/kafka/consumer_spec.cr +++ b/spec/kafka/consumer_spec.cr @@ -11,23 +11,21 @@ describe Kafka::Consumer do describe "#subscribe" do it "raises an exception when given no topics" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) - + consumer = create_consumer expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do consumer.subscribe("") end end it "raises an exception when given duplicate topics" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) - + consumer = create_consumer expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do consumer.subscribe("foo", "foo") end end it "raises an exception when called after the consumer is closed" do - consumer = 
Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer consumer.close expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do consumer.subscribe("foo") @@ -37,7 +35,7 @@ describe Kafka::Consumer do describe "#poll" do it "raises an exception when called after the consumer is closed" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer consumer.close expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do consumer.poll(250) @@ -47,7 +45,7 @@ describe Kafka::Consumer do describe "#each" do it "returns when stopped" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer spawn do until consumer.running? @@ -62,7 +60,7 @@ describe Kafka::Consumer do end it "raises an exception when called after the consumer is closed" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer consumer.close expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do consumer.each { } @@ -72,12 +70,12 @@ describe Kafka::Consumer do describe "#open?" 
do it "returns true after creation" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer consumer.open?.should be_true end it "returns false after closing" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer consumer.close consumer.open?.should be_false end @@ -85,7 +83,7 @@ describe Kafka::Consumer do describe "#close" do it "does not raise an exception when called multiple times" do - consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"}) + consumer = create_consumer 2.times { consumer.close } end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index d93d601..e3c78bb 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -2,6 +2,12 @@ require "spec" require "json" require "../src/*" +def create_consumer + Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", + "group.id" => "foo", + "broker.address.family" => "v4"}) +end + def timeout(time : Time::Span, &blk) done = Channel(Nil).new