@@ -210,7 +210,7 @@ _kafka_state_to_str(kafka_state state)
210210static void
211211_kafka_set_state (kafka_ctx_t ctx , const char * func , kafka_state state )
212212{
213- _nmsg_dprintf (3 , "%s changing state from %s to %s\n" , func ,
213+ _nmsg_dprintf (3 , "%s: changing state from %s to %s\n" , func ,
214214 _kafka_state_to_str (ctx -> state ), _kafka_state_to_str (state ));
215215 ctx -> state = state ;
216216}
@@ -597,19 +597,15 @@ _kafka_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
597597 return ;
598598 }
599599 switch (err_kafka ) {
600- /* Keep retrying on socket disconnect, brokers down and message timeout */
601600 case RD_KAFKA_RESP_ERR__TRANSPORT :
602601 case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN :
603602 case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT :
604- _nmsg_dprintf (2 , "%s: got Kafka error %d: %s\n" , __func__ , err , reason );
605- break ;
606603 case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION :
607604 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART :
608605 case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE :
609- /* At the moment treat any broker's error as fatal */
610606 default :
607+ /* Just log, let librdkafka handle all errors */
611608 _nmsg_dprintf (2 , "%s: got Kafka error %d: %s\n" , __func__ , err , reason );
612- _kafka_set_state (ctx , __func__ , kafka_state_break );
613609 break ;
614610 }
615611}
@@ -630,10 +626,7 @@ _kafka_delivery_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *op
630626 }
631627 if (rkmessage -> err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
632628 int level = 2 ;
633- if (rkmessage -> err != RD_KAFKA_RESP_ERR__MSG_TIMED_OUT ) {
634- _kafka_set_state (ctx , __func__ , kafka_state_break );
635- rd_kafka_yield (rk );
636- } else {
629+ if (rkmessage -> err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT ) {
637630 ctx -> dropped ++ ;
638631 level = 4 ;
639632 }
0 commit comments