Skip to content

Conversation

@Hailong-am
Copy link

@Hailong-am Hailong-am commented Jan 5, 2026

Description

Restore thread context for streaming when streaming is finished. The streaming processing is an async operation, the logic in transport action could manipulate thread context like stash/restore, that may lead the thread context polluted.

Related Issues

opensearch-project/OpenSearch-Dashboards#11090

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

Bug Fixes

  • Fixed thread context restoration for streaming HTTP requests in Reactor Netty 4 transport, ensuring proper context handling and cleanup during streaming operations.

Tests

  • Added integration tests to validate thread context restoration behavior for streaming responses.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 5, 2026

📝 Walkthrough

Walkthrough

This PR fixes thread context restoration for streaming HTTP responses in Reactor Netty 4. It introduces thread context storage during streaming lifecycle, ensures proper cleanup after streaming completes, and adds integration testing with mock streaming handlers to validate the fix.

Changes

Cohort / File(s) Summary
Documentation
CHANGELOG.md
Added changelog entry documenting the thread context restoration fix for streaming responses.
Test Implementation
plugins/transport-reactor-netty4/src/internalClusterTest/java/.../ReactorNetty4ThreadContextRestorationIT.java
Added integration test class with MockStreamingPlugin (registers mock REST streaming endpoint) and MockStreamingRestHandler (simulates ML streaming behavior with SSE-like chunks and thread context restoration). Test validates that thread context is properly restored during streaming and no duplicate opaque ID errors occur.
HTTP Transport
plugins/transport-reactor-netty4/src/main/java/.../ReactorNetty4HttpServerTransport.java
Modified streaming response handling to store thread context at start and ensure cleanup after streaming completes by chaining finalization logic.
REST Channel Handling
server/src/main/java/org/opensearch/rest/RestController.java, server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableRestChannel.java
Added TraceableRestChannel unwrapping in RestController's streaming dispatch path to correctly access underlying StreamingRestChannel; added public unwrap() accessor method to TraceableRestChannel to expose the delegate channel.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • andrross
  • cwperks
  • reta

Poem

🐰 A thread's context was lost in the stream,
But now with our fix, it's restored to gleam!
Through Netty we store, then carefully free,
The context flows clean—no duplicates we see! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 15.79% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The description provides the main objective and rationale, but the checkbox for 'Functionality includes testing' is unchecked despite the PR including an integration test. Mark the 'Functionality includes testing' checkbox as checked since ReactorNetty4ThreadContextRestorationIT integration test is included in the PR.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly describes the main change: restoring thread context for streaming operations, which directly aligns with the primary objective of the pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Contributor

github-actions bot commented Jan 5, 2026

❌ Gradle check result for ebff832: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@Hailong-am Hailong-am force-pushed the streaming_threadcontext branch from ebff832 to c35d432 Compare January 6, 2026 00:35
@Hailong-am Hailong-am changed the title restore threadcontext for streaming Fix: restore threadcontext for streaming Jan 6, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Jan 6, 2026

✅ Gradle check result for c35d432: SUCCESS

@codecov
Copy link

codecov bot commented Jan 6, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.24%. Comparing base (d866be8) to head (c35d432).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20361      +/-   ##
============================================
- Coverage     73.30%   73.24%   -0.06%     
- Complexity    71777    71790      +13     
============================================
  Files          5784     5784              
  Lines        328141   328146       +5     
  Branches      47269    47270       +1     
============================================
- Hits         240531   240364     -167     
- Misses        68329    68580     +251     
+ Partials      19281    19202      -79     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Hailong Cui <ihailong@amazon.com>
@Hailong-am Hailong-am force-pushed the streaming_threadcontext branch 2 times, most recently from 02fdf8f to 8d7c105 Compare January 8, 2026 10:07
Signed-off-by: Hailong Cui <ihailong@amazon.com>
@Hailong-am Hailong-am force-pushed the streaming_threadcontext branch from 8d7c105 to 328583f Compare January 8, 2026 10:15
@Hailong-am Hailong-am marked this pull request as ready for review January 8, 2026 10:16
@Hailong-am Hailong-am requested a review from a team as a code owner January 8, 2026 10:16
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In
@plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java:
- Around line 73-81: The method featureFlagSettings() in
ReactorNetty4ThreadContextRestorationIT is intended to override the base
implementation but lacks the @Override annotation; add the @Override annotation
above the featureFlagSettings() method declaration to match other overrides and
enable compile-time signature checks.
- Around line 147-175: The code currently double-closes storedContext by using
try-with-resources (try (ThreadContext.StoredContext context = storedContext) {
... }) which auto-closes, and also calling storedContext.close() in doFinally;
remove the try-with-resources construct: replace the try(...) block with a plain
block that calls storedContext.restore() (use storedContext.restore() instead of
context.restore()) and remove the corresponding automatic close, leaving the
doFinally(signalType -> storedContext.close()) as the sole cleanup.
🧹 Nitpick comments (2)
plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java (2)

