-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Stream transport refactor #20359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Stream transport refactor #20359
Conversation
📝 WalkthroughWalkthroughEnhances Arrow Flight stream transport robustness by introducing asynchronous stream initialization with header prefetching, replacing Optional-based root handling with nullable fields, removing ThreadContext dependencies in favor of async flow patterns, improving thread naming in event loops, and refining error handling throughout the transport layer. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ClientChannel as FlightClientChannel
participant Executor
participant Stream as FlightStream
participant Header as Header
participant Handler as ResponseHandler
rect rgb(240, 248, 255)
Note over Client,Handler: NEW: Asynchronous Prefetch Flow
end
Client->>ClientChannel: openStreamAndInvokeHandler()
ClientChannel->>ClientChannel: Get executor (warn if SAME)
ClientChannel->>Stream: Open stream asynchronously
Stream->>Stream: Prefetch first header
rect rgb(255, 250, 240)
Note over Stream,Handler: Async header retrieval
Stream-->>Header: Header ready (CompletableFuture)
end
Header->>Executor: Schedule handler task
Executor->>Handler: Stash thread context
Handler->>Handler: Validate header
Handler->>Handler: Set headers on thread context
Handler->>Handler: Delegate to response handler
Handler-->>Client: Response with header metadata
rect rgb(240, 255, 240)
Note over Handler,Client: Exception path
Handler->>Handler: Clean up on error
Handler-->>Client: Propagate exception
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ❌ 3❌ Failed checks (1 warning, 2 inconclusive)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
f22b0e7 to
08c1ef7
Compare
* make FlightClientChannel sendMessage async and use of virtual threads * fix for headers when no batches are sent * set version from headers for incoming response deserialization Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
08c1ef7 to
6cf97cc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (1)
258-260: Consider reducing log verbosity for SAME executor warning.This warning logs on every stream using the
SAMEexecutor. IfSAMEis a common or intentional choice for certain handlers, this could generate significant log noise. Consider:
- Using
debuglevel instead ofwarn- Logging only once per handler type
- Adding a rate limiter
🔎 Suggested change
if (ThreadPool.Names.SAME.equals(executor)) { - logger.warn("Stream transport handler using SAME executor, which may cause blocking behavior"); + logger.debug("Stream transport handler using SAME executor, which may cause blocking behavior"); }plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java (1)
73-76: Emptyflush()may cause confusion.The
flush()method is now a no-op, but callers inheriting fromStreamOutputmight expect callingflush()to commit buffered data. The actual buffer flush only happens ingetRoot()or whenwriteBytes()is called. This could lead to unexpected behavior if a caller relies on explicit flushing.Consider either:
- Having
flush()callflushTempBuffer()to commit pending bytes- Adding a comment explaining the no-op behavior
🔎 Suggested fix
@Override public void flush() { - + flushTempBuffer(); }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
CHANGELOG.mdplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.javaplugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.javaplugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.javaplugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.javaplugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
🧬 Code graph analysis (2)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java (1)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1)
FlightUtils(13-30)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (3)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ClientHeaderMiddleware.java (1)
ClientHeaderMiddleware(35-127)plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightErrorMapper.java (1)
FlightErrorMapper(31-116)plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1)
FlightUtils(13-30)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: Analyze (java)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (25, windows-latest)
🔇 Additional comments (15)
CHANGELOG.md (1)
29-29: LGTM!The changelog entry is correctly formatted and appropriately placed in the Fixed section.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java (1)
207-208: LGTM!Exposing
FLIGHT_THREAD_POOL_MIN_SIZEin the settings list allows external configuration of the minimum thread pool size. The setting is properly defined (lines 71-76) and consumed in theinit()method (line 143).plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java (1)
218-218: LGTM!The change ensures headers are propagated when completing a stream, even when no batches were sent. This aligns with the PR objective and maintains consistency with the batch sending path (line 160).
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java (1)
558-561: LGTM!The enhanced error handling properly captures transport exceptions during framework-level stream creation and signals test completion. This ensures the test can reliably observe and verify framework-level errors (verified at line 577).
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java (1)
367-368: LGTM! ExposingSETTING_FLIGHT_PUBLISH_PORTallows external configuration of the Flight publish port, following the same pattern as the existingSETTING_FLIGHT_PUBLISH_HOSTsetting. The setting is properly defined inServerComponents.javaas an integer setting with key"arrow.flight.publish_port"and default value-1, and is correctly integrated with the transport publish port resolution logic.plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java (1)
53-53: LGTM!The constructor argument change from
Optional.empty()tonullcorrectly aligns with the refactoredVectorStreamOutputAPI that now accepts a nullableVectorSchemaRootinstead ofOptional<VectorSchemaRoot>.plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java (1)
108-110: LGTM!The TaskManager mock setup correctly stubs
taskExecutionStarted(any())to return a mockStoredContext, providing the necessary scaffolding for tests that exercise the new async thread context handling flow inFlightClientChannel.plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java (2)
136-136: Verify the impact of reducing worker thread count.The worker event loop group size was reduced from
availableProcessors() * 2toavailableProcessors(). This halves the number of worker threads available for handling I/O operations.While this may reduce resource consumption, it could potentially impact throughput under high concurrent load. Ensure this change has been benchmarked or is aligned with observed resource utilization patterns.
412-416: Good improvement for observability.The custom
ThreadFactorywith meaningful thread names (os-grpc-boss-ELG-N,os-grpc-worker-ELG-N) improves debuggability and makes it easier to identify Flight transport threads in thread dumps and monitoring tools.plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (1)
115-119: Good approach for correlation ID uniqueness.The scheme combining timestamp (upper bits) with a 20-bit channel counter ensures uniqueness across multiple channels and time. However, note that the 20-bit channel ID will wrap after ~1 million channels, though the timestamp component should still maintain uniqueness in practice.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java (2)
129-141: Good fix for header propagation.This change addresses the PR objective to "fix header propagation when no batches are sent." When
root == null(no batches were sent), the header is now explicitly set viamiddleware.setHeader(header)before completing the stream. This ensures headers are always propagated to the client regardless of whether data batches were transmitted.
51-51: Appropriate simplification.Replacing
Optional<VectorSchemaRoot>with a nullableVectorSchemaRootis appropriate here sinceOptionalwas being used for a mutable field rather than return values. Thevolatilemodifier ensures visibility across threads.plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java (1)
44-71: Good buffering optimization.The buffering strategy effectively reduces vector operations for byte-at-a-time writes by accumulating them in an 8KB buffer before committing to the vector. The
writeBytespath correctly flushes any pending buffered data before performing the direct write.plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (2)
89-109: Good use of virtual threads for async I/O.Using
Thread.ofVirtual()for the prefetch operation is appropriate as it moves potentially blocking gRPC I/O off the calling thread without the overhead of platform threads. This aligns with the PR objective to "make FlightClientChannel sendMessage asynchronous."However, note the timing calculation on lines 93 and 97:
117-143: Well-structured batch iteration with version propagation.The
nextResponse()method correctly:
- Uses the prefetched first batch without calling
next()again- Sets the version from the initial header for proper deserialization (addressing PR objective "Set version from headers for incoming response deserialization")
- Includes slow-log tracking for performance monitoring
Note: The ternary expression on line 124 assumes single-threaded consumption, which aligns with typical stream response usage patterns.
| Runnable task = () -> { | ||
| try (var ignored = threadContext.stashContext()) { | ||
| if (header == null) { | ||
| cleanupStreamResponse(streamResponse); | ||
| throw new StreamException(StreamErrorCode.INTERNAL, "Header is null"); | ||
| } | ||
| threadContext.setHeaders(header.getHeaders()); | ||
| handler.handleStreamResponse(streamResponse); | ||
| } catch (Exception e) { | ||
| cleanupStreamResponse(streamResponse); | ||
| throw e; | ||
| } | ||
| }; | ||
|
|
||
| if (ThreadPool.Names.SAME.equals(executor)) { | ||
| task.run(); | ||
| } else { | ||
| threadPool.executor(executor).execute(task); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception thrown inside executor task may go unhandled.
When header == null (line 274-277) or an exception occurs (line 280-283), the exception is re-thrown from within the Runnable. For the SAME executor case (line 287), this propagates up. For the thread pool executor case (line 289), the exception may be swallowed or only logged by the executor's uncaught exception handler.
Consider wrapping the task execution in a try-catch that routes exceptions through handleStreamException for consistent error handling:
🔎 Suggested fix
Runnable task = () -> {
try (var ignored = threadContext.stashContext()) {
if (header == null) {
cleanupStreamResponse(streamResponse);
- throw new StreamException(StreamErrorCode.INTERNAL, "Header is null");
+ handleStreamException(streamResponse, new StreamException(StreamErrorCode.INTERNAL, "Header is null"));
+ return;
}
threadContext.setHeaders(header.getHeaders());
handler.handleStreamResponse(streamResponse);
} catch (Exception e) {
cleanupStreamResponse(streamResponse);
- throw e;
+ handleStreamException(streamResponse, e);
}
};Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java
around lines 272 to 291, exceptions thrown inside the Runnable (e.g., when
header == null or other errors) are re-thrown and may be unhandled when executed
on a thread pool; wrap task execution so all exceptions are caught and delegated
to the existing handleStreamException mechanism (or equivalent error handler)
instead of re-throwing: modify the Runnable to catch Throwable, call
cleanupStreamResponse as needed, and invoke
handleStreamException(streamResponse, throwable) (or the proper handler) before
returning; also ensure the executor invocation (both SAME and
threadPool.executor(executor).execute(...)) does not re-throw but relies on this
catch-and-handle behavior.
| if ((System.nanoTime() - start) / 1_000_000 > 10) { | ||
| logger.debug( | ||
| "FlightClient.getStream() for correlationId: {} took {}ms", | ||
| correlationId, | ||
| (System.nanoTime() - start) / 1_000_000 | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timing calculation called twice.
System.nanoTime() is called twice—once for the threshold check (line 93) and again for logging (line 97). The logged value will be slightly larger than what was compared against the threshold. Store the elapsed time in a variable:
🔎 Suggested fix
- if ((System.nanoTime() - start) / 1_000_000 > 10) {
+ long elapsedMs = (System.nanoTime() - start) / 1_000_000;
+ if (elapsedMs > 10) {
logger.debug(
"FlightClient.getStream() for correlationId: {} took {}ms",
correlationId,
- (System.nanoTime() - start) / 1_000_000
+ elapsedMs
);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if ((System.nanoTime() - start) / 1_000_000 > 10) { | |
| logger.debug( | |
| "FlightClient.getStream() for correlationId: {} took {}ms", | |
| correlationId, | |
| (System.nanoTime() - start) / 1_000_000 | |
| ); | |
| long elapsedMs = (System.nanoTime() - start) / 1_000_000; | |
| if (elapsedMs > 10) { | |
| logger.debug( | |
| "FlightClient.getStream() for correlationId: {} took {}ms", | |
| correlationId, | |
| elapsedMs | |
| ); |
🤖 Prompt for AI Agents
In
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java
around lines 93 to 98, the code calls System.nanoTime() twice for the threshold
check and the debug log, causing the logged elapsed time to differ from the
value used in the comparison; compute System.nanoTime() once, store elapsedNanos
(or elapsedMillis) in a local variable, use that variable for both the > 10ms
check and the logger.debug call, and convert to milliseconds once for both
usages to ensure consistency.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #20359 +/- ##
============================================
- Coverage 73.30% 73.30% -0.01%
+ Complexity 71777 71745 -32
============================================
Files 5784 5784
Lines 328141 328133 -8
Branches 47269 47275 +6
============================================
- Hits 240531 240522 -9
+ Misses 68329 68308 -21
- Partials 19281 19303 +22 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
FlightClientChannelsendMessageasyncMinor
OptionalinFlightServerChannel.Description
[Describe what this change achieves]
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
Summary by CodeRabbit
New Features
Bug Fixes
Chores
✏️ Tip: You can customize this high-level summary in your review settings.