Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/Cortex.Streams/Operators/SinkOperatorAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
{
private readonly ISinkOperator<TInput> _sinkOperator;

// Cached operator name to avoid string allocation on hot path
private static readonly string OperatorName = $"SinkOperatorAdapter<{typeof(TInput).Name}>";

// Telemetry fields
private ITelemetryProvider _telemetryProvider;
private ICounter _processedCounter;
Expand All @@ -22,6 +25,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
private Action _incrementProcessedCounter;
private Action<double> _recordProcessingTime;

// Error handling fields
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;

public SinkOperatorAdapter(ISinkOperator<TInput> sinkOperator)
{
_sinkOperator = sinkOperator ?? throw new ArgumentNullException(nameof(sinkOperator));
Expand Down Expand Up @@ -51,9 +57,12 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)

/// <summary>
/// Forwards error handling configuration to the wrapped sink operator if it implements IErrorHandlingEnabled.
/// Also stores the options for use in this adapter's error handling.
/// </summary>
public void SetErrorHandling(StreamExecutionOptions options)
{
_executionOptions = options ?? StreamExecutionOptions.Default;

// Forward error handling to the wrapped sink operator if it supports it
if (_sinkOperator is IErrorHandlingEnabled errorHandlingEnabled)
{
Expand All @@ -71,8 +80,19 @@ public void Process(object input)
{
try
{
_sinkOperator.Process((TInput)input);
span.SetAttribute("status", "success");
var executed = ErrorHandlingHelper.TryExecute<TInput>(
_executionOptions,
OperatorName,
input,
item => _sinkOperator.Process(item));

span.SetAttribute("status", executed ? "success" : "skipped");
}
catch (StreamStoppedException ex)
{
span.SetAttribute("status", "stopped");
span.SetAttribute("exception", ex.ToString());
throw;
}
catch (Exception ex)
{
Expand All @@ -83,14 +103,18 @@ public void Process(object input)
finally
{
stopwatch.Stop();
_recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds);
_incrementProcessedCounter();
_recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds);
_incrementProcessedCounter?.Invoke();
}
}
}
else
{
_sinkOperator.Process((TInput)input);
ErrorHandlingHelper.TryExecute<TInput>(
_executionOptions,
OperatorName,
input,
item => _sinkOperator.Process(item));
}
}

Expand Down
51 changes: 47 additions & 4 deletions src/Cortex.Streams/Operators/SourceOperatorAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@

namespace Cortex.Streams.Operators
{
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled
/// <summary>
/// Adapter that wraps an ISourceOperator to work within the operator chain.
/// Handles telemetry, error handling, and lifecycle management for source operators.
/// </summary>
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled
{
private readonly ISourceOperator<TOutput> _sourceOperator;
private IOperator _nextOperator;

// Cached operator name to avoid string allocation on hot path
private static readonly string OperatorName = $"SourceOperatorAdapter<{typeof(TOutput).Name}>";

// Telemetry fields
private ITelemetryProvider _telemetryProvider;
private ICounter _emittedCounter;
Expand All @@ -19,11 +26,28 @@ public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITel
private Action _incrementEmittedCounter;
private Action<double> _recordEmissionTime;

// Error handling fields
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;

public SourceOperatorAdapter(ISourceOperator<TOutput> sourceOperator)
{
_sourceOperator = sourceOperator;
}

/// <summary>
/// Sets the error handling options for this operator and propagates to next operators.
/// </summary>
public void SetErrorHandling(StreamExecutionOptions options)
{
_executionOptions = options ?? StreamExecutionOptions.Default;

// Propagate to the next operator if it supports error handling
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling)
{
nextWithErrorHandling.SetErrorHandling(_executionOptions);
}
}

public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
{
_telemetryProvider = telemetryProvider;
Expand Down Expand Up @@ -67,6 +91,12 @@ public void SetNext(IOperator nextOperator)
nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider);
}

// Propagate error handling to the next operator
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling && _executionOptions != null)
{
nextWithErrorHandling.SetErrorHandling(_executionOptions);
}

// Start the source operator
Start();
}
Expand All @@ -84,8 +114,15 @@ private void Start()
try
{
_incrementEmittedCounter?.Invoke();
_nextOperator?.Process(output);
span.SetAttribute("status", "success");

// Use error handling helper to properly handle errors according to stream configuration
var executed = ErrorHandlingHelper.TryExecute<TOutput>(
_executionOptions,
OperatorName,
output,
item => _nextOperator?.Process(item));

span.SetAttribute("status", executed ? "success" : "skipped");
}
catch (StreamStoppedException ex)
{
Expand All @@ -99,6 +136,7 @@ private void Start()
{
span.SetAttribute("status", "error");
span.SetAttribute("exception", ex.ToString());
// Re-throw to let the source operator handle it (e.g., logging, stopping)
throw;
}
finally
Expand All @@ -112,7 +150,12 @@ private void Start()
{
try
{
_nextOperator?.Process(output);
// Use error handling helper to properly handle errors according to stream configuration
ErrorHandlingHelper.TryExecute<TOutput>(
_executionOptions,
OperatorName,
output,
item => _nextOperator?.Process(item));
}
catch (StreamStoppedException)
{
Expand Down
1 change: 1 addition & 0 deletions src/Cortex.Tests/Cortex.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<ProjectReference Include="..\Cortex.Mediator.Behaviors.Transactional\Cortex.Mediator.Behaviors.Transactional.csproj" />
<ProjectReference Include="..\Cortex.Mediator\Cortex.Mediator.csproj" />
<ProjectReference Include="..\Cortex.Serialization.Yaml\Cortex.Serialization.Yaml.csproj" />
<ProjectReference Include="..\Cortex.Streams.Kafka\Cortex.Streams.Kafka.csproj" />
<ProjectReference Include="..\Cortex.Streams.Mediator\Cortex.Streams.Mediator.csproj" />
<ProjectReference Include="..\Cortex.Streams\Cortex.Streams.csproj" />
<ProjectReference Include="..\Cortex.Telemetry\Cortex.Telemetry.csproj" />
Expand Down
Loading
Loading