[CRE-1601] shard-orchestrator implementation #1747
Pull request overview
This PR implements the ShardOrchestrator gRPC service that exposes workflow-to-shard routing decisions from Ring OCR consensus to other shards in the system. The implementation moves the proto definition from the Ring package to a dedicated shardorchestrator package and establishes a dual-update pattern where Ring's transmitter updates both its internal store and the ShardOrchestrator store for external consumption.
Key changes:
- Moved proto definitions and generated code to the shardorchestrator/pb/ package for better separation of concerns
- Implemented thread-safe in-memory store with version tracking for cache invalidation
- Created gRPC server and client implementations for cross-shard communication
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| pkg/workflows/shardorchestrator/store.go | New in-memory store managing workflow-to-shard mappings with version tracking |
| pkg/workflows/shardorchestrator/service.go | gRPC server implementation exposing workflow mappings and accepting shard registrations |
| pkg/workflows/shardorchestrator/client.go | gRPC client for querying workflow mappings from shard 0 |
| pkg/workflows/shardorchestrator/pb/*.proto | Relocated proto definitions from ring package with timestamp fields removed |
| pkg/workflows/ring/transmitter.go | Updated to perform dual updates to both ring store and orchestrator store |
| pkg/workflows/ring/factory.go | Updated constructor to accept orchestrator store parameter |
| pkg/workflows/shardorchestrator/*_test.go | Comprehensive test coverage for store, service, client, and integration scenarios |
existingMapping, err := t.shardOrchestratorStore.GetWorkflowMapping(ctx, workflowID)
if err != nil {
    // New workflow - no previous assignment
Copilot AI, Jan 7, 2026:
The comment "New workflow - no previous assignment" is misleading because the error could represent any retrieval failure, not just missing workflows. Consider checking specifically for "not found" errors versus other error types, or update the comment to reflect that any error is treated as a new workflow.
Suggested change:
- // New workflow - no previous assignment
+ // Treat any retrieval error as no previous assignment (new workflow or store failure)
Don't agree: the GetWorkflowMapping implementation only fails if the workflowID is not present in the map. The comment is correct.
if err := t.shardOrchestratorStore.BatchUpdateWorkflowMappings(ctx, mappings); err != nil {
    t.lggr.Errorw("failed to update ShardOrchestrator store", "err", err, "workflowCount", len(mappings))
    // Don't fail the entire transmission if ShardOrchestrator update fails
Copilot AI, Jan 7, 2026:
The comment "Don't fail the entire transmission" explains the behavior but doesn't explain the rationale. Consider documenting why it's safe to continue despite this failure, such as "ShardOrchestrator updates are for external consumption only and don't affect Ring's internal consensus state."
Suggested change:
- // Don't fail the entire transmission if ShardOrchestrator update fails
+ // Don't fail the entire transmission if ShardOrchestrator update fails.
+ // The Ring store is the authoritative source of routing state and workflow mappings.
+ // ShardOrchestrator updates are best-effort, external-facing metadata that do not
+ // participate in Ring's internal consensus or routing decisions, so failing here
+ // would unnecessarily impact consensus progress without improving safety.
Don't agree, it's too verbose. The original comment explains why we don't return, and it makes clear this is intentional.
mchain0 left a comment:
Please just fix the suggestions from Copilot.
// Determine if system is in transition state
systemInTransition := false
if outcome.State != nil {
    if _, ok := outcome.State.State.(*pb.RoutingState_Transition); ok {
We should log a warning if the conversion fails
It's not converting it; it's a type assertion that is basically asking: "Is the state of type *pb.RoutingState_Transition? If so, I'll set systemInTransition = true."
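To make the point in this reply concrete, the sketch below shows a comma-ok type assertion against an interface-typed field, similar in shape to the wrapper types generated for a protobuf oneof. The types `routingState`, `steadyState`, and `transitionState` are hypothetical stand-ins, not the generated `pb` types from the PR. With the comma-ok form, a non-matching or nil value yields `ok == false` rather than a panic, so a false result means "not in transition" and is not an error.

```go
package main

import "fmt"

// isState mimics the unexported interface a oneof field is typed with.
type isState interface{ isState() }

// steadyState and transitionState are hypothetical oneof wrapper types.
type steadyState struct{}
type transitionState struct{}

func (*steadyState) isState()     {}
func (*transitionState) isState() {}

// routingState is a stand-in for a message holding the oneof field.
type routingState struct {
	State isState
}

// inTransition reports whether the state's dynamic type is *transitionState.
func inTransition(state *routingState) bool {
	if state == nil {
		return false
	}
	// Comma-ok assertion: ok is false for any other type or a nil interface.
	_, ok := state.State.(*transitionState)
	return ok
}

func main() {
	fmt.Println(inTransition(&routingState{State: &transitionState{}}))
	fmt.Println(inTransition(&routingState{State: &steadyState{}}))
	fmt.Println(inTransition(nil))
}
```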
WorkflowID string
OldShardID uint32
NewShardID uint32
TransitionState TransitionState
I'm late to the game here but why are we storing the state separately for each workflow? The whole system is either in steady or transitioning state.
ShardOrchestrator gRPC Service Implementation
Implements the ShardOrchestrator gRPC service that exposes workflow-to-shard routing decisions from Ring OCR consensus to other shards in the system.
Changes
shard_orchestrator.proto from ring/pb/ → shardorchestrator/pb/. Reason: Proto should live with its service implementation, not in the Ring OCR package
gRPC
Test Helpers (test_adapter.go): Test utilities for simulating Ring OCR updates
Ring OCR consensus → Ring Transmitter → Dual update:
CRE-1601
Requires
Supports