109-109: Unused logger instance.

The logger field is declared but never used in the class.

♻️ Remove unused logger or use it for error logging

Either remove the unused logger:

-        private final Logger logger = LogManager.getLogger(MockStreamingRestHandler.class);

Or use it in the empty catch block (see next comment).


180-182: Empty catch block silently swallows errors.

The catch block has only a comment. Use the declared logger to log the error for debugging.

♻️ Log the error
                             } catch (Exception e) {
-                                // Log error
+                                logger.warn("Failed to send error chunk", e);
                             }
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 733931b and 328583f.

📒 Files selected for processing (5)
  • CHANGELOG.md
  • plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
  • server/src/main/java/org/opensearch/rest/RestController.java
  • server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableRestChannel.java
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.

Applied to files:

  • plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.

Applied to files:

  • plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
🧬 Code graph analysis (2)
plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java (2)
server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java (1)
  • FeatureFlagSettings (21-43)
server/src/main/java/org/opensearch/common/util/FeatureFlags.java (1)
  • FeatureFlags (29-341)
server/src/main/java/org/opensearch/rest/RestController.java (1)
server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableRestChannel.java (1)
  • TraceableRestChannel (27-118)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (21)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Mend Security Check
🔇 Additional comments (5)
CHANGELOG.md (1)

33-33: LGTM!

The changelog entry follows the established format and is correctly placed under the Fixed section.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java (1)

385-400: Thread context restoration approach looks correct.

The StoredContext is properly created before streaming begins and closed after streaming completes via then(Mono.fromRunnable(storedContext::close)). The then() operator executes after both successful completion and error signals, ensuring cleanup happens in all cases.

One consideration: newStoredContext(false) means response headers won't be preserved. Verify this aligns with the intended behavior for streaming responses.

server/src/main/java/org/opensearch/rest/RestController.java (1)

349-352: LGTM!

The unwrapping logic correctly handles the case where TraceableRestChannel wraps a StreamingRestChannel. This ensures the streaming detection works properly when telemetry tracing is enabled.

server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableRestChannel.java (1)

62-68: LGTM!

The unwrap() method provides a clean way to access the underlying delegate channel, enabling proper streaming channel detection in RestController.

plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java (1)

213-240: Test logic looks sound for verifying thread context restoration.

The test sends multiple streaming requests and verifies that no duplicate X-Opaque-Id errors occur, which validates that thread context is properly restored between requests.

Comment on lines +73 to +81
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
// disable Telemetry setting
featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), false);
return featureSettings.build();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the test class and its parent
rg -n "class ReactorNetty4ThreadContextRestorationIT" --type java -A 3

# Search for featureFlagSettings method definitions in the codebase
rg -n "protected.*Settings.*featureFlagSettings" --type java -B 2 -A 5

# Check for invocations of featureFlagSettings
rg -n "featureFlagSettings()" --type java

Repository: opensearch-project/OpenSearch

Length of output: 19384


🏁 Script executed:

#!/bin/bash
# Find OpenSearchReactorNetty4IntegTestCase definition
rg -n "class OpenSearchReactorNetty4IntegTestCase" --type java -A 5

# Check its parent
rg -n "OpenSearchReactorNetty4IntegTestCase" --type java -B 2 | head -30

Repository: opensearch-project/OpenSearch

Length of output: 5410


Add @Override annotation to featureFlagSettings() method.

The method overrides the base implementation from OpenSearchIntegTestCase and is properly invoked by the test framework. However, it is missing the @Override annotation. Every other featureFlagSettings() override in the codebase includes this annotation for consistency and to enable compile-time detection of signature mismatches.

🤖 Prompt for AI Agents
In
@plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
around lines 73 - 81, The method featureFlagSettings() in
ReactorNetty4ThreadContextRestorationIT is intended to override the base
implementation but lacks the @Override annotation; add the @Override annotation
above the featureFlagSettings() method declaration to match other overrides and
enable compile-time signature checks.

