Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]
### Changed
- Raise an error when trying to use a consumer after it has been closed
- Add `Consumer#stop` method to stop an `each` loop
- Add crystal versions 1.15.1 and 1.16.3 to test matrix

### Fixed
Expand Down
69 changes: 65 additions & 4 deletions spec/kafka/consumer_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,80 @@ describe Kafka::Consumer do

describe "#subscribe" do
it "raises an exception when given no topics" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})

consumer = create_consumer
expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do
consumer.subscribe("")
end
end

it "raises an exception when given duplicate topics" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})

consumer = create_consumer
expect_raises(Kafka::ConsumerException, "librdkafka error - Local: Invalid argument or configuration") do
consumer.subscribe("foo", "foo")
end
end

it "raises an exception when called after the consumer is closed" do
consumer = create_consumer
consumer.close
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
consumer.subscribe("foo")
end
end
end

describe "#poll" do
it "raises an exception when called after the consumer is closed" do
consumer = create_consumer
consumer.close
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
consumer.poll(250)
end
end
end

describe "#each" do
it "returns when stopped" do
consumer = create_consumer

spawn do
until consumer.running?
sleep(30.milliseconds)
end
consumer.stop
end

timeout(5.seconds) do
consumer.each(timeout: 10) { }
end
end

it "raises an exception when called after the consumer is closed" do
consumer = create_consumer
consumer.close
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
consumer.each { }
end
end
end

describe "#open?" do
it "returns true after creation" do
consumer = create_consumer
consumer.open?.should be_true
end

it "returns false after closing" do
consumer = create_consumer
consumer.close
consumer.open?.should be_false
end
end

describe "#close" do
it "does not raise an exception when called multiple times" do
consumer = create_consumer
2.times { consumer.close }
end
end
end
22 changes: 22 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
require "spec"
require "json"
require "../src/*"

def create_consumer
Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094",
"group.id" => "foo",
"broker.address.family" => "v4"})
end

def timeout(time : Time::Span, &blk)
done = Channel(Nil).new

spawn do
blk.call
done.send(nil)
end

select
when done.receive
return
when timeout(time)
raise "Timeout"
end
end
39 changes: 35 additions & 4 deletions src/kafka/consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ require "./consumer/*"

module Kafka
class Consumer
getter? running : Bool = false

@handle : LibRdKafka::KafkaHandle
@stop_requested = false

def initialize(config : Hash(String, String))
conf = Kafka::Config.build(config)
LibRdKafka.set_rebalance_cb(conf, Rebalance.callback)
Expand All @@ -12,7 +17,6 @@ module Kafka
@handle = LibRdKafka.kafka_new(LibRdKafka::TYPE_CONSUMER, conf, errstr, error_buffer.size)
raise ConsumerException.new(String.new(errstr)) if @handle.null?

@running = true
LibRdKafka.poll_set_consumer(@handle)
end

Expand All @@ -27,6 +31,7 @@ module Kafka
#
# Calls the `rd_kafka_subscribe` C function.
def subscribe(*topics)
verify_handle_open!
tpl = LibRdKafka.topic_partition_list_new(topics.size)
topics.each do |topic|
LibRdKafka.topic_partition_list_add(tpl, topic, -1)
Expand All @@ -41,6 +46,7 @@ module Kafka
#
# Calls the `rd_kafka_consumer_poll` C function.
def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message?
verify_handle_open!
message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms)
return if message_ptr.null?

Expand All @@ -57,22 +63,47 @@ module Kafka
#
# At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run.
def each(timeout = 250, raise_on_error = true, &)
loop do
verify_handle_open!
@running = true
@stop_requested = false
until @stop_requested
Fiber.yield
resp = poll(timeout, raise_on_error)
next if resp.nil?
yield resp
break unless @running
end
ensure
@running = false
end

# Stops the each loop at the next poll interval.
def stop(wait = false)
@stop_requested = true
return unless wait

while @running
sleep(10.milliseconds)
end
end

# Returns whether the consumer is open.
def open?
!@handle.null?
end

# Close the consumer and destroy the Kafka handle.
#
# Calls the `rd_kafka_consumer_close` and `rd_kafka_destroy` C functions.
def close
@running = false
return if @handle.null?

LibRdKafka.consumer_close(@handle)
LibRdKafka.kafka_destroy(@handle)
@handle = LibRdKafka::KafkaHandle.null
end

private def verify_handle_open!
raise ConsumerException.new("Consumer closed") if @handle.null?
end
end
end