Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions lib/kafka/async_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ class AsyncProducer
# @param delivery_interval [Integer] if greater than zero, the number of
# seconds between automatic message deliveries.
#
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: 10, retry_backoff: 5, instrumenter:, logger:)
raise ArgumentError unless max_queue_size > 0
raise ArgumentError unless delivery_threshold >= 0
raise ArgumentError unless delivery_interval >= 0
raise ArgumentError unless max_retries >= 0
raise ArgumentError unless retry_backoff >= 0
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These argument checks existed in our fork but not in the official version. We backport them here to preserve the existing validation behavior.


@queue = Queue.new
@max_queue_size = max_queue_size
Expand Down Expand Up @@ -108,6 +110,7 @@ def produce(value, topic:, **options)

ensure_threads_running!

@logger.info "async_producer.produce: _queue_size=#{@queue.size}"
if @queue.size >= @max_queue_size
buffer_overflow topic,
"Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
Expand Down Expand Up @@ -198,7 +201,7 @@ def run
end

class Worker
def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
def initialize(queue:, producer:, delivery_threshold:, max_retries: 10, retry_backoff: 5, instrumenter:, logger:)
@queue = queue
@producer = producer
@delivery_threshold = delivery_threshold
Expand Down Expand Up @@ -261,21 +264,20 @@ def do_loop
retry
end

def produce(value, **kwargs)
retries = 0
def produce(*args)
retry_count = 0
begin
@producer.produce(value, **kwargs)
rescue BufferOverflow => e
@producer.produce(*args)
rescue BufferOverflow
deliver_messages
if @max_retries == -1
retry
elsif retries < @max_retries
retries += 1
sleep @retry_backoff**retries

if retry_count < @max_retries
retry_count += 1
@logger.error "async_producer:worker.produce: buffer_overflow failure: do retry: #{retry_count}, sleep #{@retry_backoff} sec"
sleep @retry_backoff
retry
else
@logger.error("Failed to asynchronously produce messages due to BufferOverflow")
@instrumenter.instrument("error.async_producer", { error: e })
@logger.error "async_producer:worker.produce: buffer_overflow failure: no retries, drop messages"
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def producer(
#
# @see AsyncProducer
# @return [AsyncProducer]
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options)
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: 10, retry_backoff: 5, **options)
sync_producer = producer(**options)

AsyncProducer.new(
Expand Down
9 changes: 7 additions & 2 deletions lib/kafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,18 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
create_time: create_time
))

if message.bytesize >= @max_buffer_bytesize
buffer_overflow topic, "Message is too big: message_bytesize=#{message.bytesize}"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yboulkaid where is the buffer_overflow function defined?

Copy link
Author

@yboulkaid yboulkaid May 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's defined in the AsyncProducer:

def buffer_overflow(topic, message)

This is the helper method that's used by the rest of the class to raise BufferOverflow errors

end

if buffer_size >= @max_buffer_size
buffer_overflow topic,
"Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
"Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached (buffer_size: #{buffer_size})"
end

if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
buffer_overflow topic,
"Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
"Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached (buffer_bytesize: #{buffer_bytesize}, message_bytesize: #{message.bytesize})"
end

# If the producer is in transactional mode, all the message production
Expand Down Expand Up @@ -521,6 +525,7 @@ def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.producer", {
topic: topic,
})
@logger.error "producer.buffer_overflow: topic:#{topic}, message:#{message}"

raise BufferOverflow, message
end
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka/ssl_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce
store.set_default_paths
end
ssl_context.cert_store = store
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
ssl_context.ssl_version = :TLSv1_2
end

ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/tagged_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def clear_tags!
def current_tags
# We use our object ID here to avoid conflicting with other instances
thread_key = @thread_key ||= "kafka_tagged_logging_tags:#{object_id}".freeze
Thread.current[thread_key] ||= []
Thread.current[thread_key] ||= ["ruby-kafka"]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tagged logging in action:
Screenshot 2023-05-12 at 15 05 00

end

def tags_text
Expand Down
6 changes: 2 additions & 4 deletions spec/async_producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ def instrument(name, payload = {})
async_producer.produce("hello", topic: "greetings")
sleep 0.3 # wait for all retries to be done

expect(log.string).to include "Failed to asynchronously produce messages due to BufferOverflow"
expect(log.string).to include "buffer_overflow failure"

metric = instrumenter.metrics_for("error.async_producer").first
expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)
expect(sync_producer).to have_received(:produce).exactly(3).times
expect(sync_producer).to have_received(:produce).exactly(2).times

async_producer.shutdown
end
Expand Down