diff --git a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs index e9d97a79..bbe4727e 100644 --- a/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs +++ b/src/Elders.Cronus/Projections/Rebuilding/RebuildProjectionSequentially_Job.cs @@ -98,43 +98,48 @@ protected override async Task RunJobAsync(IClusterOperations { OnAggregateStreamLoadedAsync = async stream => { + // 1. It is important to check all EventHandlers if they are interested in specific events and do this FIRST. + List rawEvents = []; foreach (string eventTypeContract in projectionEventsContractIds) { - var eventsData = stream.Commits + IEnumerable interested = stream.Commits .SelectMany(x => x.Events) - .Where(x => IsInterested(eventTypeContract, x.Data)) - .Select(x => x.Data); + .Where(x => IsInterested(eventTypeContract, x.Data)); - foreach (var eventRaw in eventsData) + // Do not try to optimize the GC using Enumerable.Concat because it does not solve the issue + rawEvents.AddRange(interested); + } + + // 2. Then the result is sorted! + IEnumerable rawEventsSorted = rawEvents.OrderBy(x => x.Revision).ThenBy(x => x.Position); // Do not try to optimize the sorting with SortedDictionary. It is not faster. + foreach (AggregateEventRaw eventRaw in rawEventsSorted) + { + var @event = serializer.DeserializeFromBytes(eventRaw.Data).Unwrap(); + if (@event is null) + { + logger.LogError("Failed to deserialize event from data {data}.", eventRaw.Data); + return; + } + + Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version); + Task replayTask = eventSourcedProjection.ReplayEventAsync(@event); + + try + { + await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false); + } + catch (Exception ex) { - IEvent @event = serializer.DeserializeFromBytes(eventRaw); - @event = @event.Unwrap(); - if (@event is null) - { - logger.LogError("Failed to deserialize event from data {data}.", eventRaw); - return; - } - - Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version); - Task replayTask = eventSourcedProjection.ReplayEventAsync(@event); - - try - { - await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false); - } - catch (Exception ex) - { - if (projectionStoreTask.IsFaulted) - logger.LogError(ex, "Failed to persist event!"); - - if (replayTask.IsFaulted) - logger.LogError(ex, "Failed to replay event!"); - - continue; - } - - progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct); + if (projectionStoreTask.IsFaulted) + logger.LogError(ex, "Failed to persist event!"); + + if (replayTask.IsFaulted) + logger.LogError(ex, "Failed to replay event!"); + + continue; } + + progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct); } }, NotifyProgressAsync = async options =>