Comment on lines +147 to +175
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try (ThreadContext.StoredContext context = storedContext) {
context.restore();

String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);

final CompletableFuture<HttpChunk> future = new CompletableFuture<>();

Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay

return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close()) // Restore context when done
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Double-close of StoredContext detected.

The storedContext is closed twice:

  1. Line 148: try (ThreadContext.StoredContext context = storedContext) auto-closes at block exit
  2. Line 175: doFinally(signalType -> storedContext.close()) closes again

This can lead to unexpected behavior. Consider removing one of the close mechanisms.

🐛 Proposed fix

Remove the try-with-resources and rely solely on doFinally for cleanup:

                     Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
-                        try (ThreadContext.StoredContext context = storedContext) {
-                            context.restore();
+                        try {
+                            storedContext.restore();
 
                             String opaqueId = request.header(Task.X_OPAQUE_ID);
                             streamingChannel.sendChunk(
@@ -168,7 +168,7 @@ public class ReactorNetty4ThreadContextRestorationIT extends OpenSearchReactorNe
                         }
                     })
                         .doOnNext(streamingChannel::sendChunk)
-                        .doFinally(signalType -> storedContext.close()) // Restore context when done
+                        .doFinally(signalType -> storedContext.close())
                         .onErrorResume(ex -> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try (ThreadContext.StoredContext context = storedContext) {
context.restore();
String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);
final CompletableFuture<HttpChunk> future = new CompletableFuture<>();
Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay
return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close()) // Restore context when done
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try {
storedContext.restore();
String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);
final CompletableFuture<HttpChunk> future = new CompletableFuture<>();
Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay
return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close())
🤖 Prompt for AI Agents
In
@plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
around lines 147 - 175, The code currently double-closes storedContext by using
try-with-resources (try (ThreadContext.StoredContext context = storedContext) {
... }) which auto-closes, and also calling storedContext.close() in doFinally;
remove the try-with-resources construct: replace the try(...) block with a plain
block that calls storedContext.restore() (use storedContext.restore() instead of
context.restore()) and remove the corresponding automatic close, leaving the
doFinally(signalType -> storedContext.close()) as the sole cleanup.

@github-actions
Copy link
Contributor

github-actions bot commented Jan 8, 2026

❌ Gradle check result for 328583f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?


if (handler.supportsStreaming()) {
// for the case TraceableRestChannel wrapped steamingRestChannel, we need to unwrap it
if (channel instanceof TraceableRestChannel traceableRestChannel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch with TraceableRestChannel, to address this issue, we would need:

  • introduce TraceableStreamingRestChannel that implements StreamingRestChannel
  • change TraceableRestChannel to return either TraceableRestChannel or TraceableStreamingRestChannel

request.params()
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ThreadContext.StoredContext storedContext = threadPool.getThreadContext().newStoredContext(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's find out the cause first before doing that, the polluted thread context may pop up in other places and we should understand first why it happens.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the test cass could reproduce it and you could help to check whether other place could potentially have same issue

StreamingRestChannel streamingChannel = (StreamingRestChannel) channel;

// Mimic ML streaming behavior - store thread context
final ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext().newStoredContext(true);
Copy link
Contributor

@reta reta Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hailong-am OK, preliminary, I think the issue here is coming from improper ThreadContext manipulation within streaming handler implementation (in this case, ml-common): the client.threadPool().getThreadContext().newStoredContext(true) , context.restore(); and storedContext.close() are very likely executed in different threads, hence not restored properly. I need a bit more time to confirm that and suggest a fix if that is the issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the proper way to manage the context in this case would be;

final Supplier<ThreadContext.StoredContext> restorable = client
    .threadPool()
    .getThreadContext()
    .newRestorableContext(true);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice finding. It turns out the way we used to managing thread context is not the right way and make the thread context not well restored.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta with the correct way to manage context i don't have duplicate header issue anymore. Do you think this PR is still valid and worked as a fallback to clean up thread context?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hailong-am we definitely should not do any fallbacks to clean the context - it should be fixed, not workarounded. But tracing related finding (https://github.com/opensearch-project/OpenSearch/pull/20361/changes#r2672199308) would certainly worth addressing. Thank you.


// Use Flux pattern like ML streaming action
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try (ThreadContext.StoredContext context = storedContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And than just use it like this:

try (ThreadContext.StoredContext ctx = restorable.get()) {
   .... // no need to call `context.restore();` 
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants