diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 90bf39c7..2a8fdaf5 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -1,4 +1,6 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do + require Logger + alias Commanded.Aggregates.Aggregate alias Commanded.EventStore alias Commanded.EventStore.RecordedEvent @@ -64,13 +66,53 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do Enum.reduce(event_stream, state, fn event, state -> %RecordedEvent{data: data, stream_version: stream_version} = event + %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state + state_with_snapshot = state_with_snapshot(stream_version, state) + %Aggregate{ - state + state_with_snapshot | aggregate_version: stream_version, aggregate_state: aggregate_module.apply(aggregate_state, data) } end) end + + defp state_with_snapshot( + stream_version, + %Aggregate{ + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + snapshotting: snapshotting + } = state + ) do + if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do + case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do + {:ok, snapshotting} -> + # nocommit + IO.puts("Stream version: #{stream_version}") + %Aggregate{state | snapshotting: snapshotting} + + {:error, error} -> + Logger.warning(fn -> + describe(state) <> " snapshot failed due to: " <> inspect(error) + end) + + state + end + else + state + end + end + + defp describe(%Aggregate{} = aggregate) do + %Aggregate{ + aggregate_module: aggregate_module, + aggregate_uuid: aggregate_uuid, + aggregate_version: aggregate_version + } = aggregate + + "#{inspect(aggregate_module)}<#{aggregate_uuid}@#{aggregate_version}>" + end end diff --git a/lib/commanded/commands/router.ex b/lib/commanded/commands/router.ex index 60599301..342f4589 100644 --- a/lib/commanded/commands/router.ex +++ b/lib/commanded/commands/router.ex @@ -371,6 +371,7 @@ defmodule Commanded.Commands.Router do | {:ok, aggregate_state :: struct()} | {:ok, aggregate_version :: non_neg_integer()} | {:ok, execution_result :: ExecutionResult.t()} + | {:ok, events :: list()} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term()}