From c4bafa963833bdf068432994fcde0c618dc70f2b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 30 Jan 2026 13:35:40 +0100 Subject: [PATCH] Refactor IStream to single generic parameter interface Refactored IStream and related classes to use a single generic parameter (IStream), removing the internal TCurrent type from the public API. Updated all builder interfaces, handlers, and usages to match the new signature. Introduced IBranchInfo for branch metadata, and updated GetBranches() to return IBranchInfo instances. This simplifies the stream abstraction and improves API encapsulation. --- .../Behaviors/StreamEmittingCommandBehavior.cs | 4 ++-- .../StreamEmittingNotificationBehavior.cs | 4 ++-- .../ServiceCollectionExtensions.cs | 4 ++-- .../Handlers/StreamBackedStreamQueryHandler.cs | 4 ++-- .../StreamEmittingNotificationHandler.cs | 8 ++++---- src/Cortex.Streams/Abstractions/IBranchInfo.cs | 15 +++++++++++++++ .../Abstractions/IFanOutBuilder.cs | 2 +- src/Cortex.Streams/Abstractions/ISinkBuilder.cs | 4 ++-- src/Cortex.Streams/Abstractions/IStream.cs | 6 +++--- .../Abstractions/IStreamBuilder.cs | 2 +- src/Cortex.Streams/FanOutBuilder.cs | 2 +- src/Cortex.Streams/Operators/BranchOperator.cs | 5 +++-- src/Cortex.Streams/SinkBuilder.cs | 2 +- src/Cortex.Streams/Stream.cs | 9 +++++---- src/Cortex.Streams/StreamBuilder.cs | 2 +- .../Tests/StreamEmittingCommandBehaviorTests.cs | 12 ++++++------ .../Tests/StreamEmittingHandlerTests.cs | 16 ++++++++-------- 17 files changed, 59 insertions(+), 42 deletions(-) create mode 100644 src/Cortex.Streams/Abstractions/IBranchInfo.cs diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs index 9b850b0..192bf3f 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs @@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingCommandBehavior : ICommandPipelineBehavior where TCommand : ICommand { - private readonly IStream, CommandExecutionEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeExecution; private readonly bool _emitAfterExecution; @@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior : ICommandPipeline /// If true, emit an event before command execution. /// If true, emit an event after command execution. public StreamEmittingCommandBehavior( - IStream, CommandExecutionEvent> stream, + IStream> stream, bool emitBeforeExecution = false, bool emitAfterExecution = true) { diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs index 5c585b5..75cf796 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs @@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingNotificationBehavior : INotificationPipelineBehavior where TNotification : INotification { - private readonly IStream, NotificationEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeHandling; private readonly bool _emitAfterHandling; @@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior : INotificationPi /// If true, emit an event before notification handling. /// If true, emit an event after notification handling. public StreamEmittingNotificationBehavior( - IStream, NotificationEvent> stream, + IStream> stream, bool emitBeforeHandling = false, bool emitAfterHandling = true) { diff --git a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs index 6a24dae..243fe66 100644 --- a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs @@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions /// The service collection for chaining. public static IServiceCollection AddStreamEmittingNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Action errorHandler = null) where TNotification : INotification { @@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandlerThe service collection for chaining. public static IServiceCollection AddTransformingStreamNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Func transformer, Action errorHandler = null) where TNotification : INotification diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs index 3f6a313..52e572e 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs @@ -17,7 +17,7 @@ namespace Cortex.Streams.Mediator.Handlers public abstract class StreamBackedStreamQueryHandler : IStreamQueryHandler where TQuery : IStreamQuery { - private readonly IStream _stream; + private readonly IStream _stream; private readonly int _channelCapacity; /// @@ -25,7 +25,7 @@ public abstract class StreamBackedStreamQueryHandler : IStreamQ /// /// The Cortex Stream to read data from. /// The capacity of the internal channel buffer. Default is 100. - protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) + protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); _channelCapacity = channelCapacity; diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs index 6a0c2c9..f6dfe01 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs @@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers public class StreamEmittingNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Action _errorHandler; /// @@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler : INotificationHan /// The stream to emit notifications to. /// Optional handler for errors during emission. public StreamEmittingNotificationHandler( - IStream stream, + IStream stream, Action errorHandler = null) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); @@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat public class TransformingStreamNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Func _transformer; private readonly Action _errorHandler; @@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler /// A function to transform notifications into stream input. /// Optional handler for errors during emission. public TransformingStreamNotificationHandler( - IStream stream, + IStream stream, Func transformer, Action errorHandler = null) { diff --git a/src/Cortex.Streams/Abstractions/IBranchInfo.cs b/src/Cortex.Streams/Abstractions/IBranchInfo.cs new file mode 100644 index 0000000..d801ce4 --- /dev/null +++ b/src/Cortex.Streams/Abstractions/IBranchInfo.cs @@ -0,0 +1,15 @@ +namespace Cortex.Streams.Abstractions +{ + /// + /// Provides information about a branch in the stream processing pipeline. + /// This is a non-generic interface that allows accessing branch metadata + /// without exposing internal type parameters. + /// + public interface IBranchInfo + { + /// + /// Gets the name of the branch. + /// + string BranchName { get; } + } +} diff --git a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs index 031b802..0dfd645 100644 --- a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs @@ -126,6 +126,6 @@ public interface IFanOutBuilder /// /// The built stream instance ready to be started. /// Thrown when no sinks have been configured. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs index 54e35dd..3dab53c 100644 --- a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs +++ b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs @@ -4,13 +4,13 @@ /// Provides a method to build the stream after adding a sink. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. + /// The current type of data in the stream (internal use only). public interface ISinkBuilder { /// /// Builds the stream and returns a stream instance that can be started and stopped. /// /// A stream instance. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/IStream.cs b/src/Cortex.Streams/Abstractions/IStream.cs index cb96d83..8e3523c 100644 --- a/src/Cortex.Streams/Abstractions/IStream.cs +++ b/src/Cortex.Streams/Abstractions/IStream.cs @@ -1,5 +1,5 @@ using Cortex.States; -using Cortex.Streams.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.Performance; using System.Collections.Generic; using System.Threading; @@ -7,7 +7,7 @@ namespace Cortex.Streams { - public interface IStream + public interface IStream { /// /// Start the stream processing. @@ -66,7 +66,7 @@ public interface IStream StreamStatuses GetStatus(); - IReadOnlyDictionary> GetBranches(); + IReadOnlyDictionary GetBranches(); TStateStore GetStateStoreByName(string name) where TStateStore : IDataStore; IEnumerable GetStateStoresByType() where TStateStore : IDataStore; diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index 0b14ffc..aae8d7a 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -107,7 +107,7 @@ public interface IStreamBuilder /// Builds the stream /// /// - IStream Build(); + IStream Build(); /// diff --git a/src/Cortex.Streams/FanOutBuilder.cs b/src/Cortex.Streams/FanOutBuilder.cs index 2507f34..620cd07 100644 --- a/src/Cortex.Streams/FanOutBuilder.cs +++ b/src/Cortex.Streams/FanOutBuilder.cs @@ -166,7 +166,7 @@ public IFanOutBuilder ToWithTransform(string name, Func< } /// - public IStream Build() + public IStream Build() { if (_branchOperators.Count == 0) { diff --git a/src/Cortex.Streams/Operators/BranchOperator.cs b/src/Cortex.Streams/Operators/BranchOperator.cs index d10f02a..4395427 100644 --- a/src/Cortex.Streams/Operators/BranchOperator.cs +++ b/src/Cortex.Streams/Operators/BranchOperator.cs @@ -1,4 +1,5 @@ -using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Abstractions; +using Cortex.Streams.ErrorHandling; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -10,7 +11,7 @@ namespace Cortex.Streams.Operators /// Represents a branch in a fan-out pattern that processes data independently. /// Forwards telemetry and error handling configuration to the branch's operator chain. /// - public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled + public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled, IBranchInfo { private readonly string _branchName; private readonly IOperator _branchOperator; diff --git a/src/Cortex.Streams/SinkBuilder.cs b/src/Cortex.Streams/SinkBuilder.cs index 0e4df77..785e17d 100644 --- a/src/Cortex.Streams/SinkBuilder.cs +++ b/src/Cortex.Streams/SinkBuilder.cs @@ -43,7 +43,7 @@ public SinkBuilder( /// Builds the stream and returns a stream instance. /// /// A stream instance. - public IStream Build() + public IStream Build() { return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); } diff --git a/src/Cortex.Streams/Stream.cs b/src/Cortex.Streams/Stream.cs index 8f41cd1..8aebf6f 100644 --- a/src/Cortex.Streams/Stream.cs +++ b/src/Cortex.Streams/Stream.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.ErrorHandling; using Cortex.Streams.Operators; using Cortex.Streams.Performance; @@ -16,8 +17,8 @@ namespace Cortex.Streams /// Represents a built stream that can be started and stopped. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. - public class Stream : IStream, IStatefulOperator, IDisposable + /// The current type of data in the stream (internal use only). + public class Stream : IStream, IStatefulOperator, IDisposable { private readonly string _name; private readonly IOperator _operatorChain; @@ -362,9 +363,9 @@ public BufferStatistics GetBufferStatistics() }; } - public IReadOnlyDictionary> GetBranches() + public IReadOnlyDictionary GetBranches() { - var branchDict = new Dictionary>(); + var branchDict = new Dictionary(); foreach (var branchOperator in _branchOperators) { branchDict[branchOperator.BranchName] = branchOperator; diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index 6d6b1f0..0836773 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -215,7 +215,7 @@ IStreamBuilder IInitialStreamBuilder.Stream() } - public IStream Build() + public IStream Build() { //return new Stream(_name, _firstOperator, _branchOperators); return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs index ca92b57..eb2588b 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs @@ -34,7 +34,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -68,7 +68,7 @@ public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var capturedEvents = new List>(); mockStream @@ -97,7 +97,7 @@ public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() public async Task Handle_EmitsFailedEvent_WhenCommandThrows() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -128,7 +128,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var behavior = new StreamEmittingCommandBehavior( mockStream.Object, @@ -151,7 +151,7 @@ public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() public async Task Handle_IncludesDuration_InAfterEvent() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -184,7 +184,7 @@ public async Task Handle_IncludesDuration_InAfterEvent() public async Task Handle_PropagatesResultCorrectly() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); mockStream .Setup(s => s.EmitAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs index f04cddd..9f384de 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs @@ -35,7 +35,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; mockStream @@ -63,7 +63,7 @@ public async Task Handle_EmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; Exception? capturedException = null; @@ -95,7 +95,7 @@ public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() public async Task Handle_ThrowsException_WhenNoErrorHandlerAndStreamFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); mockStream .Setup(s => s.EmitAsync(It.IsAny(), It.IsAny())) @@ -113,7 +113,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_PassesCancellationToken_ToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); CancellationToken capturedToken = default; mockStream @@ -149,7 +149,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); // Act & Assert Assert.Throws(() => @@ -162,7 +162,7 @@ public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() public async Task Handle_TransformsAndEmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderStreamData? capturedData = null; mockStream @@ -197,7 +197,7 @@ public async Task Handle_TransformsAndEmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenTransformFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; var handler = new TransformingStreamNotificationHandler( @@ -220,7 +220,7 @@ public async Task Handle_InvokesErrorHandler_WhenTransformFails() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; mockStream