diff --git a/.tool-versions b/.tool-versions index acf1141d..1cfcb49d 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ elixir 1.13.3-otp-24 -erlang 24.2 +erlang 24.2.1 diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 1c4bf004..7642755b 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -5,6 +5,8 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do alias Commanded.EventStore.SnapshotData alias Commanded.Snapshotting + require Logger + @read_event_batch_size 100 @doc """ @@ -64,10 +66,35 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do Enum.reduce(event_stream, state, fn event, state -> %RecordedEvent{data: data, stream_version: stream_version} = event - %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state %Aggregate{ - state + aggregate_module: aggregate_module, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + snapshotting: snapshotting + } = state + + state_with_snapshot = + if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do + case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do + {:ok, snapshotting} -> + # nocommit + IO.puts("Stream version: #{stream_version}") + %Aggregate{state | snapshotting: snapshotting} + + {:error, error} -> + Logger.warn(fn -> + "snapshot failed due to: " <> inspect(error) + end) + + state + end + else + state + end + + %Aggregate{ + state_with_snapshot | aggregate_version: stream_version, aggregate_state: aggregate_module.apply(aggregate_state, data) }