From 659c1f6d6b235aadcc7787c417f71f1e681c9854 Mon Sep 17 00:00:00 2001 From: Alex Speller Date: Fri, 9 Jul 2021 01:16:29 +0100 Subject: [PATCH 1/2] Respect snapshot params when rebuilding aggregate from event stream --- lib/commanded/aggregates/aggregate.ex | 29 +++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/commanded/aggregates/aggregate.ex b/lib/commanded/aggregates/aggregate.ex index e9d56593..30d07fb7 100644 --- a/lib/commanded/aggregates/aggregate.ex +++ b/lib/commanded/aggregates/aggregate.ex @@ -463,10 +463,35 @@ defmodule Commanded.Aggregates.Aggregate do defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do Enum.reduce(event_stream, state, fn event, state -> %RecordedEvent{data: data, stream_version: stream_version} = event - %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state %Aggregate{ - state + aggregate_module: aggregate_module, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + snapshotting: snapshotting + } = state + + state_with_snapshot = + if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do + case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do + {:ok, snapshotting} -> + # nocommit + IO.puts("Stream version: #{stream_version}") + %Aggregate{state | snapshotting: snapshotting} + + {:error, error} -> + Logger.warn(fn -> + describe(state) <> " snapshot failed due to: " <> inspect(error) + end) + + state + end + else + state + end + + %Aggregate{ + state_with_snapshot | aggregate_version: stream_version, aggregate_state: aggregate_module.apply(aggregate_state, data) } From 890e16b40a479fd77c329f8a306da6cd42c1bbc7 Mon Sep 17 00:00:00 2001 From: Julien Guimont Date: Thu, 5 May 2022 13:13:40 -0400 Subject: [PATCH 2/2] Tests --- .tool-versions | 2 +- lib/commanded/aggregates/aggregate_state_builder.ex | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.tool-versions b/.tool-versions index acf1141d..1cfcb49d 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ elixir 1.13.3-otp-24 -erlang 24.2 +erlang 24.2.1 diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 3f9f1d00..7642755b 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -5,6 +5,8 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do alias Commanded.EventStore.SnapshotData alias Commanded.Snapshotting + require Logger + @read_event_batch_size 100 @doc """ @@ -82,7 +84,7 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do {:error, error} -> Logger.warn(fn -> - describe(state) <> " snapshot failed due to: " <> inspect(error) + "snapshot failed due to: " <> inspect(error) end) state