Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors
public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipelineBehavior<TCommand, TResult>
where TCommand : ICommand<TResult>
{
private readonly IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> _stream;
private readonly IStream<CommandExecutionEvent<TCommand, TResult>> _stream;
private readonly bool _emitBeforeExecution;
private readonly bool _emitAfterExecution;

Expand All @@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipeline
/// <param name="emitBeforeExecution">If true, emit an event before command execution.</param>
/// <param name="emitAfterExecution">If true, emit an event after command execution.</param>
public StreamEmittingCommandBehavior(
IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> stream,
IStream<CommandExecutionEvent<TCommand, TResult>> stream,
bool emitBeforeExecution = false,
bool emitAfterExecution = true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors
public class StreamEmittingNotificationBehavior<TNotification> : INotificationPipelineBehavior<TNotification>
where TNotification : INotification
{
private readonly IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> _stream;
private readonly IStream<NotificationEvent<TNotification>> _stream;
private readonly bool _emitBeforeHandling;
private readonly bool _emitAfterHandling;

Expand All @@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior<TNotification> : INotificationPi
/// <param name="emitBeforeHandling">If true, emit an event before notification handling.</param>
/// <param name="emitAfterHandling">If true, emit an event after notification handling.</param>
public StreamEmittingNotificationBehavior(
IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> stream,
IStream<NotificationEvent<TNotification>> stream,
bool emitBeforeHandling = false,
bool emitAfterHandling = true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddStreamEmittingNotificationHandler<TNotification>(
this IServiceCollection services,
Func<IServiceProvider, IStream<TNotification, TNotification>> streamFactory,
Func<IServiceProvider, IStream<TNotification>> streamFactory,
Action<TNotification, Exception> errorHandler = null)
where TNotification : INotification
{
Expand All @@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandler<TNotificat
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddTransformingStreamNotificationHandler<TNotification, TStreamInput>(
this IServiceCollection services,
Func<IServiceProvider, IStream<TStreamInput, TStreamInput>> streamFactory,
Func<IServiceProvider, IStream<TStreamInput>> streamFactory,
Func<TNotification, TStreamInput> transformer,
Action<TNotification, Exception> errorHandler = null)
where TNotification : INotification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ namespace Cortex.Streams.Mediator.Handlers
public abstract class StreamBackedStreamQueryHandler<TQuery, TResult> : IStreamQueryHandler<TQuery, TResult>
where TQuery : IStreamQuery<TResult>
{
private readonly IStream<TResult, TResult> _stream;
private readonly IStream<TResult> _stream;
private readonly int _channelCapacity;

/// <summary>
/// Initializes a new instance of the <see cref="StreamBackedStreamQueryHandler{TQuery, TResult}"/> class.
/// </summary>
/// <param name="stream">The Cortex Stream to read data from.</param>
/// <param name="channelCapacity">The capacity of the internal channel buffer. Default is 100.</param>
protected StreamBackedStreamQueryHandler(IStream<TResult, TResult> stream, int channelCapacity = 100)
protected StreamBackedStreamQueryHandler(IStream<TResult> stream, int channelCapacity = 100)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_channelCapacity = channelCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers
public class StreamEmittingNotificationHandler<TNotification> : INotificationHandler<TNotification>
where TNotification : INotification
{
private readonly IStream<TNotification, TNotification> _stream;
private readonly IStream<TNotification> _stream;
private readonly Action<TNotification, Exception> _errorHandler;

/// <summary>
Expand All @@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler<TNotification> : INotificationHan
/// <param name="stream">The stream to emit notifications to.</param>
/// <param name="errorHandler">Optional handler for errors during emission.</param>
public StreamEmittingNotificationHandler(
IStream<TNotification, TNotification> stream,
IStream<TNotification> stream,
Action<TNotification, Exception> errorHandler = null)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
Expand Down Expand Up @@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat
public class TransformingStreamNotificationHandler<TNotification, TStreamInput> : INotificationHandler<TNotification>
where TNotification : INotification
{
private readonly IStream<TStreamInput, TStreamInput> _stream;
private readonly IStream<TStreamInput> _stream;
private readonly Func<TNotification, TStreamInput> _transformer;
private readonly Action<TNotification, Exception> _errorHandler;

Expand All @@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler<TNotification, TStreamInput>
/// <param name="transformer">A function to transform notifications into stream input.</param>
/// <param name="errorHandler">Optional handler for errors during emission.</param>
public TransformingStreamNotificationHandler(
IStream<TStreamInput, TStreamInput> stream,
IStream<TStreamInput> stream,
Func<TNotification, TStreamInput> transformer,
Action<TNotification, Exception> errorHandler = null)
{
Expand Down
15 changes: 15 additions & 0 deletions src/Cortex.Streams/Abstractions/IBranchInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Cortex.Streams.Abstractions
{
/// <summary>
/// Provides information about a branch in the stream processing pipeline.
/// This is a non-generic interface that allows accessing branch metadata
/// without exposing internal type parameters.
/// </summary>
public interface IBranchInfo
{
/// <summary>
/// Gets the name of the branch.
/// </summary>
string BranchName { get; }
}
}
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IFanOutBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ public interface IFanOutBuilder<TIn, TCurrent>
/// </summary>
/// <returns>The built stream instance ready to be started.</returns>
/// <exception cref="InvalidOperationException">Thrown when no sinks have been configured.</exception>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();
}
}
4 changes: 2 additions & 2 deletions src/Cortex.Streams/Abstractions/ISinkBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
/// Provides a method to build the stream after adding a sink.
/// </summary>
/// <typeparam name="TIn">The type of the initial input to the stream.</typeparam>
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
/// <typeparam name="TCurrent">The current type of data in the stream (internal use only).</typeparam>
public interface ISinkBuilder<TIn, TCurrent>
{
/// <summary>
/// Builds the stream and returns a stream instance that can be started and stopped.
/// </summary>
/// <returns>A stream instance.</returns>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();
}
}
6 changes: 3 additions & 3 deletions src/Cortex.Streams/Abstractions/IStream.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using Cortex.States;
using Cortex.Streams.Operators;
using Cortex.Streams.Abstractions;
using Cortex.Streams.Performance;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Streams
{
public interface IStream<TIn, TCurrent>
public interface IStream<TIn>
{
/// <summary>
/// Start the stream processing.
Expand Down Expand Up @@ -66,7 +66,7 @@ public interface IStream<TIn, TCurrent>

StreamStatuses GetStatus();

IReadOnlyDictionary<string, BranchOperator<TCurrent>> GetBranches();
IReadOnlyDictionary<string, IBranchInfo> GetBranches();

TStateStore GetStateStoreByName<TStateStore>(string name) where TStateStore : IDataStore;
IEnumerable<TStateStore> GetStateStoresByType<TStateStore>() where TStateStore : IDataStore;
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public interface IStreamBuilder<TIn, TCurrent>
/// Builds the stream
/// </summary>
/// <returns></returns>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();


/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/FanOutBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public IFanOutBuilder<TIn, TCurrent> ToWithTransform<TOutput>(string name, Func<
}

/// <inheritdoc />
public IStream<TIn, TCurrent> Build()
public IStream<TIn> Build()
{
if (_branchOperators.Count == 0)
{
Expand Down
5 changes: 3 additions & 2 deletions src/Cortex.Streams/Operators/BranchOperator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cortex.Streams.ErrorHandling;
using Cortex.Streams.Abstractions;
using Cortex.Streams.ErrorHandling;
using Cortex.Telemetry;
using System;
using System.Collections.Generic;
Expand All @@ -10,7 +11,7 @@ namespace Cortex.Streams.Operators
/// Represents a branch in a fan-out pattern that processes data independently.
/// Forwards telemetry and error handling configuration to the branch's operator chain.
/// </summary>
public class BranchOperator<T> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled
public class BranchOperator<T> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled, IBranchInfo
{
private readonly string _branchName;
private readonly IOperator _branchOperator;
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/SinkBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public SinkBuilder(
/// Builds the stream and returns a stream instance.
/// </summary>
/// <returns>A stream instance.</returns>
public IStream<TIn, TCurrent> Build()
public IStream<TIn> Build()
{
return new Stream<TIn, TCurrent>(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions);
}
Expand Down
9 changes: 5 additions & 4 deletions src/Cortex.Streams/Stream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Cortex.States;
using Cortex.States.Operators;
using Cortex.Streams.Abstractions;
using Cortex.Streams.ErrorHandling;
using Cortex.Streams.Operators;
using Cortex.Streams.Performance;
Expand All @@ -16,8 +17,8 @@ namespace Cortex.Streams
/// Represents a built stream that can be started and stopped.
/// </summary>
/// <typeparam name="TIn">The type of the initial input to the stream.</typeparam>
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
public class Stream<TIn, TCurrent> : IStream<TIn, TCurrent>, IStatefulOperator, IDisposable
/// <typeparam name="TCurrent">The current type of data in the stream (internal use only).</typeparam>
public class Stream<TIn, TCurrent> : IStream<TIn>, IStatefulOperator, IDisposable
{
private readonly string _name;
private readonly IOperator _operatorChain;
Expand Down Expand Up @@ -362,9 +363,9 @@ public BufferStatistics GetBufferStatistics()
};
}

public IReadOnlyDictionary<string, BranchOperator<TCurrent>> GetBranches()
public IReadOnlyDictionary<string, IBranchInfo> GetBranches()
{
var branchDict = new Dictionary<string, BranchOperator<TCurrent>>();
var branchDict = new Dictionary<string, IBranchInfo>();
foreach (var branchOperator in _branchOperators)
{
branchDict[branchOperator.BranchName] = branchOperator;
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/StreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ IStreamBuilder<TIn, TIn> IInitialStreamBuilder<TIn>.Stream()
}


public IStream<TIn, TCurrent> Build()
public IStream<TIn> Build()
{
//return new Stream<TIn, TCurrent>(_name, _firstOperator, _branchOperators);
return new Stream<TIn, TCurrent>(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull()
public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();
CommandExecutionEvent<TestCommand, TestCommandResult>? capturedEvent = null;

mockStream
Expand Down Expand Up @@ -68,7 +68,7 @@ public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured()
public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var capturedEvents = new List<CommandExecutionEvent<TestCommand, TestCommandResult>>();

mockStream
Expand Down Expand Up @@ -97,7 +97,7 @@ public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured()
public async Task Handle_EmitsFailedEvent_WhenCommandThrows()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();
CommandExecutionEvent<TestCommand, TestCommandResult>? capturedEvent = null;

mockStream
Expand Down Expand Up @@ -128,7 +128,7 @@ await Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();

var behavior = new StreamEmittingCommandBehavior<TestCommand, TestCommandResult>(
mockStream.Object,
Expand All @@ -151,7 +151,7 @@ public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse()
public async Task Handle_IncludesDuration_InAfterEvent()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();
CommandExecutionEvent<TestCommand, TestCommandResult>? capturedEvent = null;

mockStream
Expand Down Expand Up @@ -184,7 +184,7 @@ public async Task Handle_IncludesDuration_InAfterEvent()
public async Task Handle_PropagatesResultCorrectly()
{
// Arrange
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>, CommandExecutionEvent<TestCommand, TestCommandResult>>>();
var mockStream = new Mock<IStream<CommandExecutionEvent<TestCommand, TestCommandResult>>>();
mockStream
.Setup(s => s.EmitAsync(It.IsAny<CommandExecutionEvent<TestCommand, TestCommandResult>>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull()
public async Task Handle_EmitsNotificationToStream()
{
// Arrange
var mockStream = new Mock<IStream<OrderCreatedNotification, OrderCreatedNotification>>();
var mockStream = new Mock<IStream<OrderCreatedNotification>>();
OrderCreatedNotification? capturedNotification = null;

mockStream
Expand Down Expand Up @@ -63,7 +63,7 @@ public async Task Handle_EmitsNotificationToStream()
public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails()
{
// Arrange
var mockStream = new Mock<IStream<OrderCreatedNotification, OrderCreatedNotification>>();
var mockStream = new Mock<IStream<OrderCreatedNotification>>();
OrderCreatedNotification? capturedNotification = null;
Exception? capturedException = null;

Expand Down Expand Up @@ -95,7 +95,7 @@ public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails()
public async Task Handle_ThrowsException_WhenNoErrorHandlerAndStreamFails()
{
// Arrange
var mockStream = new Mock<IStream<OrderCreatedNotification, OrderCreatedNotification>>();
var mockStream = new Mock<IStream<OrderCreatedNotification>>();

mockStream
.Setup(s => s.EmitAsync(It.IsAny<OrderCreatedNotification>(), It.IsAny<CancellationToken>()))
Expand All @@ -113,7 +113,7 @@ await Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task Handle_PassesCancellationToken_ToStream()
{
// Arrange
var mockStream = new Mock<IStream<OrderCreatedNotification, OrderCreatedNotification>>();
var mockStream = new Mock<IStream<OrderCreatedNotification>>();
CancellationToken capturedToken = default;

mockStream
Expand Down Expand Up @@ -149,7 +149,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull()
public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull()
{
// Arrange
var mockStream = new Mock<IStream<OrderStreamData, OrderStreamData>>();
var mockStream = new Mock<IStream<OrderStreamData>>();

// Act & Assert
Assert.Throws<ArgumentNullException>(() =>
Expand All @@ -162,7 +162,7 @@ public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull()
public async Task Handle_TransformsAndEmitsNotificationToStream()
{
// Arrange
var mockStream = new Mock<IStream<OrderStreamData, OrderStreamData>>();
var mockStream = new Mock<IStream<OrderStreamData>>();
OrderStreamData? capturedData = null;

mockStream
Expand Down Expand Up @@ -197,7 +197,7 @@ public async Task Handle_TransformsAndEmitsNotificationToStream()
public async Task Handle_InvokesErrorHandler_WhenTransformFails()
{
// Arrange
var mockStream = new Mock<IStream<OrderStreamData, OrderStreamData>>();
var mockStream = new Mock<IStream<OrderStreamData>>();
Exception? capturedException = null;

var handler = new TransformingStreamNotificationHandler<OrderCreatedNotification, OrderStreamData>(
Expand All @@ -220,7 +220,7 @@ public async Task Handle_InvokesErrorHandler_WhenTransformFails()
public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails()
{
// Arrange
var mockStream = new Mock<IStream<OrderStreamData, OrderStreamData>>();
var mockStream = new Mock<IStream<OrderStreamData>>();
Exception? capturedException = null;

mockStream
Expand Down
Loading