From 32eb23891e3a7913331693ede239e8c49807cd56 Mon Sep 17 00:00:00 2001 From: Konstantin Tenekedzhiev Date: Thu, 5 Dec 2024 16:36:59 +0200 Subject: [PATCH 1/3] fix: makes RebuildProjectionSequentially_Job actually sequential and not dependent on the IEventHandler registration order on the projection type --- .../RebuildProjectionSequentially_Job.cs | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs index e9d97a79..c9e7d417 100644 --- a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs +++ b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs @@ -98,43 +98,47 @@ protected override async Task RunJobAsync(IClusterOperations { OnAggregateStreamLoadedAsync = async stream => { + List events = []; foreach (string eventTypeContract in projectionEventsContractIds) { - var eventsData = stream.Commits + var interested = stream.Commits .SelectMany(x => x.Events) - .Where(x => IsInterested(eventTypeContract, x.Data)) - .Select(x => x.Data); + .Where(x => IsInterested(eventTypeContract, x.Data)); - foreach (var eventRaw in eventsData) + foreach (var eventRaw in interested) { - IEvent @event = serializer.DeserializeFromBytes(eventRaw); - @event = @event.Unwrap(); + var @event = serializer.DeserializeFromBytes(eventRaw.Data).Unwrap(); if (@event is null) { - logger.LogError("Failed to deserialize event from data {data}.", eventRaw); + logger.LogError("Failed to deserialize event from data {data}.", eventRaw.Data); return; } - Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version); - Task replayTask = eventSourcedProjection.ReplayEventAsync(@event); + events.Add(@event); + } + } - try - { - await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false); - } - catch (Exception ex) - { - if (projectionStoreTask.IsFaulted) - logger.LogError(ex, "Failed to persist event!"); + foreach (var @event in events.OrderBy(x => x.Timestamp)) + { + Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version); + Task replayTask = eventSourcedProjection.ReplayEventAsync(@event); - if (replayTask.IsFaulted) - logger.LogError(ex, "Failed to replay event!"); + try + { + await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false); + } + catch (Exception ex) + { + if (projectionStoreTask.IsFaulted) + logger.LogError(ex, "Failed to persist event!"); - continue; - } + if (replayTask.IsFaulted) + logger.LogError(ex, "Failed to replay event!"); - progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct); + continue; } + + progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct); } }, NotifyProgressAsync = async options => From 1a5ca792068d847c58803caef99b756fccffdb49 Mon Sep 17 00:00:00 2001 From: Konstantin Tenekedzhiev Date: Fri, 6 Dec 2024 12:28:39 +0200 Subject: [PATCH 2/3] fix: soft the events by revision and position --- .../RebuildProjectionSequentially_Job.cs | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs index c9e7d417..086ae8ba 100644 --- a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs +++ b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs @@ -98,28 +98,25 @@ protected override async Task RunJobAsync(IClusterOperations { OnAggregateStreamLoadedAsync = async stream => { - List events = []; + List rawEvents = []; foreach (string eventTypeContract in projectionEventsContractIds) { var interested = stream.Commits .SelectMany(x => x.Events) .Where(x => IsInterested(eventTypeContract, x.Data)); - foreach (var eventRaw in interested) - { - var @event = serializer.DeserializeFromBytes(eventRaw.Data).Unwrap(); - if (@event is null) - { - logger.LogError("Failed to deserialize event from data {data}.", eventRaw.Data); - return; - } - - events.Add(@event); - } + rawEvents.AddRange(interested); } - foreach (var @event in events.OrderBy(x => x.Timestamp)) + foreach (var eventRaw in rawEvents.OrderBy(x => x.Revision).ThenBy(x => x.Position)) { + var @event = serializer.DeserializeFromBytes(eventRaw.Data).Unwrap(); + if (@event is null) + { + logger.LogError("Failed to deserialize event from data {data}.", eventRaw.Data); + return; + } + Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version); Task replayTask = eventSourcedProjection.ReplayEventAsync(@event); From 006efc049d9d07acaa14b58368b89b2c0aab71f7 Mon Sep 17 00:00:00 2001 From: mynkow Date: Tue, 10 Dec 2024 10:34:58 +0200 Subject: [PATCH 3/3] Adds some comments --- .../Rebuilding/RebuildProjectionSequentially_Job.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs index 086ae8ba..bbe4727e 100644 --- a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs +++ b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs @@ -98,17 +98,21 @@ protected override async Task RunJobAsync(IClusterOperations { OnAggregateStreamLoadedAsync = async stream => { + // 1. It is important to check all EventHandlers if they are interested in specific events and do this FIRST. List rawEvents = []; foreach (string eventTypeContract in projectionEventsContractIds) { - var interested = stream.Commits + IEnumerable interested = stream.Commits .SelectMany(x => x.Events) .Where(x => IsInterested(eventTypeContract, x.Data)); + // Do not try to optimize the GC using Enumerable.Concat because it does not solve the issue rawEvents.AddRange(interested); } - foreach (var eventRaw in rawEvents.OrderBy(x => x.Revision).ThenBy(x => x.Position)) + // 2. Then the result is sorted! + IEnumerable rawEventsSorted = rawEvents.OrderBy(x => x.Revision).ThenBy(x => x.Position); // Do not try to optimize the sorting with SortedDictionary. It is not faster. + foreach (AggregateEventRaw eventRaw in rawEventsSorted) { var @event = serializer.DeserializeFromBytes(eventRaw.Data).Unwrap(); if (@event is null)