Description
Summary
Add checkpointing capabilities to Cortex.Streams to enable fault-tolerant stream processing with exactly-once or at-least-once processing semantics, allowing streams to recover from failures without data loss.
Problem Statement
Currently, if a Cortex stream fails or is restarted:
- All in-flight data is lost: Events being processed are not recoverable
- Window state is lost: Aggregations in progress are discarded
- Source offsets are not tracked: Cannot resume from where processing stopped
- State stores may be inconsistent: Partial updates may have been applied
- No way to upgrade: Cannot update stream logic without losing state
Current Behavior
var stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
    .Stream(kafkaSource)
    .Aggregate(
        keySelector: o => o.CustomerId,
        aggregateFunction: (acc, order) => acc + order.Amount,
        stateStore: rocksDbStore)
    .Sink(SendAlert)
    .Build();
await stream.StartAsync();
// If the application crashes here...
// - Kafka offset is unknown (will re-read from beginning or miss messages)
// - RocksDB state may have partial aggregations
// - No way to recover to a consistent state
Impact
Without checkpointing:
- Production deployments risk data loss on failures
- Cannot guarantee exactly-once processing
- Rolling upgrades require manual state migration
- No disaster recovery capability
Acceptance Criteria
- Streams can be configured with checkpointing enabled
- Checkpoints are automatically created at configured intervals
- Streams automatically restore from latest checkpoint on startup
- Source offsets are captured and restored correctly
- Operator state is captured and restored correctly
- Old checkpoints are automatically cleaned up
- Manual checkpoint triggering is supported
- Restoration from specific checkpoint is supported
- At-least-once semantics work correctly
- Exactly-once semantics work with transactional sinks
- Telemetry and metrics are emitted for checkpoint operations
- Backward compatibility is preserved: streams without checkpointing continue to work
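To make the retention and restore criteria above more concrete, here is a minimal in-memory sketch. The `Checkpoint` record and `InMemoryCheckpointStore` class are hypothetical names invented for illustration; they are not part of Cortex.Streams, and a real implementation would persist checkpoints to durable storage.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; not an existing Cortex.Streams API.
public sealed record Checkpoint(
    long Id,
    IReadOnlyDictionary<string, long> SourceOffsets,    // e.g. "orders-0" -> next offset to read
    IReadOnlyDictionary<string, byte[]> OperatorState); // operator name -> serialized state

public sealed class InMemoryCheckpointStore
{
    // Sorted by checkpoint id so the oldest and newest entries are easy to find.
    private readonly SortedDictionary<long, Checkpoint> _checkpoints = new();
    private readonly int _maxRetained;

    public InMemoryCheckpointStore(int maxRetained)
    {
        if (maxRetained < 1) throw new ArgumentOutOfRangeException(nameof(maxRetained));
        _maxRetained = maxRetained;
    }

    public void Save(Checkpoint checkpoint)
    {
        _checkpoints[checkpoint.Id] = checkpoint;

        // Automatic cleanup: drop the oldest checkpoints beyond the retention limit.
        while (_checkpoints.Count > _maxRetained)
            _checkpoints.Remove(_checkpoints.Keys.First());
    }

    // Used on startup to restore from the most recent checkpoint, if any.
    public Checkpoint? Latest() =>
        _checkpoints.Count == 0 ? null : _checkpoints.Values.Last();

    // Used to restore from a specific checkpoint; returns null if it was pruned or never existed.
    public Checkpoint? Get(long id) =>
        _checkpoints.TryGetValue(id, out var checkpoint) ? checkpoint : null;
}
```

On startup, a stream would call `Latest()` and, if the result is non-null, seek each source to the recorded offset and load each operator's state before processing resumes. Keeping offsets and operator state in one record is what lets both be restored to the same consistent point.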
Technical Considerations
- Serialization: Need efficient serialization for state. Consider using MessagePack or protobuf.
- Checkpoint Barriers: For exactly-once, need to inject barriers and ensure all operators snapshot at consistent points.
- State Store Integration: IDataStore implementations should support snapshotting. RocksDB has native snapshot support.
- Large State: For large state stores, consider incremental checkpoints (only changed data).
- Async Snapshots: Snapshots should be non-blocking to minimize latency impact.
- Failure During Checkpoint: If checkpoint fails, the stream should continue and retry.
- Upgrade Compatibility: Consider state schema versioning for upgrades.
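The checkpoint barrier consideration is the least obvious of these, so a rough sketch of barrier alignment for an operator with several inputs may help. `BarrierAligner<T>` and its members are hypothetical names for illustration, not an existing Cortex.Streams type. The sketch assumes single-threaded delivery and one checkpoint in flight at a time.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical sketch of barrier alignment; not part of Cortex.Streams.
public sealed class BarrierAligner<T>
{
    private readonly int _inputCount;
    private readonly Action<T> _process;      // normal event handling
    private readonly Action<long> _snapshot;  // invoked once every input has delivered the barrier
    private readonly HashSet<int> _blockedInputs = new(); // inputs that already delivered the barrier
    private readonly Queue<T> _buffered = new();          // events held back during alignment
    private long? _pendingCheckpointId;

    public BarrierAligner(int inputCount, Action<T> process, Action<long> snapshot)
    {
        if (inputCount < 1) throw new ArgumentOutOfRangeException(nameof(inputCount));
        _inputCount = inputCount;
        _process = process;
        _snapshot = snapshot;
    }

    public void OnEvent(int input, T evt)
    {
        // An event that follows the barrier on its input belongs to the next
        // checkpoint epoch, so it must not affect state until the snapshot is taken.
        if (_blockedInputs.Contains(input)) _buffered.Enqueue(evt);
        else _process(evt);
    }

    public void OnBarrier(int input, long checkpointId)
    {
        if (_pendingCheckpointId is long pending && pending != checkpointId)
            throw new InvalidOperationException("This sketch handles one checkpoint at a time.");

        _pendingCheckpointId = checkpointId;
        _blockedInputs.Add(input);
        if (_blockedInputs.Count < _inputCount) return; // still waiting for other inputs

        // All inputs are aligned: state now reflects exactly the pre-barrier events.
        _snapshot(checkpointId);
        _blockedInputs.Clear();
        _pendingCheckpointId = null;

        // Replay the held-back events in arrival order.
        while (_buffered.Count > 0) _process(_buffered.Dequeue());
    }
}
```

Buffering blocked inputs is what gives the snapshot a consistent cut, at the cost of added latency while inputs are out of step. For at-least-once semantics an operator could skip the buffering and snapshot when the last barrier arrives, accepting that some post-barrier events are replayed after recovery.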