Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,43 +98,48 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations
{
OnAggregateStreamLoadedAsync = async stream =>
{
// 1. It is important to check all EventHandlers if they are interested in specific events and do this FIRST.
List<AggregateEventRaw> rawEvents = [];
foreach (string eventTypeContract in projectionEventsContractIds)
{
var eventsData = stream.Commits
IEnumerable<AggregateEventRaw> interested = stream.Commits
.SelectMany(x => x.Events)
.Where(x => IsInterested(eventTypeContract, x.Data))
.Select(x => x.Data);
.Where(x => IsInterested(eventTypeContract, x.Data));

foreach (var eventRaw in eventsData)
// Do not try to optimize the GC using Enumerable.Concat because it does not solve the issue
rawEvents.AddRange(interested);
}

// 2. Then the result is sorted!
IEnumerable<AggregateEventRaw> rawEventsSorted = rawEvents.OrderBy(x => x.Revision).ThenBy(x => x.Position); // Do not try to optimize the sorting with SortedDictionary. It is not faster.
foreach (AggregateEventRaw eventRaw in rawEventsSorted)
{
var @event = serializer.DeserializeFromBytes<IEvent>(eventRaw.Data).Unwrap();
if (@event is null)
{
logger.LogError("Failed to deserialize event from data {data}.", eventRaw.Data);
return;
}

Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version);
Task replayTask = eventSourcedProjection.ReplayEventAsync(@event);

try
{
await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false);
}
catch (Exception ex)
{
IEvent @event = serializer.DeserializeFromBytes<IEvent>(eventRaw);
@event = @event.Unwrap();
if (@event is null)
{
logger.LogError("Failed to deserialize event from data {data}.", eventRaw);
return;
}

Task projectionStoreTask = projectionWriter.SaveAsync(projectionType, @event, version);
Task replayTask = eventSourcedProjection.ReplayEventAsync(@event);

try
{
await Task.WhenAll([projectionStoreTask, replayTask]).ConfigureAwait(false);
}
catch (Exception ex)
{
if (projectionStoreTask.IsFaulted)
logger.LogError(ex, "Failed to persist event!");

if (replayTask.IsFaulted)
logger.LogError(ex, "Failed to replay event!");

continue;
}

progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct);
if (projectionStoreTask.IsFaulted)
logger.LogError(ex, "Failed to persist event!");

if (replayTask.IsFaulted)
logger.LogError(ex, "Failed to replay event!");

continue;
}

progressTracker.TrackAndNotify(@event.GetType().GetContractId(), ct);
}
},
NotifyProgressAsync = async options =>
Expand Down
Loading