From 40037de1ce48b37b77d0821ee9166ca4396ccf8b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 6 Feb 2026 20:01:46 +0100 Subject: [PATCH] v3/bug/209 : Improve error handling and diagnostics in Streams/Kafka - Add robust error handling to Source/SinkOperatorAdapters using ErrorHandlingHelper.TryExecute and propagate StreamExecutionOptions. - Implement IErrorHandlingEnabled in SourceOperatorAdapter and ensure error handling is set on downstream operators. - Add comprehensive integration and diagnostic tests for KafkaSinkOperator, including scenarios for Start(), error strategies, and custom configs. - Introduce KafkaSinkDiagnostic console tool for troubleshooting Kafka sink usage. - Add extensive unit tests for source operator error handling (skip, stop, retry, custom handler, no hangs/loops, FlatMap). - Reference Cortex.Streams.Kafka in test project for integration coverage. --- .../Operators/SinkOperatorAdapter.cs | 34 +- .../Operators/SourceOperatorAdapter.cs | 51 +- src/Cortex.Tests/Cortex.Tests.csproj | 1 + .../KafkaSinkOperatorIntegrationTests.cs | 368 +++++++++++++ .../Tests/SourceOperatorErrorHandlingTests.cs | 504 ++++++++++++++++++ tools/KafkaSinkDiagnostic.cs | 145 +++++ 6 files changed, 1094 insertions(+), 9 deletions(-) create mode 100644 src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs create mode 100644 src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs create mode 100644 tools/KafkaSinkDiagnostic.cs diff --git a/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs b/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs index 3492a53..92adde6 100644 --- a/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs +++ b/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs @@ -14,6 +14,9 @@ public class SinkOperatorAdapter : IOperator, IHasNextOperators, ITeleme { private readonly ISinkOperator _sinkOperator; + // Cached operator name to avoid string allocation on hot path + private static readonly string OperatorName = $"SinkOperatorAdapter<{typeof(TInput).Name}>"; + // Telemetry fields private ITelemetryProvider _telemetryProvider; private ICounter _processedCounter; @@ -22,6 +25,9 @@ public class SinkOperatorAdapter : IOperator, IHasNextOperators, ITeleme private Action _incrementProcessedCounter; private Action _recordProcessingTime; + // Error handling fields + private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default; + public SinkOperatorAdapter(ISinkOperator sinkOperator) { _sinkOperator = sinkOperator ?? throw new ArgumentNullException(nameof(sinkOperator)); @@ -51,9 +57,12 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) /// /// Forwards error handling configuration to the wrapped sink operator if it implements IErrorHandlingEnabled. + /// Also stores the options for use in this adapter's error handling. /// public void SetErrorHandling(StreamExecutionOptions options) { + _executionOptions = options ?? StreamExecutionOptions.Default; + // Forward error handling to the wrapped sink operator if it supports it if (_sinkOperator is IErrorHandlingEnabled errorHandlingEnabled) { @@ -71,8 +80,19 @@ public void Process(object input) { try { - _sinkOperator.Process((TInput)input); - span.SetAttribute("status", "success"); + var executed = ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + input, + item => _sinkOperator.Process(item)); + + span.SetAttribute("status", executed ? "success" : "skipped"); + } + catch (StreamStoppedException ex) + { + span.SetAttribute("status", "stopped"); + span.SetAttribute("exception", ex.ToString()); + throw; } catch (Exception ex) { @@ -83,14 +103,18 @@ public void Process(object input) finally { stopwatch.Stop(); - _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); - _incrementProcessedCounter(); + _recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds); + _incrementProcessedCounter?.Invoke(); } } } else { - _sinkOperator.Process((TInput)input); + ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + input, + item => _sinkOperator.Process(item)); } } diff --git a/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs b/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs index e43e33c..3cafe6b 100644 --- a/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs +++ b/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs @@ -6,11 +6,18 @@ namespace Cortex.Streams.Operators { - public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITelemetryEnabled + /// + /// Adapter that wraps an ISourceOperator to work within the operator chain. + /// Handles telemetry, error handling, and lifecycle management for source operators. + /// + public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled { private readonly ISourceOperator _sourceOperator; private IOperator _nextOperator; + // Cached operator name to avoid string allocation on hot path + private static readonly string OperatorName = $"SourceOperatorAdapter<{typeof(TOutput).Name}>"; + // Telemetry fields private ITelemetryProvider _telemetryProvider; private ICounter _emittedCounter; @@ -19,11 +26,28 @@ public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITel private Action _incrementEmittedCounter; private Action _recordEmissionTime; + // Error handling fields + private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default; + public SourceOperatorAdapter(ISourceOperator sourceOperator) { _sourceOperator = sourceOperator; } + /// + /// Sets the error handling options for this operator and propagates to next operators. + /// + public void SetErrorHandling(StreamExecutionOptions options) + { + _executionOptions = options ?? StreamExecutionOptions.Default; + + // Propagate to the next operator if it supports error handling + if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling) + { + nextWithErrorHandling.SetErrorHandling(_executionOptions); + } + } + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) { _telemetryProvider = telemetryProvider; @@ -67,6 +91,12 @@ public void SetNext(IOperator nextOperator) nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } + // Propagate error handling to the next operator + if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling && _executionOptions != null) + { + nextWithErrorHandling.SetErrorHandling(_executionOptions); + } + // Start the source operator Start(); } @@ -84,8 +114,15 @@ private void Start() try { _incrementEmittedCounter?.Invoke(); - _nextOperator?.Process(output); - span.SetAttribute("status", "success"); + + // Use error handling helper to properly handle errors according to stream configuration + var executed = ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + output, + item => _nextOperator?.Process(item)); + + span.SetAttribute("status", executed ? "success" : "skipped"); } catch (StreamStoppedException ex) { @@ -99,6 +136,7 @@ private void Start() { span.SetAttribute("status", "error"); span.SetAttribute("exception", ex.ToString()); + // Re-throw to let the source operator handle it (e.g., logging, stopping) throw; } finally @@ -112,7 +150,12 @@ private void Start() { try { - _nextOperator?.Process(output); + // Use error handling helper to properly handle errors according to stream configuration + ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + output, + item => _nextOperator?.Process(item)); } catch (StreamStoppedException) { diff --git a/src/Cortex.Tests/Cortex.Tests.csproj b/src/Cortex.Tests/Cortex.Tests.csproj index 067a52f..24f83f5 100644 --- a/src/Cortex.Tests/Cortex.Tests.csproj +++ b/src/Cortex.Tests/Cortex.Tests.csproj @@ -30,6 +30,7 @@ + diff --git a/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs b/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs new file mode 100644 index 0000000..306c6d1 --- /dev/null +++ b/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs @@ -0,0 +1,368 @@ +using Confluent.Kafka; +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Kafka; +using Microsoft.Extensions.Logging; +using Moq; +using System; +using System.Threading; +using Xunit; +using Xunit.Abstractions; + +namespace Cortex.Tests.Streams.Kafka +{ + /// + /// Integration tests for KafkaSinkOperator to diagnose message production issues. + /// + public class KafkaSinkOperatorIntegrationTests + { + private readonly ITestOutputHelper _output; + private readonly string _bootstrapServers = "145.239.0.42:29092"; + private readonly string _topic = "cortex.events_tests"; + + public KafkaSinkOperatorIntegrationTests(ITestOutputHelper output) + { + _output = output; + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_ProducesMessage_WhenStartIsCalled() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act - Start is REQUIRED before processing + sinkOperator.Start(); + + var testMessage = $"Test message at {DateTime.UtcNow:O}"; + _output.WriteLine($"Sending message: {testMessage}"); + + sinkOperator.Process(testMessage); + + // Give Kafka time to produce + Thread.Sleep(2000); + + // Stop to flush pending messages + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Message should be produced to Kafka"); + } + + [Fact] + public void Process_DoesNotProduceMessage_WhenStartIsNotCalled() + { + // Arrange + var mockLogger = new Mock>>(); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: mockLogger.Object); + + // Act - DO NOT call Start() + var testMessage = "This message should not be produced"; + sinkOperator.Process(testMessage); + + // Assert - Should log warning + mockLogger.Verify( + x => x.Log( + LogLevel.Warning, + It.IsAny(), + It.Is((v, t) => v.ToString().Contains("not running")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + sinkOperator.Dispose(); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithErrorHandling_ProducesMessages() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Configure error handling + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromMilliseconds(100), + OnError = ctx => + { + _output.WriteLine($"Error in {ctx.OperatorName}: {ctx.Exception.Message}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + // Act + sinkOperator.Start(); + + for (int i = 0; i < 5; i++) + { + var message = $"Message {i} at {DateTime.UtcNow:O}"; + _output.WriteLine($"Producing: {message}"); + sinkOperator.Process(message); + } + + // Give time for async production + Thread.Sleep(3000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("All 5 messages should be produced to Kafka"); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithNullMessages_SkipsNulls() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act + sinkOperator.Start(); + + sinkOperator.Process("Valid message 1"); + sinkOperator.Process(null); // Should skip + sinkOperator.Process("Valid message 2"); + + Thread.Sleep(2000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Only 2 messages should be produced (nulls are skipped)"); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithCustomConfig_ProducesMessages() + { + // Arrange + var config = new ProducerConfig + { + BootstrapServers = _bootstrapServers, + Acks = Acks.All, + EnableIdempotence = true, + MaxInFlight = 5, + LingerMs = 0, // Send immediately for testing + CompressionType = CompressionType.None + }; + + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + config: config, + logger: logger); + + // Act + sinkOperator.Start(); + + var message = $"Custom config test at {DateTime.UtcNow:O}"; + _output.WriteLine($"Producing: {message}"); + sinkOperator.Process(message); + + Thread.Sleep(2000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Message should be produced with custom config"); + } + + [Fact] + public void Start_CanBeCalledMultipleTimes_WithoutError() + { + // Arrange + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic); + + // Act - Multiple starts should be safe + sinkOperator.Start(); + sinkOperator.Start(); // Should not throw + + // Assert + sinkOperator.Process("test"); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + } + + [Fact] + public void Dispose_AfterStart_FlushesMessages() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act + sinkOperator.Start(); + sinkOperator.Process("Message before dispose"); + + // Dispose should flush + sinkOperator.Dispose(); + + // Assert - Should not throw + Assert.Throws(() => sinkOperator.Process("After dispose")); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void FullStreamTest_WithKafkaSink() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip, + OnError = ctx => + { + _output.WriteLine($"ERROR: {ctx.Exception.Message}"); + return ErrorHandlingDecision.Skip; + } + }; + + sinkOperator.SetErrorHandling(executionOptions); + + // Act - Simulate stream processing + sinkOperator.Start(); + + _output.WriteLine("=== Starting message production ==="); + + for (int i = 0; i < 10; i++) + { + var message = $"Stream message {i} at {DateTime.UtcNow:O}"; + _output.WriteLine($"Processing: {message}"); + sinkOperator.Process(message); + Thread.Sleep(100); // Small delay between messages + } + + _output.WriteLine("=== Stopping and flushing ==="); + sinkOperator.Stop(); + + _output.WriteLine("=== Disposing ==="); + sinkOperator.Dispose(); + + _output.WriteLine("=== Test complete ==="); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void DiagnosticTest_CheckWhatIsHappening() + { + // This test helps diagnose exactly what's happening with the new implementation + var logger = new TestLogger>(_output); + + _output.WriteLine("=== Creating KafkaSinkOperator ==="); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + _output.WriteLine("=== Setting Error Handling ==="); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromSeconds(1), + OnError = ctx => + { + _output.WriteLine($"[ERROR HANDLER] Operator: {ctx.OperatorName}"); + _output.WriteLine($"[ERROR HANDLER] Exception: {ctx.Exception?.Message}"); + _output.WriteLine($"[ERROR HANDLER] Attempt: {ctx.Attempt}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + _output.WriteLine("=== Starting Operator (THIS IS CRITICAL!) ==="); + sinkOperator.Start(); + + _output.WriteLine("=== Processing Test Message ==="); + var testMessage = $"Diagnostic test at {DateTime.UtcNow:O}"; + _output.WriteLine($"Message content: {testMessage}"); + + try + { + sinkOperator.Process(testMessage); + _output.WriteLine("Process() completed without exception"); + } + catch (Exception ex) + { + _output.WriteLine($"Process() threw exception: {ex.GetType().Name}: {ex.Message}"); + } + + _output.WriteLine("=== Waiting 5 seconds for async production ==="); + Thread.Sleep(5000); + + _output.WriteLine("=== Stopping (flushes pending messages) ==="); + try + { + sinkOperator.Stop(); + _output.WriteLine("Stop() completed"); + } + catch (Exception ex) + { + _output.WriteLine($"Stop() threw exception: {ex.GetType().Name}: {ex.Message}"); + } + + _output.WriteLine("=== Disposing ==="); + sinkOperator.Dispose(); + + _output.WriteLine("=== Test Complete ==="); + _output.WriteLine(""); + _output.WriteLine("Check Kafka topic 'cortex.events_tests' for the message."); + } + } + + /// + /// Test logger that outputs to xUnit test output. + /// + internal class TestLogger : ILogger + { + private readonly ITestOutputHelper _output; + + public TestLogger(ITestOutputHelper output) + { + _output = output; + } + + public IDisposable BeginScope(TState state) => null; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + var message = formatter(state, exception); + _output.WriteLine($"[{logLevel}] {message}"); + if (exception != null) + { + _output.WriteLine($"Exception: {exception}"); + } + } + } +} diff --git a/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs b/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs new file mode 100644 index 0000000..b6c6a15 --- /dev/null +++ b/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs @@ -0,0 +1,504 @@ +using Cortex.Streams; +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Operators; +using System.Collections.Concurrent; + +namespace Cortex.Streams.Tests +{ + /// + /// Tests for error handling with source operators. + /// These tests verify that: + /// - Source operators properly propagate error handling to downstream operators + /// - Skip strategy allows source operators to continue emitting after downstream errors + /// - Stop strategy properly stops source operators when errors occur + /// - Retry strategy works with source operators + /// - Source operators don't hang or loop when errors occur + /// + public class SourceOperatorErrorHandlingTests + { + #region Mock Source Operator + + /// + /// A test source operator that emits values on demand and tracks its state. + /// + private class TestSourceOperator : ISourceOperator + { + private Action _emit; + private bool _isRunning; + private readonly ConcurrentQueue _pendingItems = new(); + private readonly ManualResetEventSlim _emitEvent = new(false); + private Task _emitTask; + private CancellationTokenSource _cts; + + public bool IsRunning => _isRunning; + public int EmitCount { get; private set; } + public bool WasStoppedExplicitly { get; private set; } + + public void Start(Action emit) + { + _emit = emit ?? throw new ArgumentNullException(nameof(emit)); + _isRunning = true; + _cts = new CancellationTokenSource(); + + // Process pending items in background + _emitTask = Task.Run(() => + { + while (!_cts.Token.IsCancellationRequested) + { + _emitEvent.Wait(_cts.Token); + _emitEvent.Reset(); + + while (_pendingItems.TryDequeue(out var item) && !_cts.Token.IsCancellationRequested) + { + try + { + _emit(item); + EmitCount++; + } + catch (Exception) + { + // Source operators typically catch exceptions from emit + // to prevent the consume loop from crashing + } + } + } + }, _cts.Token); + } + + public void Stop() + { + WasStoppedExplicitly = true; + _isRunning = false; + _cts?.Cancel(); + _emitEvent.Set(); // Wake up the task to exit + } + + /// + /// Enqueue an item to be emitted by the source operator. + /// + public void EnqueueItem(T item) + { + _pendingItems.Enqueue(item); + _emitEvent.Set(); + } + + /// + /// Wait for all pending items to be processed. + /// + public async Task WaitForProcessingAsync(TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (_pendingItems.Count > 0 && DateTime.UtcNow < deadline) + { + await Task.Delay(10); + } + } + } + + /// + /// A simpler synchronous test source operator for immediate testing. + /// + private class SynchronousTestSourceOperator : ISourceOperator + { + private Action _emit; + public bool IsRunning { get; private set; } + public bool WasStoppedExplicitly { get; private set; } + public int EmitCount { get; private set; } + + public void Start(Action emit) + { + _emit = emit; + IsRunning = true; + } + + public void Stop() + { + WasStoppedExplicitly = true; + IsRunning = false; + } + + /// + /// Emit an item synchronously through the source operator. + /// + public void EmitItem(T item) + { + _emit(item); + EmitCount++; + } + } + + #endregion + + #region Skip Strategy Tests + + [Fact] + public void SourceOperator_WithSkipStrategy_ContinuesAfterDownstreamError() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceSkipTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Error on item 2"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // This should be skipped + sourceOperator.EmitItem(3); + sourceOperator.EmitItem(4); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 10, 30, 40 }, processedItems); + Assert.Equal(4, sourceOperator.EmitCount); // All items were emitted + Assert.False(sourceOperator.WasStoppedExplicitly || !sourceOperator.IsRunning == false); // Source was stopped by stream.Stop(), not by error + } + + [Fact] + public void SourceOperator_WithSkipStrategy_SkipsSinkErrors() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceSkipSinkTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => x * 10) + .Sink(x => + { + if (x == 20) throw new InvalidOperationException("Sink error on 20"); + processedItems.Add(x); + }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Sink will error on 20 + sourceOperator.EmitItem(3); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 10, 30 }, processedItems); + } + + #endregion + + #region Stop Strategy Tests + + [Fact] + public void SourceOperator_WithStopStrategy_StopsAfterError() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Stop + }; + + var stream = StreamBuilder + .CreateNewStream("SourceStopTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Error on item 2"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // This should trigger stop + + // Assert + Assert.Equal(new[] { 10 }, processedItems); + Assert.True(sourceOperator.WasStoppedExplicitly); + } + + #endregion + + #region Retry Strategy Tests + + [Fact] + public void SourceOperator_WithRetryStrategy_RetriesAndSucceeds() + { + // Arrange + var attemptCount = 0; + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.Zero + }; + + var stream = StreamBuilder + .CreateNewStream("SourceRetryTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + attemptCount++; + if (attemptCount < 3 && x == 1) + throw new InvalidOperationException("Transient error"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + stream.Stop(); + + // Assert + Assert.Equal(3, attemptCount); + Assert.Equal(new[] { 10 }, processedItems); + } + + [Fact] + public void SourceOperator_WithRetryStrategy_StopsWhenMaxRetriesExceeded() + { + // Arrange + var attemptCount = 0; + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 2, + RetryDelay = TimeSpan.Zero + }; + + var stream = StreamBuilder + .CreateNewStream("SourceRetryExceededTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + attemptCount++; + throw new InvalidOperationException("Always fails"); +#pragma warning disable CS0162 + return x; +#pragma warning restore CS0162 + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + // Assert - Source should be stopped after max retries exceeded + Assert.True(sourceOperator.WasStoppedExplicitly); + Assert.Equal(2, attemptCount); + } + + #endregion + + #region Custom Error Handler Tests + + [Fact] + public void SourceOperator_WithCustomErrorHandler_ReceivesCorrectContext() + { + // Arrange + var capturedContexts = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + OnError = ctx => + { + capturedContexts.Add(ctx); + return ErrorHandlingDecision.Skip; + } + }; + + var stream = StreamBuilder + .CreateNewStream("SourceCustomHandlerTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Test error"); + return x * 10; + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Should trigger error handler + + stream.Stop(); + + // Assert + Assert.Single(capturedContexts); + Assert.Equal("SourceCustomHandlerTest", capturedContexts[0].StreamName); + Assert.Equal(2, capturedContexts[0].Input); + Assert.IsType(capturedContexts[0].Exception); + Assert.Equal(1, capturedContexts[0].Attempt); + } + + #endregion + + #region No Hanging/Looping Tests + + [Fact] + public async Task SourceOperator_WithErrors_DoesNotHang() + { + // Arrange + var sourceOperator = new TestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceNoHangTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x % 2 == 0) throw new InvalidOperationException("Even number error"); + return x * 10; + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act - Emit many items including errors + for (int i = 1; i <= 100; i++) + { + sourceOperator.EnqueueItem(i); + } + + // Wait for processing with timeout - this should not hang + var completed = await Task.Run(async () => + { + await sourceOperator.WaitForProcessingAsync(TimeSpan.FromSeconds(5)); + return true; + }).WaitAsync(TimeSpan.FromSeconds(10)); + + stream.Stop(); + + // Assert + Assert.True(completed, "Processing should complete without hanging"); + } + + [Fact] + public async Task SourceOperator_WithStopStrategy_DoesNotLoop() + { + // Arrange + var emitCount = 0; + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Stop + }; + + var stream = StreamBuilder + .CreateNewStream("SourceNoLoopTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + Interlocked.Increment(ref emitCount); + throw new InvalidOperationException("Always fails"); +#pragma warning disable CS0162 + return x; +#pragma warning restore CS0162 + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + // Small delay to ensure no looping + await Task.Delay(100); + + // Assert - Should only process once, not loop + Assert.Equal(1, emitCount); + Assert.True(sourceOperator.WasStoppedExplicitly); + } + + #endregion + + #region Integration with FlatMap Operator + + [Fact] + public void SourceOperator_WithSkipStrategy_HandlesFlatMapErrors() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceFlatMapTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .FlatMap(x => + { + if (x == 2) throw new InvalidOperationException("FlatMap error"); + return new[] { x, x * 10 }; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Should be skipped + sourceOperator.EmitItem(3); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 1, 10, 3, 30 }, processedItems); + } + + #endregion + } +} diff --git a/tools/KafkaSinkDiagnostic.cs b/tools/KafkaSinkDiagnostic.cs new file mode 100644 index 0000000..1cd4d08 --- /dev/null +++ b/tools/KafkaSinkDiagnostic.cs @@ -0,0 +1,145 @@ +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Kafka; +using System; +using System.Threading; + +namespace Cortex.DiagnosticTools +{ + /// + /// Diagnostic tool to demonstrate and fix Kafka message production issues. + /// + class KafkaSinkDiagnostic + { + static void Main(string[] args) + { + Console.WriteLine("===================================="); + Console.WriteLine("Kafka Sink Operator Diagnostic Tool"); + Console.WriteLine("===================================="); + Console.WriteLine(); + + var bootstrapServers = "localhost:29092"; + var topic = "cortex.events_tests"; + + Console.WriteLine($"Bootstrap Servers: {bootstrapServers}"); + Console.WriteLine($"Topic: {topic}"); + Console.WriteLine(); + + // Demonstrate the WRONG way (without calling Start()) + Console.WriteLine("=== Test 1: WITHOUT calling Start() - MESSAGES WILL NOT BE PRODUCED ==="); + TestWithoutStart(bootstrapServers, topic); + Console.WriteLine(); + + Thread.Sleep(1000); + + // Demonstrate the CORRECT way (with calling Start()) + Console.WriteLine("=== Test 2: WITH calling Start() - MESSAGES WILL BE PRODUCED ==="); + TestWithStart(bootstrapServers, topic); + Console.WriteLine(); + + Thread.Sleep(1000); + + // Demonstrate with error handling + Console.WriteLine("=== Test 3: WITH Start() and Error Handling ==="); + TestWithErrorHandling(bootstrapServers, topic); + Console.WriteLine(); + + Console.WriteLine("===================================="); + Console.WriteLine("Diagnostic Complete!"); + Console.WriteLine("===================================="); + Console.WriteLine(); + Console.WriteLine("Check your Kafka topic to see the messages."); + Console.WriteLine("Press any key to exit..."); + Console.ReadKey(); + } + + static void TestWithoutStart(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[PROCESSING MESSAGE - WITHOUT Start()]"); + sinkOperator.Process("This message WILL NOT be produced because Start() was not called!"); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? Message was NOT produced (operator not running)"); + } + + static void TestWithStart(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[CALLING Start() - THIS IS CRITICAL!]"); + sinkOperator.Start(); + + Console.WriteLine("[PROCESSING MESSAGE - WITH Start()]"); + var message = $"SUCCESS: Message sent at {DateTime.UtcNow:O}"; + sinkOperator.Process(message); + Console.WriteLine($" Content: {message}"); + + Console.WriteLine("[WAITING 2 seconds for async production]"); + Thread.Sleep(2000); + + Console.WriteLine("[STOPPING - Flushes pending messages]"); + sinkOperator.Stop(); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? Message WAS produced!"); + } + + static void TestWithErrorHandling(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[CONFIGURING ERROR HANDLING]"); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromSeconds(1), + OnError = ctx => + { + Console.WriteLine($" [ERROR HANDLER] Operator: {ctx.OperatorName}"); + Console.WriteLine($" [ERROR HANDLER] Exception: {ctx.Exception?.Message}"); + Console.WriteLine($" [ERROR HANDLER] Attempt: {ctx.Attempt}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + Console.WriteLine("[STARTING OPERATOR]"); + sinkOperator.Start(); + + Console.WriteLine("[PRODUCING 5 MESSAGES]"); + for (int i = 1; i <= 5; i++) + { + var message = $"Message #{i} at {DateTime.UtcNow:O}"; + Console.WriteLine($" Producing: {message}"); + sinkOperator.Process(message); + Thread.Sleep(100); + } + + Console.WriteLine("[WAITING 3 seconds for async production]"); + Thread.Sleep(3000); + + Console.WriteLine("[STOPPING]"); + sinkOperator.Stop(); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? All 5 messages WAS produced with error handling!"); + } + } +}