
Releases: buildersoftio/cortex

v3.1.2

10 Feb 19:45
e62f581


🚀 Cortex Data Framework v3.1.2

Release Date: 10 February 2026
Version: 3.1.2

Cortex Data Framework v3.1.2 is a patch release on the v3.1 line – the most feature-packed, performant, and developer-friendly version yet. v3.1 brought major enhancements to stream processing, state management, and the mediator pattern, and introduced powerful new type utilities; the full v3.1 feature walkthrough is repeated below for reference.


What's Changed

  • v3/bug/209: Improve error handling and diagnostics in Streams/Kafka by @eneshoxha in #210

Full Changelog: v3.1.1...v3.1.2


🎯 Highlights

  • 🪟 Unified Windowing API: tumbling, sliding, and session windows with custom triggers
  • 🔗 Stream-Stream Joins: windowed joins between two streams
  • 🎛️ Simplified APIs: a single type parameter for StreamBuilder
  • 🦆 DuckDB State Store: high-performance analytical state storage
  • 📡 Async Buffering: backpressure-aware stream processing
  • 🔀 FanOut Operator: multi-sink stream support with filters
  • 🎭 Type-Inferred Mediator: no more explicit generic parameters

🌊 Stream Processing Enhancements

🪟 Unified Windowing API

Create tumbling, sliding, and session windows with a clean, unified API:

// Tumbling Window - Non-overlapping fixed-size windows
// Returns WindowResult<string, OrderEvent> containing the key and all items in the window
var stream = StreamBuilder<OrderEvent>
    .CreateNewStream("OrderAggregation")
    .Stream()
    .TumblingWindow(
        keySelector: e => e.CustomerId,
        timestampSelector: e => e.OrderTime,
        windowSize: TimeSpan.FromMinutes(5))
    .Map(windowResult => new OrderSummary 
    { 
        CustomerId = windowResult.Key,
        TotalAmount = windowResult.Items.Sum(o => o.Amount),
        Count = windowResult.Items.Count,
        WindowStart = windowResult.WindowStart,
        WindowEnd = windowResult.WindowEnd
    })
    .Sink(summary => Console.WriteLine($"Customer {summary.CustomerId}: {summary.TotalAmount}"))
    .Build();

// Sliding Window - Overlapping windows with slide interval
var slidingStream = StreamBuilder<SensorReading>
    .CreateNewStream("SensorAverage")
    .Stream()
    .SlidingWindow(
        keySelector: e => e.SensorId,
        timestampSelector: e => e.Timestamp,
        windowSize: TimeSpan.FromMinutes(10),
        slideInterval: TimeSpan.FromMinutes(2))
    .Map(window => new { window.Key, Average = window.Items.Average(r => r.Value) })
    .Sink(avg => Console.WriteLine($"Sensor {avg.Key} avg: {avg.Average}"))
    .Build();

// Session Window - Activity-based windows with inactivity gaps
var sessionStream = StreamBuilder<UserActivity>
    .CreateNewStream("UserSessions")
    .Stream()
    .SessionWindow(
        keySelector: e => e.UserId,
        timestampSelector: e => e.ActivityTime,
        inactivityGap: TimeSpan.FromMinutes(30))
    .Map(session => new UserSession 
    { 
        UserId = session.Key,
        Events = session.Items.ToList(),
        Duration = session.WindowEnd - session.WindowStart
    })
    .Sink(s => Console.WriteLine($"Session ended for {s.UserId}: {s.Events.Count} events"))
    .Build();

🔗 Stream-Stream Joins

Join two streams within time windows:

// Create the join operator with configuration
var joinOperator = new StreamStreamJoinOperator<Order, Payment, string, OrderWithPayment>(
    leftKeySelector: order => order.OrderId,
    rightKeySelector: payment => payment.OrderId,
    leftTimestampSelector: order => order.CreatedAt,
    rightTimestampSelector: payment => payment.ProcessedAt,
    joinFunction: (order, payment) => new OrderWithPayment 
    { 
        Order = order, 
        Payment = payment 
    },
    configuration: new StreamJoinConfiguration 
    { 
        WindowSize = TimeSpan.FromMinutes(5),
        JoinType = StreamJoinType.Inner
    });

// Build the stream with the join
var orderStream = StreamBuilder<Order>
    .CreateNewStream("OrderPaymentJoin")
    .Stream()
    .JoinStream(joinOperator)
    .Sink(result => Console.WriteLine($"Matched: {result.Order.OrderId}"))
    .Build();

// Feed payments to the right side of the join
paymentStream.ForEach(payment => joinOperator.ProcessRight(payment));

📊 Stream-Table Left Joins

Enrich stream data with table lookups:

// Create a state store for the "table" side
var customerStore = new InMemoryStateStore<string, Customer>("CustomerStore");

// Build stream with left join - emits result even when no match found
var enrichedStream = StreamBuilder<OrderEvent>
    .CreateNewStream("EnrichedOrders")
    .Stream()
    .LeftJoin<Customer, string, EnrichedEvent>(
        rightStateStore: customerStore,
        keySelector: e => e.CustomerId,
        joinFunction: (order, customer) => new EnrichedEvent 
        { 
            Order = order, 
            CustomerName = customer?.Name ?? "Unknown",
            CustomerTier = customer?.Tier ?? "Standard"
        })
    .Sink(e => Console.WriteLine($"Order for {e.CustomerName}"))
    .Build();
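
For the lookup to produce matches, the table side has to be populated first (for example, from a customer changefeed). A minimal sketch, assuming the state store exposes a Put-style write method (check the actual Cortex.States API):

// Hypothetical Put(...) write method; the actual state-store write API may differ.
customerStore.Put("CUST-001", new Customer { Name = "Ada Lovelace", Tier = "Gold" });
customerStore.Put("CUST-002", new Customer { Name = "Alan Turing", Tier = "Standard" });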

🔀 FanOut: Multi-Sink Stream Support

Route events to multiple sinks with optional filters:

var stream = StreamBuilder<LogEvent>
    .CreateNewStream("LogRouter")
    .Stream()
    .FanOut(fanOut => fanOut
        .To("alerts", log => log.Level == "ERROR", log => alertService.SendAlert(log))
        .To("metrics", log => log.Level == "INFO", log => metricsService.RecordMetric(log))
        .To("archive", log => archiveService.Store(log))) // No filter = all events
    .Build();

// With transformations before sinking
.FanOut(fanOut => fanOut
    .To("console", log => Console.WriteLine(log.Message))
    .ToWithTransform("json", 
        log => JsonSerializer.Serialize(log), 
        json => File.AppendAllText("logs.json", json)))

📡 Async Buffering & Backpressure

Handle high-throughput scenarios with built-in backpressure:

// Option 1: Use predefined high-throughput settings
var stream = StreamBuilder<Event>
    .CreateNewStream("HighThroughputStream")
    .WithPerformanceOptions(StreamPerformanceOptions.HighThroughput(
        bufferCapacity: 100_000,
        concurrencyLevel: Environment.ProcessorCount))
    .Stream()
    .Map(e => ProcessEvent(e))
    .Sink(e => SaveToDatabase(e))
    .Build();

// Option 2: Custom configuration
var streamCustom = StreamBuilder<Event>
    .CreateNewStream("CustomBufferedStream")
    .WithPerformanceOptions(new StreamPerformanceOptions
    {
        EnableBufferedProcessing = true,
        BufferCapacity = 10_000,
        BackpressureStrategy = BackpressureStrategy.Block,
        BatchSize = 100,
        BatchTimeout = TimeSpan.FromMilliseconds(50),
        ConcurrencyLevel = 4,
        BlockingTimeout = TimeSpan.FromSeconds(30),
        OnItemDropped = (item, reason) => _logger.LogWarning("Dropped: {Reason}", reason)
    })
    .Stream()
    .Map(e => ProcessEvent(e))
    .Sink(e => SaveToDatabase(e))
    .Build();

// Option 3: Low-latency settings (immediate processing)
var lowLatencyStream = StreamBuilder<Event>
    .CreateNewStream("LowLatencyStream")
    .WithPerformanceOptions(StreamPerformanceOptions.LowLatency())
    .Stream()
    .Sink(e => ProcessImmediately(e))
    .Build();

🎛️ Simplified StreamBuilder API

The StreamBuilder now uses a single type parameter for stream creation:

// Clean, simple API - just specify the input type
var stream = StreamBuilder<OrderEvent>
    .CreateNewStream("MyStream")
    .Stream()
    .Filter(e => e.IsValid)
    .Map(e => Transform(e))
    .Sink(e => Save(e))
    .Build();

// With external source operator
var kafkaStream = StreamBuilder<OrderEvent>
    .CreateNewStream("KafkaOrders")
    .Stream(new KafkaSourceOperator<OrderEvent>(kafkaConfig))
    .Filter(e => e.Amount > 100)
    .Sink(e => ProcessLargeOrder(e))
    .Build();

📝 Structured Logging for Operators

All source and sink operators now include structured logging:

// Automatic structured logging for all operators
[14:23:45 INF] KafkaSourceOperator started consuming from topic "orders"
[14:23:46 INF] Processed 1000 messages in 1.2s (833 msg/s)
[14:23:47 WRN] Backpressure detected, slowing consumption
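
The sample output above is in a Serilog-style console format. As an illustration only – Serilog is an assumption here, not a Cortex requirement, and the wiring between the logger and the operators may differ – this is one way to configure a console logger with that template:

using Serilog;

Log.Logger = new LoggerConfiguration()
    .MinimumLevel.Information()
    .WriteTo.Console(outputTemplate:
        "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
    .CreateLogger();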

🧠 Mediator Pattern Improvements

🎭 Type-Inferred Commands & Queries

No more explicit type parameters – the compiler infers everything:

// Before (v2.x) - Explicit type parameters required
var result = await mediator.SendCommandAsync<CreateOrderCommand, OrderId>(command);
var data = await mediator.SendQueryAsync<GetOrderQuery, OrderDto>(query);

// After (v3.0) - Type inference with extension methods!
var result = await mediator.SendAsync(command);  // Returns OrderId
var data = await mediator.QueryAsync(query);     // Returns OrderDto

// Void commands work too!
await mediator.SendAsync(new DeleteOrderCommand { OrderId = id });

// Publish notifications
await mediator.PublishAsync(new OrderCreatedNotification { OrderId = id });
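
Type inference works because the result type rides on the message contract itself. A minimal sketch of the shapes involved, assuming commands implement ICommand<TResult> as in earlier releases (the command's property is illustrative):

// The command declares its result type on the contract...
public class CreateOrderCommand : ICommand<OrderId>
{
    public string CustomerId { get; set; }   // illustrative payload
}

// ...so SendAsync(command) can recover OrderId with no explicit generics.
OrderId orderId = await mediator.SendAsync(new CreateOrderCommand { CustomerId = "CUST-001" });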

🔔 Notification Pipeline Behaviors

Add cross-cutting concerns to notifications:

public class LoggingNotificationBehavior<TNotification> 
    : INotificationBehavior<TNotification>
    where TNotification : INotification
{
    // Logger injected via DI so the behavior is self-contained.
    private readonly ILogger<LoggingNotificationBehavior<TNotification>> _logger;

    public LoggingNotificationBehavior(
        ILogger<LoggingNotificationBehavior<TNotification>> logger)
    {
        _logger = logger;
    }

    public async Task Handle(
        TNotification notification,
        NotificationHandlerDelegate next,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Publishing {Notification}", typeof(TNotification).Name);
        await next();
        _logger.LogInformation("Published {Notification}", typeof(TNotification).Name);
    }
}

🚨 Exception Handling Pipeline Behaviors

Centralized exception handling for your CQRS pipeline:

// Register exception handling behaviors
services.AddCortexMediator(
    new[] { typeof(Program).Assembly },
    options => options....

v3.1.1

30 Jan 12:38
08efb09


🚀 Cortex Data Framework v3.1.1

Release Date: 30 January 2026
Version: 3.1.1

v3.1.1 is a patch release on the v3.1 line. Its published notes repeat the v3.1 feature walkthrough verbatim; see the v3.1.2 notes above for the full text, including the exception handling pipeline behaviors section.

v3.1.0

30 Jan 11:43
5b5fe0e


🚀 Cortex Data Framework v3.1 Release Notes

Release Date: 30 January 2026
Version: 3.1.0

We're thrilled to announce the release of Cortex Data Framework v3.1 – the most feature-packed, performant, and developer-friendly version yet! This release brings major enhancements to stream processing, state management, and mediator patterns, and introduces powerful new type utilities.


The feature walkthrough published with this release – unified windowing, stream-stream and stream-table joins, FanOut, async buffering and backpressure, the simplified StreamBuilder, structured operator logging, and the mediator improvements – is reproduced verbatim in the v3.1.2 notes above.

v2.2.0

24 Jan 23:01
b9cf456


Release of version v2.2.0

We are thrilled to announce the second release of version 2 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • 🧩 Introduced Cortex.Serialization.Yaml for native YAML serialization support.
  • 📘 Added a comprehensive README with multi .NET version compatibility.
  • 🛡️ Implemented ValidationCommandBehavior for centralized command validation.
  • ⚡ Added full Async/Await support across mediator behaviors and handlers.
  • 🔗 Integrated FluentValidation directly into the mediator pipeline (a validator sketch follows after this list).
  • 🔄 Refactored the AddCortexMediator method signature for a cleaner DI configuration.
  • 🧹 Simplified mediator command invocation patterns for better API ergonomics.
  • 🏗️ Improved pipeline robustness and behavior wiring.
  • 🚀 General stabilization and readiness for the Cortex v2.2 release.
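
To give a feel for the validation pipeline, here is a plain FluentValidation validator for an illustrative command type. The validator code is standard FluentValidation; its pickup by ValidationCommandBehavior happens inside the mediator pipeline as described above:

using FluentValidation;

// Illustrative command type.
public class CreateOrderCommand
{
    public string CustomerId { get; set; }
    public decimal Amount { get; set; }
}

// Standard FluentValidation validator, resolved by the mediator's
// ValidationCommandBehavior at dispatch time.
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
    public CreateOrderCommandValidator()
    {
        RuleFor(c => c.CustomerId).NotEmpty();
        RuleFor(c => c.Amount).GreaterThan(0);
    }
}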

In short, v2.2 delivers:

  • 🧬 More flexible serialization (YAML support),
  • 🔒 A stronger and cleaner validation pipeline,
  • ⚙️ A fully async-first architecture,
  • 🧩 A simpler and more expressive Mediator API,
  • ✨ A better overall developer experience.

📚 Documentation & Examples

Get started quickly with the comprehensive Cortex Documentation. Examples include:

  • Setting up pipelines.
  • Using windowing for temporal analysis.
  • Building custom operators.

🛠 Developer Highlights

1. Simple Stream Builder API:

  • Example:
var stream = StreamBuilder<int, int>
    .CreateNewStream("MyStream")
    .Stream()
    .Map(x => x * 2)
    .Filter(x => x > 10)
    .Sink(Console.WriteLine)
    .Build();
stream.Start();

2. Dynamic State Stores:

  • Use InMemoryStateStore or RocksDbStateStore with a single line of code.
  • Built-in thread-safety for multi-tenant applications.

3. Customizable Metrics:
Add telemetry to monitor every operator and capture real-time performance metrics.

🚧 Known Issues

  • Initial setups may experience minor configuration hurdles with telemetry providers. Please refer to the documentation for troubleshooting.
  • Memory usage for large datasets may increase when using in-memory state stores.

💌 Thanks for Being a Part of the Journey

We are excited to see the amazing projects you will build with Cortex Data Framework! Your feedback is invaluable—reach out to us via GitHub Issues or email for support and suggestions.

Stay Connected

Follow our progress and contribute:

GitHub: Cortex Repository
Discussions: Community Forum
Discord: Join our community

Here’s to building better, faster, and more scalable data-driven applications. 🚀

v2.1.0

10 Oct 20:28
700f272


Release of version v2.1.0

We are thrilled to announce the first release of version 2 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • ✨ Cortex.Mediator: Added non-returning ICommand interface (brought back as requested). (#141, #142)
  • ✅ Cortex.Mediator: Added validation support (pipeline/behaviors). (#129, #130)
    • Cortex.Mediator.Behaviors.FluentValidation: FluentValidation-based validation for commands and queries
  • 🚀 Streams: Added Key support for Kafka Stream (#127, #131) and Pulsar Stream (#128, #132, #133).
  • 🧠 New module: Cortex.Vectors — initial creation and docs. (#107, #134)
  • ⚡ Streams: Added EmitAsync alongside Emit. (#102, #143)
  • 🐛 Fix: Addressed generic type-argument inference error. (#139)

🔧 Potential breaking changes / upgrade notes

  • If you previously used “void/Unit” style commands through workarounds, migrate to the new non-returning ICommand interface. Returning commands continue to use ICommand<TResult>.
  • Review mediator validation behaviors if you have custom pipelines to ensure order/registration matches your expectations.
  • For Streams using keys, set the key on produce/emit operations to enable partition-aware routing in Kafka/Pulsar.
  • Prefer EmitAsync in async pipelines to avoid blocking the caller; Emit remains available for synchronous flows (a sketch of both follows below).
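
A minimal sketch of the two emit paths, reusing the builder shape from the v2.2.0 example above and assuming Emit/EmitAsync are exposed on the built stream object:

var stream = StreamBuilder<int, int>
    .CreateNewStream("Numbers")
    .Stream()
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();
stream.Start();

stream.Emit(21);             // synchronous flow
await stream.EmitAsync(21);  // async pipelines: avoids blocking the caller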


v1.7.0

24 Jul 12:47
475b75a


Release of version v1.7.0

We are thrilled to announce v1.7.0 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • Made ICommand and ICommandHandler generic (TResult).
  • Updated IMediator/Mediator to return TResult from commands.
  • Generalized the pipeline and logging behaviors to TResult:
    • CommandPipelineBehavior
    • LoggingCommandBehavior
    • Added LoggingQueryBehavior
  • Added Common/Unit.cs to represent “void” command results.
  • Updated the DI helpers (MediatorOptions*, ServiceCollectionExtensions).
  • Updated the README.

BREAKING CHANGE: All command handlers must now implement ICommandHandler<TCommand, TResult>. For commands that previously returned nothing, return Unit (see the migration sketch below).
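
A hedged migration sketch: ICommandHandler<TCommand, TResult> and Unit come from this release, while the Handle method signature, the Unit.Value member, and the repository dependency are assumptions made to illustrate the shape – check the actual Cortex.Mediator definitions:

// Before: the handler returned nothing. After: it returns Unit through the
// generic interface. Handle(...) and Unit.Value are assumed member names.
public class DeleteOrderHandler : ICommandHandler<DeleteOrderCommand, Unit>
{
    private readonly IOrderRepository _orders;   // illustrative dependency

    public DeleteOrderHandler(IOrderRepository orders) => _orders = orders;

    public async Task<Unit> Handle(DeleteOrderCommand command, CancellationToken ct)
    {
        await _orders.DeleteAsync(command.OrderId, ct);
        return Unit.Value;                       // stands in for "void"
    }
}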


v1.6.1

08 Apr 11:22
7312a6f


Release of version v1.6.1

We are thrilled to announce v1.6.1 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • Added joins from stream to table [Cortex.Streams].
  • Rewrote the entire mediator library to support CQRS [Cortex.Mediator] (a usage sketch follows after this list):
    • Async processing for commands, queries, and notifications
    • Support for pipelines
    • Default command pipelines
    • Support for DI
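
As a usage sketch of the async CQRS surface, the explicit-generic calls below follow the pre-v3 convention quoted in the v3.1 notes above; whether v1.6.x exposed exactly these signatures is an assumption:

// Explicit generic form of the pre-v3 mediator API (shape assumed for v1.6.x).
var orderId = await mediator.SendCommandAsync<CreateOrderCommand, OrderId>(
    new CreateOrderCommand { CustomerId = "CUST-001" });

var order = await mediator.SendQueryAsync<GetOrderQuery, OrderDto>(
    new GetOrderQuery { OrderId = orderId });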


v1.6.0

05 Apr 12:13
d3c0b71


Release of version v1.6.0

We are thrilled to announce v1.6.0 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • Added joins from stream to table [Cortex.Streams].
  • Rewrote the entire library to support CQRS [Cortex.Mediator]:
    • Async processing for commands, queries, and notifications
    • Support for pipelines
    • Default command pipelines
    • Support for DI


v1.5.0

08 Feb 14:53
c56c98d


Release of version v1.5.0

We are thrilled to announce v1.5.0 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • Added a CDC sink operator for Elasticsearch [Cortex.Streams].
  • Added the OneOf type [Cortex.Types] (a usage sketch follows after this list).
  • Added the AnyOf type [Cortex.Types].
  • Added the AllOf type [Cortex.Types].
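
For flavor, a hedged sketch of consuming a OneOf value; the Match method shown follows common discriminated-union APIs and is an assumption about Cortex.Types, not a confirmed signature:

// Hypothetical Match-style consumption of a OneOf<int, string> value.
void Describe(OneOf<int, string> id)
{
    var label = id.Match(
        n => $"numeric id {n}",   // int case
        s => $"text id {s}");     // string case
    Console.WriteLine(label);
}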


v1.4.0

18 Jan 22:48
2c195c9


Release of version v1.4.0

We are thrilled to announce v1.4.0 of Cortex Data Framework, a comprehensive and robust platform designed to redefine how developers approach real-time data streaming, processing, and state management. With this release, Cortex empowers you to build scalable, high-performance, and maintainable data pipelines tailored to diverse use cases.

🎉 What's Changed

  • Added a CDC source operator for Microsoft SQL Server [Cortex.Streams] (a sketch follows after this list).
  • Added a CDC source operator for PostgreSQL [Cortex.Streams].
  • Added a CDC source operator for MongoDB [Cortex.Streams].
  • Added support for SQLite [Cortex.States].
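
To show where a CDC source could plug in, here is a sketch following the v1.x/v2.x builder shape from the v2.2.0 example above. The operator class name, its configuration object, and the Stream(sourceOperator) overload are hypothetical – check Cortex.Streams for the actual CDC operator names:

// Hypothetical names throughout: SqlServerCdcConfiguration,
// SqlServerCdcSourceOperator, and the Stream(sourceOperator) overload
// are illustrative placeholders for the real CDC API.
var cdcConfig = new SqlServerCdcConfiguration();
var cdcStream = StreamBuilder<OrderChange, OrderChange>
    .CreateNewStream("OrdersCdc")
    .Stream(new SqlServerCdcSourceOperator<OrderChange>(cdcConfig))
    .Filter(change => change.Operation == "INSERT")
    .Sink(change => Console.WriteLine($"New order: {change.OrderId}"))
    .Build();
cdcStream.Start();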
