From d98433e22e534ab3cb0c4cd1c495c9c27940ae46 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Mon, 11 Mar 2019 19:44:22 -0700 Subject: [PATCH 01/16] extra logging and controllable retry in async producer --- lib/kafka/async_producer.rb | 41 +++++++++++++++++++++++----------- lib/kafka/cluster.rb | 8 +++---- lib/kafka/connection.rb | 10 ++++----- lib/kafka/produce_operation.rb | 20 ++++++++--------- lib/kafka/producer.rb | 16 ++++++++----- 5 files changed, 58 insertions(+), 37 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 72b6ee487..7e9397ab6 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -72,7 +72,7 @@ class AsyncProducer # @param delivery_interval [Integer] if greater than zero, the number of # seconds between automatic message deliveries. # - def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:) + def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, async_max_retries: 10, async_retry_backoff: 5, instrumenter:, logger:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 @@ -86,6 +86,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli queue: @queue, producer: sync_producer, delivery_threshold: delivery_threshold, + async_max_retries: async_max_retries, + async_retry_backoff: async_retry_backoff, instrumenter: instrumenter, logger: logger, ) @@ -184,16 +186,18 @@ def run end class Worker - def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:) + def initialize(queue:, producer:, delivery_threshold:, async_max_retries:, async_retry_backoff:, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold @instrumenter = instrumenter @logger = logger + @retry_backoff = async_retry_backoff + @max_retries = async_max_retries end def run - @logger.info "Starting async producer in the background..." + @logger.info "ruby-kafka: Starting async producer in the background..." loop do operation, payload = @queue.pop @@ -209,7 +213,7 @@ def run # Deliver any pending messages first. @producer.deliver_messages rescue Error => e - @logger.error("Failed to deliver messages during shutdown: #{e.message}") + @logger.error("ruby-kafka: Failed to deliver messages during shutdown: #{e.message}") @instrumenter.instrument("drop_messages.async_producer", { message_count: @producer.buffer_size + @queue.size, @@ -223,14 +227,14 @@ def run end end rescue Kafka::Error => e - @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - @logger.info "Restarting in 10 seconds..." + @logger.error "ruby-kafka: Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.info "ruby-kafka: Restarting in 10 seconds..." sleep 10 retry rescue Exception => e - @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - @logger.error "Async producer crashed!" + @logger.error "ruby-kafka: Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.error "ruby-kafka: Async producer crashed!" ensure @producer.shutdown end @@ -238,17 +242,28 @@ def run private def produce(*args) - @producer.produce(*args) - rescue BufferOverflow - deliver_messages - retry + retry_count = 0 + begin + @producer.produce(*args) + rescue BufferOverflow + deliver_messages + + if retry_count < @max_retries + retry_count += 1 + @logger.error "ruby-kafka: async_producer:worker.produce: buffer_overflow failure: do retry: #{retry_count}, sleep #{@retry_backoff} sec" + sleep @retry_backoff + retry + else + @logger.error "ruby-kafka: async_producer:worker.produce: buffer_overflow failure: no retries, drop messages" + end + end end def deliver_messages @producer.deliver_messages rescue DeliveryFailed, ConnectionError => e # Failed to deliver messages -- nothing to do but log and try again later. - @logger.error("Failed to asynchronously deliver messages: #{e.message}") + @logger.error("ruby-kafka: Failed to asynchronously deliver messages: #{e.message}") @instrumenter.instrument("error.async_producer", { error: e }) end diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index 9979f2874..dd89005db 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -375,16 +375,16 @@ def fetch_cluster_info cluster_info = broker.fetch_metadata(topics: @target_topics) if cluster_info.brokers.empty? - @logger.error "No brokers in cluster" + @logger.error "ruby-kafka: No brokers in cluster" else - @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" + @logger.info "ruby-kafka: Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" @stale = false return cluster_info end rescue Error => e - @logger.error "Failed to fetch metadata from #{node}: #{e}" + @logger.error "ruby-kafka: Failed to fetch metadata from #{node}: #{e}" errors << [node, e] ensure broker.disconnect unless broker.nil? @@ -445,7 +445,7 @@ def get_coordinator(coordinator_type, coordinator_key) sleep 1 retry rescue ConnectionError => e - @logger.error "Failed to get coordinator info from #{broker}: #{e}" + @logger.error "ruby-kafka: Failed to get coordinator info from #{broker}: #{e}" end end diff --git a/lib/kafka/connection.rb b/lib/kafka/connection.rb index ac791e035..dbf07d9c4 100644 --- a/lib/kafka/connection.rb +++ b/lib/kafka/connection.rb @@ -134,10 +134,10 @@ def open @last_request = nil rescue Errno::ETIMEDOUT => e - @logger.error "Timed out while trying to connect to #{self}: #{e}" + @logger.error "ruby-kafka: Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e - @logger.error "Failed to connect to #{self}: #{e}" + @logger.error "ruby-kafka: Failed to connect to #{self}: #{e}" raise ConnectionError, e end @@ -166,7 +166,7 @@ def write_request(request, notification) nil rescue Errno::ETIMEDOUT - @logger.error "Timed out while writing request #{@correlation_id}" + @logger.error "ruby-kafka: Timed out while writing request #{@correlation_id}" raise end @@ -192,7 +192,7 @@ def read_response(response_class, notification) return correlation_id, response rescue Errno::ETIMEDOUT - @logger.error "Timed out while waiting for response #{@correlation_id}" + @logger.error "ruby-kafka: Timed out while waiting for response #{@correlation_id}" raise end @@ -205,7 +205,7 @@ def wait_for_response(response_class, notification) # sitting in the socket waiting to be read. If the response we just read # was to a previous request, we can safely skip it. if correlation_id < @correlation_id - @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" + @logger.error "ruby-kafka: Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" elsif correlation_id > @correlation_id raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" else diff --git a/lib/kafka/produce_operation.rb b/lib/kafka/produce_operation.rb index 0dfe5cf6d..cb405e265 100644 --- a/lib/kafka/produce_operation.rb +++ b/lib/kafka/produce_operation.rb @@ -84,7 +84,7 @@ def send_buffered_messages messages_for_broker[broker] ||= MessageBuffer.new messages_for_broker[broker].concat(messages, topic: topic, partition: partition) rescue Kafka::Error => e - @logger.error "Could not connect to leader for partition #{topic}/#{partition}: #{e.message}" + @logger.error "ruby-kafka: Could not connect to leader for partition #{topic}/#{partition}: #{e.message}" @instrumenter.instrument("topic_error.producer", { topic: topic, @@ -104,7 +104,7 @@ def send_buffered_messages messages_for_broker.each do |broker, message_buffer| begin - @logger.info "Sending #{message_buffer.size} messages to #{broker}" + @logger.info "ruby-kafka: Sending #{message_buffer.size} messages to #{broker}" records_for_topics = {} @@ -132,7 +132,7 @@ def send_buffered_messages handle_response(broker, response, records_for_topics) if response rescue ConnectionError => e - @logger.error "Could not connect to broker #{broker}: #{e}" + @logger.error "ruby-kafka: Could not connect to broker #{broker}: #{e}" # Mark the cluster as stale in order to force a cluster metadata refresh. @cluster.mark_as_stale! @@ -177,22 +177,22 @@ def handle_response(broker, response, records_for_topics) }) end rescue Kafka::CorruptMessage - @logger.error "Corrupt message when writing to #{topic}/#{partition} on #{broker}" + @logger.error "ruby-kafka: Corrupt message when writing to #{topic}/#{partition} on #{broker}" rescue Kafka::UnknownTopicOrPartition - @logger.error "Unknown topic or partition #{topic}/#{partition} on #{broker}" + @logger.error "ruby-kafka: Unknown topic or partition #{topic}/#{partition} on #{broker}" @cluster.mark_as_stale! rescue Kafka::LeaderNotAvailable - @logger.error "Leader currently not available for #{topic}/#{partition}" + @logger.error "ruby-kafka: Leader currently not available for #{topic}/#{partition}" @cluster.mark_as_stale! rescue Kafka::NotLeaderForPartition - @logger.error "Broker #{broker} not currently leader for #{topic}/#{partition}" + @logger.error "ruby-kafka: Broker #{broker} not currently leader for #{topic}/#{partition}" @cluster.mark_as_stale! rescue Kafka::RequestTimedOut - @logger.error "Timed out while writing to #{topic}/#{partition} on #{broker}" + @logger.error "ruby-kafka: Timed out while writing to #{topic}/#{partition} on #{broker}" rescue Kafka::NotEnoughReplicas - @logger.error "Not enough in-sync replicas for #{topic}/#{partition}" + @logger.error "ruby-kafka: Not enough in-sync replicas for #{topic}/#{partition}" rescue Kafka::NotEnoughReplicasAfterAppend - @logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}" + @logger.error "ruby-kafka: Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}" else @logger.debug "Successfully appended #{records.count} messages to #{topic}/#{partition} on #{broker}" diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index b8b5068f0..960ab4706 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -192,14 +192,16 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: create_time: create_time ) + @logger.info "kafka-ruby: producer.produce: buffer_size:#{buffer_size}, buffer_bytesize:#{buffer_bytesize}, message_bytesize:#{message.bytesize}, _pending_message_queue_size:#{@pending_message_queue.size}, _buffer_size:#{@buffer.size}, _pending_message_queue_bytesize:#{@pending_message_queue.bytesize}, _buffer_bytesize:#{@buffer.bytesize}" + if buffer_size >= @max_buffer_size buffer_overflow topic, - "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached" + "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached (buffer_size: #{buffer_size})" end if buffer_bytesize + message.bytesize >= @max_buffer_bytesize buffer_overflow topic, - "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached" + "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached (buffer_bytesize: #{buffer_bytesize}, message_bytesize: #{message.bytesize})" end # If the producer is in transactional mode, all the message production @@ -234,6 +236,9 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # @raise [DeliveryFailed] if not all messages could be successfully sent. # @return [nil] def deliver_messages + + @logger.info "kafka-ruby: producer.deliver_messages: buffer_size:#{buffer_size}, buffer_bytesize:#{buffer_bytesize}, _pending_message_queue_size:#{@pending_message_queue.size}, _buffer_size:#{@buffer.size}" + # There's no need to do anything if the buffer is empty. return if buffer_size == 0 @@ -391,11 +396,11 @@ def deliver_messages_with_retries(notification) if buffer_size.zero? break elsif attempt <= @max_retries - @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" + @logger.warn "ruby-kafka: Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" sleep @retry_backoff else - @logger.error "Failed to send all messages; keeping remaining messages in buffer" + @logger.error "ruby-kafka: Failed to send all messages; keeping remaining messages in buffer" break end end @@ -454,7 +459,7 @@ def assign_partitions! if failed_messages.any? failed_messages.group_by(&:topic).each do |topic, messages| - @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}" + @logger.error "ruby-kafka: Failed to assign partitions to #{messages.count} messages in #{topic}" end @cluster.mark_as_stale! @@ -491,6 +496,7 @@ def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.producer", { topic: topic, }) + @logger.error "ruby-kafka: producer.buffer_overflow: topic:#{topic}, message:#{message}" raise BufferOverflow, message end From b2e9ff667d9aead83e1e5c61cee6221719301714 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Tue, 12 Mar 2019 14:47:49 -0700 Subject: [PATCH 02/16] re-format logs --- lib/kafka/async_producer.rb | 3 ++- lib/kafka/producer.rb | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 7e9397ab6..04be53fee 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -105,6 +105,7 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli def produce(value, topic:, **options) ensure_threads_running! + @logger.info "ruby-kafka: async_producer.produce: _queue_size=#{@queue.size}" if @queue.size >= @max_queue_size buffer_overflow topic, "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached" @@ -250,7 +251,7 @@ def produce(*args) if retry_count < @max_retries retry_count += 1 - @logger.error "ruby-kafka: async_producer:worker.produce: buffer_overflow failure: do retry: #{retry_count}, sleep #{@retry_backoff} sec" + @logger.error "ruby-kafka: async_producer:worker.produce: buffer_overflow failure: do retry: retry_count=#{retry_count} sleep_time_sec=#{@retry_backoff}" sleep @retry_backoff retry else diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 960ab4706..8a7099a2d 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -192,7 +192,7 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: create_time: create_time ) - @logger.info "kafka-ruby: producer.produce: buffer_size:#{buffer_size}, buffer_bytesize:#{buffer_bytesize}, message_bytesize:#{message.bytesize}, _pending_message_queue_size:#{@pending_message_queue.size}, _buffer_size:#{@buffer.size}, _pending_message_queue_bytesize:#{@pending_message_queue.bytesize}, _buffer_bytesize:#{@buffer.bytesize}" + @logger.info "ruby-kafka: producer.produce: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} message_bytesize=#{message.bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size} _pending_message_queue_bytesize=#{@pending_message_queue.bytesize} _buffer_bytesize=#{@buffer.bytesize}" if buffer_size >= @max_buffer_size buffer_overflow topic, @@ -237,7 +237,7 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # @return [nil] def deliver_messages - @logger.info "kafka-ruby: producer.deliver_messages: buffer_size:#{buffer_size}, buffer_bytesize:#{buffer_bytesize}, _pending_message_queue_size:#{@pending_message_queue.size}, _buffer_size:#{@buffer.size}" + @logger.info "ruby-kafka: producer.deliver_messages: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size}" # There's no need to do anything if the buffer is empty. return if buffer_size == 0 From e3bcc1185c8a36c3dc7a638ca90ebc324364a2d6 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Thu, 14 Mar 2019 16:02:00 -0700 Subject: [PATCH 03/16] remove extra logging and pass async retry params from the settings --- lib/kafka/async_producer.rb | 2 ++ lib/kafka/client.rb | 4 +++- lib/kafka/producer.rb | 8 ++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 04be53fee..20b40b5a9 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -76,6 +76,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 + raise ArgumentError unless async_max_retries >= 0 + raise ArgumentError unless async_retry_backoff >= 0 @queue = Queue.new @max_queue_size = max_queue_size diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 3ed231c99..9fa5b074a 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -295,7 +295,7 @@ def producer( # # @see AsyncProducer # @return [AsyncProducer] - def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) + def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, async_max_retries: 10, async_retry_backoff: 5, **options) sync_producer = producer(**options) AsyncProducer.new( @@ -303,6 +303,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: delivery_interval: delivery_interval, delivery_threshold: delivery_threshold, max_queue_size: max_queue_size, + async_max_retries: async_max_retries, + async_retry_backoff: async_retry_backoff, instrumenter: @instrumenter, logger: @logger, ) diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 8a7099a2d..a8db1aee1 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -192,7 +192,11 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: create_time: create_time ) - @logger.info "ruby-kafka: producer.produce: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} message_bytesize=#{message.bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size} _pending_message_queue_bytesize=#{@pending_message_queue.bytesize} _buffer_bytesize=#{@buffer.bytesize}" + # @logger.info "ruby-kafka: producer.produce: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} message_bytesize=#{message.bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size} _pending_message_queue_bytesize=#{@pending_message_queue.bytesize} _buffer_bytesize=#{@buffer.bytesize}" + + if message.bytesize >= @max_buffer_bytesize + buffer_overflow topic, "Message is too big: message_bytesize=#{message.bytesize}" + end if buffer_size >= @max_buffer_size buffer_overflow topic, @@ -237,7 +241,7 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # @return [nil] def deliver_messages - @logger.info "ruby-kafka: producer.deliver_messages: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size}" + # @logger.info "ruby-kafka: producer.deliver_messages: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size}" # There's no need to do anything if the buffer is empty. return if buffer_size == 0 From 05233014eff60b39fd08da77005debc70672950a Mon Sep 17 00:00:00 2001 From: notmaxx Date: Sun, 17 Mar 2019 19:59:59 -0700 Subject: [PATCH 04/16] logger with exception handling --- lib/kafka/client.rb | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 9fa5b074a..502cdebab 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -17,6 +17,20 @@ module Kafka class Client + class SafeLogger + def initialize(logger) + @elogger = logger + end + + %w[error info warn debug].each do |type| + define_method(type) do |*args| + @elogger.send(type, *args) + rescue StandardError => ex + puts "ruby-kafka: ERROR logger exception: type #{type}, ex.message #{ex.message}" + end + end + end + # Initializes a new Kafka client. # # @param seed_brokers [Array, String] the list of brokers used to initialize @@ -68,7 +82,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, sasl_over_ssl: true, ssl_ca_certs_from_system: false) - @logger = logger || Logger.new(nil) + @logger = SafeLogger.new(logger || Logger.new(nil)) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) From 115833c41a516a9760f05a1816bf269751a308f7 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Sun, 17 Mar 2019 20:07:58 -0700 Subject: [PATCH 05/16] fix --- lib/kafka/client.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 502cdebab..a4e8c52b7 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -24,9 +24,11 @@ def initialize(logger) %w[error info warn debug].each do |type| define_method(type) do |*args| - @elogger.send(type, *args) - rescue StandardError => ex - puts "ruby-kafka: ERROR logger exception: type #{type}, ex.message #{ex.message}" + begin + @elogger.send(type, *args) + rescue StandardError => ex + puts "ruby-kafka: ERROR logger exception: type #{type}, ex.message #{ex.message}" + end end end end From 9b45dbb1c6b1ef8564548744288a6eb1528529e9 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Thu, 28 Mar 2019 14:01:10 -0700 Subject: [PATCH 06/16] report dropped messages in case of crash --- lib/kafka/async_producer.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 20b40b5a9..bde1d78e7 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -238,6 +238,9 @@ def run rescue Exception => e @logger.error "ruby-kafka: Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.error "ruby-kafka: Async producer crashed!" + @instrumenter.instrument("drop_messages.async_producer", { + message_count: @producer.buffer_size + @queue.size, + }) ensure @producer.shutdown end From c302ef109d7cdf8cd0cef340d6d7440bd6d02ec0 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Thu, 28 Mar 2019 14:07:01 -0700 Subject: [PATCH 07/16] delete safelogger code --- lib/kafka/client.rb | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index a4e8c52b7..9fa5b074a 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -17,22 +17,6 @@ module Kafka class Client - class SafeLogger - def initialize(logger) - @elogger = logger - end - - %w[error info warn debug].each do |type| - define_method(type) do |*args| - begin - @elogger.send(type, *args) - rescue StandardError => ex - puts "ruby-kafka: ERROR logger exception: type #{type}, ex.message #{ex.message}" - end - end - end - end - # Initializes a new Kafka client. # # @param seed_brokers [Array, String] the list of brokers used to initialize @@ -84,7 +68,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, sasl_over_ssl: true, ssl_ca_certs_from_system: false) - @logger = SafeLogger.new(logger || Logger.new(nil)) + @logger = logger || Logger.new(nil) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) From 0c8cd3fa10c464b0e2afcad8926933c249f6f099 Mon Sep 17 00:00:00 2001 From: notmaxx Date: Thu, 28 Mar 2019 14:10:27 -0700 Subject: [PATCH 08/16] add prefix for log --- lib/kafka/broker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka/broker.rb b/lib/kafka/broker.rb index 970b6b328..b8c0574e4 100644 --- a/lib/kafka/broker.rb +++ b/lib/kafka/broker.rb @@ -187,7 +187,7 @@ def end_txn(**options) def send_request(request) connection.send_request(request) rescue IdleConnection - @logger.warn "Connection has been unused for too long, re-connecting..." + @logger.warn "ruby-kafka: Connection has been unused for too long, re-connecting..." @connection.close rescue nil @connection = nil retry From aa43ff4542a9d733b2a72a6ef90c7c93211c4a95 Mon Sep 17 00:00:00 2001 From: Vadim Shaulski Date: Wed, 11 Sep 2019 20:08:26 +0300 Subject: [PATCH 09/16] Disable host verification --- lib/kafka/ssl_context.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/kafka/ssl_context.rb b/lib/kafka/ssl_context.rb index de724f7d9..51aedff3b 100644 --- a/lib/kafka/ssl_context.rb +++ b/lib/kafka/ssl_context.rb @@ -54,6 +54,7 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce store.set_default_paths end ssl_context.cert_store = store + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER end ssl_context From bf2e2fe86e99e5f3cf745e0cddf6fdef91faf543 Mon Sep 17 00:00:00 2001 From: Vadim Shaulski Date: Thu, 12 Sep 2019 18:58:16 +0300 Subject: [PATCH 10/16] Revert "Disable host verification" This reverts commit aa43ff4542a9d733b2a72a6ef90c7c93211c4a95. --- lib/kafka/ssl_context.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/kafka/ssl_context.rb b/lib/kafka/ssl_context.rb index 51aedff3b..de724f7d9 100644 --- a/lib/kafka/ssl_context.rb +++ b/lib/kafka/ssl_context.rb @@ -54,7 +54,6 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce store.set_default_paths end ssl_context.cert_store = store - ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER end ssl_context From 156d682c1a90df3733226e4eb5c654bcb00524b1 Mon Sep 17 00:00:00 2001 From: Vadim Shaulski Date: Thu, 12 Sep 2019 19:39:05 +0300 Subject: [PATCH 11/16] Force TLS1.2 --- lib/kafka/ssl_context.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/kafka/ssl_context.rb b/lib/kafka/ssl_context.rb index de724f7d9..cc0099aa3 100644 --- a/lib/kafka/ssl_context.rb +++ b/lib/kafka/ssl_context.rb @@ -54,6 +54,8 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce store.set_default_paths end ssl_context.cert_store = store + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER + ssl_context.ssl_version = :TLSv1_2 end ssl_context From cbbf32f717066aceb2f9bf504b90db1bd6d6b695 Mon Sep 17 00:00:00 2001 From: Youssef Boulkaid Date: Fri, 12 May 2023 11:57:20 +0200 Subject: [PATCH 12/16] Use tagged logging for ruby-kafka prefix --- lib/kafka/async_producer.rb | 8 ++++---- lib/kafka/cluster.rb | 2 +- lib/kafka/connection.rb | 10 +++++----- lib/kafka/produce_operation.rb | 20 ++++++++++---------- lib/kafka/producer.rb | 8 ++------ lib/kafka/tagged_logger.rb | 2 +- 6 files changed, 23 insertions(+), 27 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index dc0b58a30..e54add288 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -110,7 +110,7 @@ def produce(value, topic:, **options) ensure_threads_running! - @logger.info "ruby-kafka: async_producer.produce: _queue_size=#{@queue.size}" + @logger.info "async_producer.produce: _queue_size=#{@queue.size}" if @queue.size >= @max_queue_size buffer_overflow topic, "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached" @@ -257,8 +257,8 @@ def do_loop end end rescue Kafka::Error => e - @logger.error "ruby-kafka: Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - @logger.info "ruby-kafka: Restarting in 10 seconds..." + @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.info "Restarting in 10 seconds..." sleep 10 retry @@ -287,7 +287,7 @@ def deliver_messages @producer.deliver_messages rescue DeliveryFailed, ConnectionError => e # Failed to deliver messages -- nothing to do but log and try again later. - @logger.error("ruby-kafka: Failed to asynchronously deliver messages: #{e.message}") + @logger.error("Failed to asynchronously deliver messages: #{e.message}") @instrumenter.instrument("error.async_producer", { error: e }) end diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index 832743148..8aaf19154 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -503,7 +503,7 @@ def get_coordinator(coordinator_type, coordinator_key) sleep 1 retry rescue ConnectionError => e - @logger.error "ruby-kafka: Failed to get coordinator info from #{broker}: #{e}" + @logger.error "Failed to get coordinator info from #{broker}: #{e}" end end diff --git a/lib/kafka/connection.rb b/lib/kafka/connection.rb index 8bfed0639..efefbfc0f 100644 --- a/lib/kafka/connection.rb +++ b/lib/kafka/connection.rb @@ -140,10 +140,10 @@ def open @last_request = nil rescue Errno::ETIMEDOUT => e - @logger.error "ruby-kafka: Timed out while trying to connect to #{self}: #{e}" + @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e - @logger.error "ruby-kafka: Failed to connect to #{self}: #{e}" + @logger.error "Failed to connect to #{self}: #{e}" raise ConnectionError, e end @@ -172,7 +172,7 @@ def write_request(request, notification) nil rescue Errno::ETIMEDOUT - @logger.error "ruby-kafka: Timed out while writing request #{@correlation_id}" + @logger.error "Timed out while writing request #{@correlation_id}" raise end @@ -198,7 +198,7 @@ def read_response(response_class, notification) return correlation_id, response rescue Errno::ETIMEDOUT - @logger.error "ruby-kafka: Timed out while waiting for response #{@correlation_id}" + @logger.error "Timed out while waiting for response #{@correlation_id}" raise end @@ -211,7 +211,7 @@ def wait_for_response(response_class, notification) # sitting in the socket waiting to be read. If the response we just read # was to a previous request, we can safely skip it. if correlation_id < @correlation_id - @logger.error "ruby-kafka: Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" + @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" elsif correlation_id > @correlation_id raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" else diff --git a/lib/kafka/produce_operation.rb b/lib/kafka/produce_operation.rb index 0ed930902..766fa568d 100644 --- a/lib/kafka/produce_operation.rb +++ b/lib/kafka/produce_operation.rb @@ -84,7 +84,7 @@ def send_buffered_messages messages_for_broker[broker] ||= MessageBuffer.new messages_for_broker[broker].concat(messages, topic: topic, partition: partition) rescue Kafka::Error => e - @logger.error "ruby-kafka: Could not connect to leader for partition #{topic}/#{partition}: #{e.message}" + @logger.error "Could not connect to leader for partition #{topic}/#{partition}: #{e.message}" @instrumenter.instrument("topic_error.producer", { topic: topic, @@ -104,7 +104,7 @@ def send_buffered_messages messages_for_broker.each do |broker, message_buffer| begin - @logger.info "ruby-kafka: Sending #{message_buffer.size} messages to #{broker}" + @logger.info "Sending #{message_buffer.size} messages to #{broker}" records_for_topics = {} @@ -132,7 +132,7 @@ def send_buffered_messages handle_response(broker, response, records_for_topics) if response rescue ConnectionError => e - @logger.error "ruby-kafka: Could not connect to broker #{broker}: #{e}" + @logger.error "Could not connect to broker #{broker}: #{e}" # Mark the cluster as stale in order to force a cluster metadata refresh. @cluster.mark_as_stale! @@ -177,22 +177,22 @@ def handle_response(broker, response, records_for_topics) }) end rescue Kafka::CorruptMessage - @logger.error "ruby-kafka: Corrupt message when writing to #{topic}/#{partition} on #{broker}" + @logger.error "Corrupt message when writing to #{topic}/#{partition} on #{broker}" rescue Kafka::UnknownTopicOrPartition - @logger.error "ruby-kafka: Unknown topic or partition #{topic}/#{partition} on #{broker}" + @logger.error "Unknown topic or partition #{topic}/#{partition} on #{broker}" @cluster.mark_as_stale! rescue Kafka::LeaderNotAvailable - @logger.error "ruby-kafka: Leader currently not available for #{topic}/#{partition}" + @logger.error "Leader currently not available for #{topic}/#{partition}" @cluster.mark_as_stale! rescue Kafka::NotLeaderForPartition - @logger.error "ruby-kafka: Broker #{broker} not currently leader for #{topic}/#{partition}" + @logger.error "Broker #{broker} not currently leader for #{topic}/#{partition}" @cluster.mark_as_stale! rescue Kafka::RequestTimedOut - @logger.error "ruby-kafka: Timed out while writing to #{topic}/#{partition} on #{broker}" + @logger.error "Timed out while writing to #{topic}/#{partition} on #{broker}" rescue Kafka::NotEnoughReplicas - @logger.error "ruby-kafka: Not enough in-sync replicas for #{topic}/#{partition}" + @logger.error "Not enough in-sync replicas for #{topic}/#{partition}" rescue Kafka::NotEnoughReplicasAfterAppend - @logger.error "ruby-kafka: Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}" + @logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}" else @logger.debug "Successfully appended #{records.count} messages to #{topic}/#{partition} on #{broker}" diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 5130d6c5e..9a1ca0771 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -206,8 +206,6 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: create_time: create_time )) - # @logger.info "ruby-kafka: producer.produce: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} message_bytesize=#{message.bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size} _pending_message_queue_bytesize=#{@pending_message_queue.bytesize} _buffer_bytesize=#{@buffer.bytesize}" - if message.bytesize >= @max_buffer_bytesize buffer_overflow topic, "Message is too big: message_bytesize=#{message.bytesize}" end @@ -255,8 +253,6 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # @return [nil] def deliver_messages - # @logger.info "ruby-kafka: producer.deliver_messages: buffer_size=#{buffer_size} buffer_bytesize=#{buffer_bytesize} _pending_message_queue_size=#{@pending_message_queue.size} _buffer_size=#{@buffer.size}" - # There's no need to do anything if the buffer is empty. return if buffer_size == 0 @@ -493,7 +489,7 @@ def assign_partitions! if failed_messages.any? failed_messages.group_by(&:topic).each do |topic, messages| - @logger.error "ruby-kafka: Failed to assign partitions to #{messages.count} messages in #{topic}" + @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}" end @cluster.mark_as_stale! @@ -530,7 +526,7 @@ def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.producer", { topic: topic, }) - @logger.error "ruby-kafka: producer.buffer_overflow: topic:#{topic}, message:#{message}" + @logger.error "producer.buffer_overflow: topic:#{topic}, message:#{message}" raise BufferOverflow, message end diff --git a/lib/kafka/tagged_logger.rb b/lib/kafka/tagged_logger.rb index d956696da..eb6af80f8 100644 --- a/lib/kafka/tagged_logger.rb +++ b/lib/kafka/tagged_logger.rb @@ -41,7 +41,7 @@ def clear_tags! def current_tags # We use our object ID here to avoid conflicting with other instances thread_key = @thread_key ||= "kafka_tagged_logging_tags:#{object_id}".freeze - Thread.current[thread_key] ||= [] + Thread.current[thread_key] ||= ["ruby-kafka"] end def tags_text From ae88877721afb2ca977525d78ca48411d51d538f Mon Sep 17 00:00:00 2001 From: Youssef Boulkaid Date: Fri, 12 May 2023 11:59:42 +0200 Subject: [PATCH 13/16] Fix variable name --- lib/kafka/async_producer.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index e54add288..1f0e455ac 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -74,8 +74,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 - raise ArgumentError unless async_max_retries >= 0 - raise ArgumentError unless async_retry_backoff >= 0 + raise ArgumentError unless max_retries >= 0 + raise ArgumentError unless retry_backoff >= 0 @queue = Queue.new @max_queue_size = max_queue_size From 33e0d7cd71b3e5ac0ae55d3ff096da1f46f6b35d Mon Sep 17 00:00:00 2001 From: Youssef Boulkaid Date: Fri, 12 May 2023 12:03:20 +0200 Subject: [PATCH 14/16] Fix small git diff --- lib/kafka/broker.rb | 2 +- lib/kafka/producer.rb | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/kafka/broker.rb b/lib/kafka/broker.rb index 77ac88d3b..00919c784 100644 --- a/lib/kafka/broker.rb +++ b/lib/kafka/broker.rb @@ -199,7 +199,7 @@ def txn_offset_commit(**options) def send_request(request) connection.send_request(request) rescue IdleConnection - @logger.warn "ruby-kafka: Connection has been unused for too long, re-connecting..." + @logger.warn "Connection has been unused for too long, re-connecting..." @connection.close rescue nil @connection = nil retry diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 9a1ca0771..b28e598c5 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -252,7 +252,6 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # @raise [DeliveryFailed] if not all messages could be successfully sent. # @return [nil] def deliver_messages - # There's no need to do anything if the buffer is empty. return if buffer_size == 0 From cdb7213f20e09751e4169358b00ae53950074864 Mon Sep 17 00:00:00 2001 From: Youssef Boulkaid Date: Fri, 12 May 2023 15:42:48 +0200 Subject: [PATCH 15/16] Fix max_retries count --- lib/kafka/async_producer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 1f0e455ac..9e404da01 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -74,7 +74,7 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 - raise ArgumentError unless max_retries >= 0 + raise ArgumentError unless max_retries >= -1 raise ArgumentError unless retry_backoff >= 0 @queue = Queue.new From 23a8f75e88d82c7c1435b2ca54ec5267ef9e8c3d Mon Sep 17 00:00:00 2001 From: Youssef Boulkaid Date: Wed, 31 May 2023 11:36:47 +0200 Subject: [PATCH 16/16] Keep our retry logic --- lib/kafka/async_producer.rb | 27 +++++++++++++-------------- lib/kafka/client.rb | 2 +- spec/async_producer_spec.rb | 6 ++---- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 9e404da01..31543e2c1 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -70,11 +70,11 @@ class AsyncProducer # @param delivery_interval [Integer] if greater than zero, the number of # seconds between automatic message deliveries. # - def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) + def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: 10, retry_backoff: 5, instrumenter:, logger:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 - raise ArgumentError unless max_retries >= -1 + raise ArgumentError unless max_retries >= 0 raise ArgumentError unless retry_backoff >= 0 @queue = Queue.new @@ -201,7 +201,7 @@ def run end class Worker - def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) + def initialize(queue:, producer:, delivery_threshold:, max_retries: 10, retry_backoff: 5, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold @@ -264,21 +264,20 @@ def do_loop retry end - def produce(value, **kwargs) - retries = 0 + def produce(*args) + retry_count = 0 begin - @producer.produce(value, **kwargs) - rescue BufferOverflow => e + @producer.produce(*args) + rescue BufferOverflow deliver_messages - if @max_retries == -1 - retry - elsif retries < @max_retries - retries += 1 - sleep @retry_backoff**retries + + if retry_count < @max_retries + retry_count += 1 + @logger.error "async_producer:worker.produce: buffer_overflow failure: do retry: #{retry_count}, sleep #{@retry_backoff} sec" + sleep @retry_backoff retry else - @logger.error("Failed to asynchronously produce messages due to BufferOverflow") - @instrumenter.instrument("error.async_producer", { error: e }) + @logger.error "async_producer:worker.produce: buffer_overflow failure: no retries, drop messages" end end end diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index af0148544..d27f13d4a 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -337,7 +337,7 @@ def producer( # # @see AsyncProducer # @return [AsyncProducer] - def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options) + def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: 10, retry_backoff: 5, **options) sync_producer = producer(**options) AsyncProducer.new( diff --git a/spec/async_producer_spec.rb b/spec/async_producer_spec.rb index ff91e9c8a..a27329c25 100644 --- a/spec/async_producer_spec.rb +++ b/spec/async_producer_spec.rb @@ -86,11 +86,9 @@ def instrument(name, payload = {}) async_producer.produce("hello", topic: "greetings") sleep 0.3 # wait for all retries to be done - expect(log.string).to include "Failed to asynchronously produce messages due to BufferOverflow" + expect(log.string).to include "buffer_overflow failure" - metric = instrumenter.metrics_for("error.async_producer").first - expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow) - expect(sync_producer).to have_received(:produce).exactly(3).times + expect(sync_producer).to have_received(:produce).exactly(2).times async_producer.shutdown end