Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions src/instana/instrumentation/kafka/confluent_kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple

consumer_token = None
consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")
consumer_token = contextvars.ContextVar(
"confluent_kafka_consumer_token", default=None
)
consumer_span = contextvars.ContextVar(
"confluent_kafka_consumer_span", default=None
)

# As confluent_kafka is a wrapper around the C-developed librdkafka
# (provided automatically via binary wheels), we have to create new classes
Expand Down Expand Up @@ -178,24 +182,23 @@ def create_span(
) # pragma: no cover

def save_consumer_span_into_context(span: "InstanaSpan") -> None:
global consumer_token
ctx = trace.set_span_in_context(span)
consumer_token = context.attach(ctx)
token = context.attach(ctx)
consumer_token.set(token)
consumer_span.set(span)

def close_consumer_span(span: "InstanaSpan") -> None:
global consumer_token
if span.is_recording():
span.end()
consumer_span.set(None)
if consumer_token is not None:
context.detach(consumer_token)
consumer_token = None
token = consumer_token.get(None)
if token is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if token is not None:
if token:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP8 document refers using is not None for checking None conditions:

context.detach(token)
consumer_token.set(None)

def clear_context() -> None:
global consumer_token
context.attach(trace.set_span_in_context(None))
consumer_token = None
consumer_token.set(None)
consumer_span.set(None)

def trace_kafka_consume(
Expand Down Expand Up @@ -253,6 +256,10 @@ def trace_kafka_poll(
res = wrapped(*args, **kwargs)
if res:
create_span("poll", res.topic(), res.headers())
else:
span = consumer_span.get(None)
if span is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if span is not None:
if span:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP8 document refers using is not None for checking None conditions:

close_consumer_span(span)
return res
except Exception as exc:
exception = exc
Expand Down
16 changes: 8 additions & 8 deletions tests/clients/kafka/test_confluent_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,19 +709,19 @@ def test_save_consumer_span_into_context(self, span: "InstanaSpan") -> None:
"""Test save_consumer_span_into_context function."""
# Verify initial state
assert consumer_span.get(None) is None
assert confluent_kafka_python.consumer_token is None
assert confluent_kafka_python.consumer_token.get(None) is None

# Save span into context
save_consumer_span_into_context(span)

# Verify token is stored
assert confluent_kafka_python.consumer_token is not None
assert confluent_kafka_python.consumer_token.get(None) is not None

def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None:
"""Test close_consumer_span with a recording span."""
# Save span into context first
save_consumer_span_into_context(span)
assert confluent_kafka_python.consumer_token is not None
assert confluent_kafka_python.consumer_token.get(None) is not None

# Verify span is recording
assert span.is_recording()
Expand All @@ -732,7 +732,7 @@ def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None:
# Verify span was ended and context cleared
assert not span.is_recording()
assert consumer_span.get(None) is None
assert confluent_kafka_python.consumer_token is None
assert confluent_kafka_python.consumer_token.get(None) is None

def test_clear_context(self, span: "InstanaSpan") -> None:
"""Test clear_context function."""
Expand All @@ -741,14 +741,14 @@ def test_clear_context(self, span: "InstanaSpan") -> None:

# Verify context has data
assert consumer_span.get(None) == span
assert confluent_kafka_python.consumer_token is not None
assert confluent_kafka_python.consumer_token.get(None) is not None

# Clear context
clear_context()

# Verify all context is cleared
assert consumer_span.get(None) is None
assert confluent_kafka_python.consumer_token is None
assert confluent_kafka_python.consumer_token.get(None) is None

def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None:
"""Test trace_kafka_close handles exceptions and still cleans up spans."""
Expand All @@ -757,7 +757,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None

# Verify span is in context
assert consumer_span.get(None) == span
assert confluent_kafka_python.consumer_token is not None
assert confluent_kafka_python.consumer_token.get(None) is not None

# Mock a wrapped function that raises an exception
mock_wrapped = Mock(side_effect=Exception("Close operation failed"))
Expand All @@ -772,7 +772,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None

# Verify that despite the exception, the span was cleaned up
assert consumer_span.get(None) is None
assert confluent_kafka_python.consumer_token is None
assert confluent_kafka_python.consumer_token.get(None) is None

# Verify span was ended
assert not span.is_recording()
Expand Down
Loading