diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py index 9ca273dd..3b226ec0 100644 --- a/src/instana/instrumentation/kafka/confluent_kafka_python.py +++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py @@ -17,8 +17,12 @@ from instana.span.span import InstanaSpan from instana.util.traceutils import get_tracer_tuple - consumer_token = None - consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span") + consumer_token = contextvars.ContextVar( + "confluent_kafka_consumer_token", default=None + ) + consumer_span = contextvars.ContextVar( + "confluent_kafka_consumer_span", default=None + ) # As confluent_kafka is a wrapper around the C-developed librdkafka # (provided automatically via binary wheels), we have to create new classes @@ -178,24 +182,23 @@ def create_span( ) # pragma: no cover def save_consumer_span_into_context(span: "InstanaSpan") -> None: - global consumer_token ctx = trace.set_span_in_context(span) - consumer_token = context.attach(ctx) + token = context.attach(ctx) + consumer_token.set(token) consumer_span.set(span) def close_consumer_span(span: "InstanaSpan") -> None: - global consumer_token if span.is_recording(): span.end() consumer_span.set(None) - if consumer_token is not None: - context.detach(consumer_token) - consumer_token = None + token = consumer_token.get(None) + if token is not None: + context.detach(token) + consumer_token.set(None) def clear_context() -> None: - global consumer_token context.attach(trace.set_span_in_context(None)) - consumer_token = None + consumer_token.set(None) consumer_span.set(None) def trace_kafka_consume( @@ -253,6 +256,10 @@ def trace_kafka_poll( res = wrapped(*args, **kwargs) if res: create_span("poll", res.topic(), res.headers()) + else: + span = consumer_span.get(None) + if span is not None: + close_consumer_span(span) return res except Exception as exc: exception = exc diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index bc1d85b7..b8913649 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -709,19 +709,19 @@ def test_save_consumer_span_into_context(self, span: "InstanaSpan") -> None: """Test save_consumer_span_into_context function.""" # Verify initial state assert consumer_span.get(None) is None - assert confluent_kafka_python.consumer_token is None + assert confluent_kafka_python.consumer_token.get(None) is None # Save span into context save_consumer_span_into_context(span) # Verify token is stored - assert confluent_kafka_python.consumer_token is not None + assert confluent_kafka_python.consumer_token.get(None) is not None def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None: """Test close_consumer_span with a recording span.""" # Save span into context first save_consumer_span_into_context(span) - assert confluent_kafka_python.consumer_token is not None + assert confluent_kafka_python.consumer_token.get(None) is not None # Verify span is recording assert span.is_recording() @@ -732,7 +732,7 @@ def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None: # Verify span was ended and context cleared assert not span.is_recording() assert consumer_span.get(None) is None - assert confluent_kafka_python.consumer_token is None + assert confluent_kafka_python.consumer_token.get(None) is None def test_clear_context(self, span: "InstanaSpan") -> None: """Test clear_context function.""" @@ -741,14 +741,14 @@ def test_clear_context(self, span: "InstanaSpan") -> None: # Verify context has data assert consumer_span.get(None) == span - assert confluent_kafka_python.consumer_token is not None + assert confluent_kafka_python.consumer_token.get(None) is not None # Clear context clear_context() # Verify all context is cleared assert consumer_span.get(None) is None - assert confluent_kafka_python.consumer_token is None + assert confluent_kafka_python.consumer_token.get(None) is None def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None: """Test trace_kafka_close handles exceptions and still cleans up spans.""" @@ -757,7 +757,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None # Verify span is in context assert consumer_span.get(None) == span - assert confluent_kafka_python.consumer_token is not None + assert confluent_kafka_python.consumer_token.get(None) is not None # Mock a wrapped function that raises an exception mock_wrapped = Mock(side_effect=Exception("Close operation failed")) @@ -772,7 +772,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None # Verify that despite the exception, the span was cleaned up assert consumer_span.get(None) is None - assert confluent_kafka_python.consumer_token is None + assert confluent_kafka_python.consumer_token.get(None) is None # Verify span was ended assert not span.is_recording()