From e23fa739e46d31781b6efc2710dea65873e39670 Mon Sep 17 00:00:00 2001 From: Joao Gilberto Balsini Moura Date: Sat, 27 Apr 2024 11:47:29 -0300 Subject: [PATCH 1/3] Improved snapshotting --- .../aggregates/aggregate_state_builder.ex | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 90bf39c7..2bd8b5de 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -1,4 +1,6 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do + require Logger + alias Commanded.Aggregates.Aggregate alias Commanded.EventStore alias Commanded.EventStore.RecordedEvent @@ -64,13 +66,48 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do Enum.reduce(event_stream, state, fn event, state -> %RecordedEvent{data: data, stream_version: stream_version} = event - %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state %Aggregate{ - state + aggregate_module: aggregate_module, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + snapshotting: snapshotting + } = state + + state_with_snapshot = + if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do + case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do + {:ok, snapshotting} -> + # nocommit + IO.puts("Stream version: #{stream_version}") + %Aggregate{state | snapshotting: snapshotting} + + {:error, error} -> + Logger.warning(fn -> + describe(state) <> " snapshot failed due to: " <> inspect(error) + end) + + state + end + else + state + end + + %Aggregate{ + state_with_snapshot | aggregate_version: stream_version, aggregate_state: aggregate_module.apply(aggregate_state, data) } end) end + + defp describe(%Aggregate{} = aggregate) do + %Aggregate{ + aggregate_module: aggregate_module, + aggregate_uuid: aggregate_uuid, + aggregate_version: aggregate_version + } = aggregate + + "#{inspect(aggregate_module)}<#{aggregate_uuid}@#{aggregate_version}>" + end end From 06c06e052f41cadca1e26cb125f7c24ac519b81b Mon Sep 17 00:00:00 2001 From: Joao Gilberto Balsini Moura Date: Sat, 27 Apr 2024 11:58:57 -0300 Subject: [PATCH 2/3] Fix --- .../aggregates/aggregate_state_builder.ex | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 2bd8b5de..2a8fdaf5 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -67,31 +67,9 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do Enum.reduce(event_stream, state, fn event, state -> %RecordedEvent{data: data, stream_version: stream_version} = event - %Aggregate{ - aggregate_module: aggregate_module, - aggregate_state: aggregate_state, - aggregate_version: aggregate_version, - snapshotting: snapshotting - } = state - - state_with_snapshot = - if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do - case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do - {:ok, snapshotting} -> - # nocommit - IO.puts("Stream version: #{stream_version}") - %Aggregate{state | snapshotting: snapshotting} - - {:error, error} -> - Logger.warning(fn -> - describe(state) <> " snapshot failed due to: " <> inspect(error) - end) - - state - end - else - state - end + %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state + + state_with_snapshot = state_with_snapshot(stream_version, state) %Aggregate{ state_with_snapshot @@ -101,6 +79,33 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do end) end + defp state_with_snapshot( + stream_version, + %Aggregate{ + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + snapshotting: snapshotting + } = state + ) do + if snapshotting && Snapshotting.snapshot_required?(snapshotting, stream_version) do + case Snapshotting.take_snapshot(snapshotting, aggregate_version, aggregate_state) do + {:ok, snapshotting} -> + # nocommit + IO.puts("Stream version: #{stream_version}") + %Aggregate{state | snapshotting: snapshotting} + + {:error, error} -> + Logger.warning(fn -> + describe(state) <> " snapshot failed due to: " <> inspect(error) + end) + + state + end + else + state + end + end + defp describe(%Aggregate{} = aggregate) do %Aggregate{ aggregate_module: aggregate_module, From 3ffdc0d25b05a4dda4fda749ee009b5157eb6ac4 Mon Sep 17 00:00:00 2001 From: Joao Gilberto Balsini Moura Date: Fri, 9 Aug 2024 09:15:10 -0300 Subject: [PATCH 3/3] Dialyzer for events --- lib/commanded/commands/router.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/commanded/commands/router.ex b/lib/commanded/commands/router.ex index 60599301..342f4589 100644 --- a/lib/commanded/commands/router.ex +++ b/lib/commanded/commands/router.ex @@ -371,6 +371,7 @@ defmodule Commanded.Commands.Router do | {:ok, aggregate_state :: struct()} | {:ok, aggregate_version :: non_neg_integer()} | {:ok, execution_result :: ExecutionResult.t()} + | {:ok, events :: list()} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term()}