diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs index 9b850b0..192bf3f 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs @@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingCommandBehavior : ICommandPipelineBehavior where TCommand : ICommand { - private readonly IStream, CommandExecutionEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeExecution; private readonly bool _emitAfterExecution; @@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior : ICommandPipeline /// If true, emit an event before command execution. /// If true, emit an event after command execution. public StreamEmittingCommandBehavior( - IStream, CommandExecutionEvent> stream, + IStream> stream, bool emitBeforeExecution = false, bool emitAfterExecution = true) { diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs index 5c585b5..75cf796 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs @@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingNotificationBehavior : INotificationPipelineBehavior where TNotification : INotification { - private readonly IStream, NotificationEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeHandling; private readonly bool _emitAfterHandling; @@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior : INotificationPi /// If true, emit an event before notification handling. /// If true, emit an event after notification handling. public StreamEmittingNotificationBehavior( - IStream, NotificationEvent> stream, + IStream> stream, bool emitBeforeHandling = false, bool emitAfterHandling = true) { diff --git a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs index 6a24dae..243fe66 100644 --- a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs @@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions /// The service collection for chaining. public static IServiceCollection AddStreamEmittingNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Action errorHandler = null) where TNotification : INotification { @@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandlerThe service collection for chaining. public static IServiceCollection AddTransformingStreamNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Func transformer, Action errorHandler = null) where TNotification : INotification diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs index 3f6a313..52e572e 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs @@ -17,7 +17,7 @@ namespace Cortex.Streams.Mediator.Handlers public abstract class StreamBackedStreamQueryHandler : IStreamQueryHandler where TQuery : IStreamQuery { - private readonly IStream _stream; + private readonly IStream _stream; private readonly int _channelCapacity; /// @@ -25,7 +25,7 @@ public abstract class StreamBackedStreamQueryHandler : IStreamQ /// /// The Cortex Stream to read data from. /// The capacity of the internal channel buffer. Default is 100. - protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) + protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); _channelCapacity = channelCapacity; diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs index 6a0c2c9..f6dfe01 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs @@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers public class StreamEmittingNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Action _errorHandler; /// @@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler : INotificationHan /// The stream to emit notifications to. /// Optional handler for errors during emission. public StreamEmittingNotificationHandler( - IStream stream, + IStream stream, Action errorHandler = null) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); @@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat public class TransformingStreamNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Func _transformer; private readonly Action _errorHandler; @@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler /// A function to transform notifications into stream input. /// Optional handler for errors during emission. public TransformingStreamNotificationHandler( - IStream stream, + IStream stream, Func transformer, Action errorHandler = null) { diff --git a/src/Cortex.Streams/Abstractions/IBranchInfo.cs b/src/Cortex.Streams/Abstractions/IBranchInfo.cs new file mode 100644 index 0000000..d801ce4 --- /dev/null +++ b/src/Cortex.Streams/Abstractions/IBranchInfo.cs @@ -0,0 +1,15 @@ +namespace Cortex.Streams.Abstractions +{ + /// + /// Provides information about a branch in the stream processing pipeline. + /// This is a non-generic interface that allows accessing branch metadata + /// without exposing internal type parameters. + /// + public interface IBranchInfo + { + /// + /// Gets the name of the branch. + /// + string BranchName { get; } + } +} diff --git a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs index 031b802..0dfd645 100644 --- a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs @@ -126,6 +126,6 @@ public interface IFanOutBuilder /// /// The built stream instance ready to be started. /// Thrown when no sinks have been configured. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs index 54e35dd..3dab53c 100644 --- a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs +++ b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs @@ -4,13 +4,13 @@ /// Provides a method to build the stream after adding a sink. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. + /// The current type of data in the stream (internal use only). public interface ISinkBuilder { /// /// Builds the stream and returns a stream instance that can be started and stopped. /// /// A stream instance. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/IStream.cs b/src/Cortex.Streams/Abstractions/IStream.cs index cb96d83..8e3523c 100644 --- a/src/Cortex.Streams/Abstractions/IStream.cs +++ b/src/Cortex.Streams/Abstractions/IStream.cs @@ -1,5 +1,5 @@ using Cortex.States; -using Cortex.Streams.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.Performance; using System.Collections.Generic; using System.Threading; @@ -7,7 +7,7 @@ namespace Cortex.Streams { - public interface IStream + public interface IStream { /// /// Start the stream processing. @@ -66,7 +66,7 @@ public interface IStream StreamStatuses GetStatus(); - IReadOnlyDictionary> GetBranches(); + IReadOnlyDictionary GetBranches(); TStateStore GetStateStoreByName(string name) where TStateStore : IDataStore; IEnumerable GetStateStoresByType() where TStateStore : IDataStore; diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index 0b14ffc..aae8d7a 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -107,7 +107,7 @@ public interface IStreamBuilder /// Builds the stream /// /// - IStream Build(); + IStream Build(); /// diff --git a/src/Cortex.Streams/FanOutBuilder.cs b/src/Cortex.Streams/FanOutBuilder.cs index 2507f34..620cd07 100644 --- a/src/Cortex.Streams/FanOutBuilder.cs +++ b/src/Cortex.Streams/FanOutBuilder.cs @@ -166,7 +166,7 @@ public IFanOutBuilder ToWithTransform(string name, Func< } /// - public IStream Build() + public IStream Build() { if (_branchOperators.Count == 0) { diff --git a/src/Cortex.Streams/Operators/BranchOperator.cs b/src/Cortex.Streams/Operators/BranchOperator.cs index d10f02a..4395427 100644 --- a/src/Cortex.Streams/Operators/BranchOperator.cs +++ b/src/Cortex.Streams/Operators/BranchOperator.cs @@ -1,4 +1,5 @@ -using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Abstractions; +using Cortex.Streams.ErrorHandling; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -10,7 +11,7 @@ namespace Cortex.Streams.Operators /// Represents a branch in a fan-out pattern that processes data independently. /// Forwards telemetry and error handling configuration to the branch's operator chain. /// - public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled + public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled, IBranchInfo { private readonly string _branchName; private readonly IOperator _branchOperator; diff --git a/src/Cortex.Streams/SinkBuilder.cs b/src/Cortex.Streams/SinkBuilder.cs index 0e4df77..785e17d 100644 --- a/src/Cortex.Streams/SinkBuilder.cs +++ b/src/Cortex.Streams/SinkBuilder.cs @@ -43,7 +43,7 @@ public SinkBuilder( /// Builds the stream and returns a stream instance. /// /// A stream instance. - public IStream Build() + public IStream Build() { return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); } diff --git a/src/Cortex.Streams/Stream.cs b/src/Cortex.Streams/Stream.cs index 8f41cd1..8aebf6f 100644 --- a/src/Cortex.Streams/Stream.cs +++ b/src/Cortex.Streams/Stream.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.ErrorHandling; using Cortex.Streams.Operators; using Cortex.Streams.Performance; @@ -16,8 +17,8 @@ namespace Cortex.Streams /// Represents a built stream that can be started and stopped. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. - public class Stream : IStream, IStatefulOperator, IDisposable + /// The current type of data in the stream (internal use only). + public class Stream : IStream, IStatefulOperator, IDisposable { private readonly string _name; private readonly IOperator _operatorChain; @@ -362,9 +363,9 @@ public BufferStatistics GetBufferStatistics() }; } - public IReadOnlyDictionary> GetBranches() + public IReadOnlyDictionary GetBranches() { - var branchDict = new Dictionary>(); + var branchDict = new Dictionary(); foreach (var branchOperator in _branchOperators) { branchDict[branchOperator.BranchName] = branchOperator; diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index 6d6b1f0..0836773 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -215,7 +215,7 @@ IStreamBuilder IInitialStreamBuilder.Stream() } - public IStream Build() + public IStream Build() { //return new Stream(_name, _firstOperator, _branchOperators); return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs index ca92b57..eb2588b 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs @@ -34,7 +34,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -68,7 +68,7 @@ public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var capturedEvents = new List>(); mockStream @@ -97,7 +97,7 @@ public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() public async Task Handle_EmitsFailedEvent_WhenCommandThrows() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -128,7 +128,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var behavior = new StreamEmittingCommandBehavior( mockStream.Object, @@ -151,7 +151,7 @@ public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() public async Task Handle_IncludesDuration_InAfterEvent() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -184,7 +184,7 @@ public async Task Handle_IncludesDuration_InAfterEvent() public async Task Handle_PropagatesResultCorrectly() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); mockStream .Setup(s => s.EmitAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs index f04cddd..9f384de 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs @@ -35,7 +35,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; mockStream @@ -63,7 +63,7 @@ public async Task Handle_EmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; Exception? capturedException = null; @@ -95,7 +95,7 @@ public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() public async Task Handle_ThrowsException_WhenNoErrorHandlerAndStreamFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); mockStream .Setup(s => s.EmitAsync(It.IsAny(), It.IsAny())) @@ -113,7 +113,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_PassesCancellationToken_ToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); CancellationToken capturedToken = default; mockStream @@ -149,7 +149,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); // Act & Assert Assert.Throws(() => @@ -162,7 +162,7 @@ public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() public async Task Handle_TransformsAndEmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderStreamData? capturedData = null; mockStream @@ -197,7 +197,7 @@ public async Task Handle_TransformsAndEmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenTransformFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; var handler = new TransformingStreamNotificationHandler( @@ -220,7 +220,7 @@ public async Task Handle_InvokesErrorHandler_WhenTransformFails() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; mockStream