diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb
index 8c87e9d9..31543e2c 100644
--- a/lib/kafka/async_producer.rb
+++ b/lib/kafka/async_producer.rb
@@ -70,10 +70,12 @@ class AsyncProducer
     # @param delivery_interval [Integer] if greater than zero, the number of
     #   seconds between automatic message deliveries.
     #
-    def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
+    def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: 10, retry_backoff: 5, instrumenter:, logger:)
       raise ArgumentError unless max_queue_size > 0
       raise ArgumentError unless delivery_threshold >= 0
       raise ArgumentError unless delivery_interval >= 0
+      raise ArgumentError unless max_retries >= 0
+      raise ArgumentError unless retry_backoff >= 0
 
       @queue = Queue.new
       @max_queue_size = max_queue_size
@@ -108,6 +110,7 @@ def produce(value, topic:, **options)
 
       ensure_threads_running!
 
+      @logger.info "async_producer.produce: _queue_size=#{@queue.size}"
       if @queue.size >= @max_queue_size
         buffer_overflow topic,
           "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
@@ -198,7 +201,7 @@ def run
     end
 
     class Worker
-      def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
+      def initialize(queue:, producer:, delivery_threshold:, max_retries: 10, retry_backoff: 5, instrumenter:, logger:)
         @queue = queue
         @producer = producer
         @delivery_threshold = delivery_threshold
@@ -261,21 +264,20 @@ def do_loop
         retry
       end
 
-      def produce(value, **kwargs)
-        retries = 0
+      def produce(*args)
+        retry_count = 0
         begin
-          @producer.produce(value, **kwargs)
-        rescue BufferOverflow => e
+          @producer.produce(*args)
+        rescue BufferOverflow
           deliver_messages
-          if @max_retries == -1
-            retry
-          elsif retries < @max_retries
-            retries += 1
-            sleep @retry_backoff**retries
+
+          if retry_count < @max_retries
+            retry_count += 1
+            @logger.error "async_producer:worker.produce: buffer_overflow failure: do retry: #{retry_count}, sleep #{@retry_backoff} sec"
+            sleep @retry_backoff
             retry
           else
-            @logger.error("Failed to asynchronously produce messages due to BufferOverflow")
-            @instrumenter.instrument("error.async_producer", { error: e })
+            @logger.error "async_producer:worker.produce: buffer_overflow failure: no retries, drop messages"
           end
         end
       end
diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index af014854..d27f13d4 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -337,7 +337,7 @@ def producer(
     #
     # @see AsyncProducer
     # @return [AsyncProducer]
-    def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options)
+    def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: 10, retry_backoff: 5, **options)
       sync_producer = producer(**options)
 
       AsyncProducer.new(
diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb
index b504dbfd..b28e598c 100644
--- a/lib/kafka/producer.rb
+++ b/lib/kafka/producer.rb
@@ -206,14 +206,18 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
         create_time: create_time
       ))
 
+      if message.bytesize >= @max_buffer_bytesize
+        buffer_overflow topic, "Message is too big: message_bytesize=#{message.bytesize}"
+      end
+
       if buffer_size >= @max_buffer_size
         buffer_overflow topic,
-          "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
+          "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached (buffer_size: #{buffer_size})"
       end
 
       if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
         buffer_overflow topic,
-          "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
+          "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached (buffer_bytesize: #{buffer_bytesize}, message_bytesize: #{message.bytesize})"
       end
 
       # If the producer is in transactional mode, all the message production
@@ -521,6 +525,7 @@ def buffer_overflow(topic, message)
       @instrumenter.instrument("buffer_overflow.producer", {
         topic: topic,
       })
+      @logger.error "producer.buffer_overflow: topic:#{topic}, message:#{message}"
 
       raise BufferOverflow, message
     end
diff --git a/lib/kafka/ssl_context.rb b/lib/kafka/ssl_context.rb
index c5eac454..3f8d1fd9 100644
--- a/lib/kafka/ssl_context.rb
+++ b/lib/kafka/ssl_context.rb
@@ -54,6 +54,8 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce
           store.set_default_paths
         end
         ssl_context.cert_store = store
+        ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
+        ssl_context.ssl_version = :TLSv1_2
       end
 
       ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
diff --git a/lib/kafka/tagged_logger.rb b/lib/kafka/tagged_logger.rb
index d956696d..eb6af80f 100644
--- a/lib/kafka/tagged_logger.rb
+++ b/lib/kafka/tagged_logger.rb
@@ -41,7 +41,7 @@ def clear_tags!
     def current_tags
       # We use our object ID here to avoid conflicting with other instances
       thread_key = @thread_key ||= "kafka_tagged_logging_tags:#{object_id}".freeze
-      Thread.current[thread_key] ||= []
+      Thread.current[thread_key] ||= ["ruby-kafka"]
     end
 
     def tags_text
diff --git a/spec/async_producer_spec.rb b/spec/async_producer_spec.rb
index ff91e9c8..a27329c2 100644
--- a/spec/async_producer_spec.rb
+++ b/spec/async_producer_spec.rb
@@ -86,11 +86,9 @@ def instrument(name, payload = {})
     async_producer.produce("hello", topic: "greetings")
 
     sleep 0.3 # wait for all retries to be done
-    expect(log.string).to include "Failed to asynchronously produce messages due to BufferOverflow"
+    expect(log.string).to include "buffer_overflow failure"
 
-    metric = instrumenter.metrics_for("error.async_producer").first
-    expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)
-    expect(sync_producer).to have_received(:produce).exactly(3).times
+    expect(sync_producer).to have_received(:produce).exactly(2).times
 
     async_producer.shutdown
   end