Skip to content

danieleades/sourcery

Repository files navigation

sourcery

Continuous integration codecov

Building blocks for pragmatic event-sourced systems in Rust. The crate focuses on keeping domain types pure while giving you the tools to rebuild state, project read models, and persist events through a pluggable store interface.

Highlights

  • Domain-first API – events are plain structs that implement DomainEvent; IDs and metadata live in an envelope rather than in your payloads.
  • Aggregate derive#[derive(Aggregate)] generates the event enum plus serialisation/deserialisation glue so command handlers stay focused on behaviour.
  • Repository orchestrationRepository loads aggregates, executes commands via Handle<C>, and persists the resulting events in a single transaction.
  • Metadata-aware projections – projections receive aggregate IDs, events, and metadata via ApplyProjection, enabling cross-aggregate correlation using causation, timestamps, or custom metadata.
  • Store agnostic – ships with an in-memory store for demos/tests; implement the EventStore trait to plug in your own persistence.

Documentation

Full documentation — Conceptual guides, API reference, and runnable examples.

see also, the API docs

How this differs from other Rust event-sourcing crates

This crate borrows inspiration from projects like eventually and cqrs but makes a few different trade-offs:

  • Events are first-class structs. Instead of immediately wrapping events in aggregate-specific enums, each DomainEvent stands on its own. Multiple aggregates (or even completely unrelated subsystems) can reuse the same event type. Projections receive aggregate identifiers and metadata alongside events rather than relying on the payload to embed IDs.

  • Projections are fully decoupled from Aggregates. Read models don’t have to depend on a particular aggregate enum or repository type. You declare the events you care about—potentially pulling from several aggregate kinds—and compose them via the builder. The fluent ProjectionBuilder keeps common cases ergonomic while still leaving room for custom loading logic when you need it.

  • Metadata lives outside domain objects. Infrastructure concerns (aggregate kind, ID, causation/correlation IDs, user info) travel alongside the event as separate parameters to projection handlers. The domain event itself remains pure, making it easier to share across bounded contexts.

  • More boilerplate, mitigated when it matters. Because events and projections are explicit structs, the type definitions are a bit louder than frameworks that lean on trait objects or dynamic dispatch. The provided #[derive(Aggregate)] covers the command side, while projections stay explicit and lean on the builder to avoid duplicate wiring.

  • Minimal infrastructure baked in. There is no built-in command bus, outbox, snapshot scheduler, or event streaming layer. You wire the repository into whichever pipeline you prefer. That keeps the crate lightweight compared to libraries that bundle an entire CQRS stack.

  • Versioning happens at the codec layer. We don’t expose an explicit “upcaster” concept like cqrs. Instead, you can migrate historical events transparently inside your codec (see examples/versioned_events.rs).

  • Type-safe optimistic concurrency by default. Repositories use version-checked mutations by default. Conflicts are detected at the type level—the Optimistic strategy returns OptimisticCommandError::Concurrency when the stream version changes between load and commit. For automatic retry on conflicts, use execute_with_retry:

    let attempts = repo
        .execute_with_retry::<Account, Deposit>(&id, &cmd, &(), 3)
        .await?;
    println!("Succeeded after {attempts} attempt(s)");

    See examples/optimistic_concurrency.rs for more.

Quick start

The snippet below wires together a single aggregate, a projection, and a repository using the aggregate derive and the in-memory store.

use sourcery::{
    Apply, ApplyProjection, DomainEvent, Handle, store::{inmemory, JsonCodec},
    Repository,
};
use serde::{Deserialize, Serialize};

// === Domain events ===

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsDeposited {
    pub amount_cents: i64,
}

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "bank.account.deposited";
}

// === Commands ===

#[derive(Debug)]
pub struct DepositFunds {
    pub amount_cents: i64,
}

// === Aggregate ===

#[derive(Debug, Default, sourcery::Aggregate, Serialize, Deserialize)]
#[aggregate(id = String, error = String, events(FundsDeposited))]
pub struct Account {
    balance_cents: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance_cents += event.amount_cents;
    }
}

impl Handle<DepositFunds> for Account {
    fn handle(&self, command: &DepositFunds) -> Result<Vec<Self::Event>, Self::Error> {
        if command.amount_cents <= 0 {
            return Err("amount must be positive".to_string());
        }
        Ok(vec![FundsDeposited {
            amount_cents: command.amount_cents,
        }
        .into()])
    }
}

// === Projection ===

#[derive(Debug, Default, sourcery::Projection)]
#[projection(id = String)]
pub struct AccountBalance {
    pub total_cents: i64,
}

impl ApplyProjection<FundsDeposited> for AccountBalance {
    fn apply_projection(
        &mut self,
        _aggregate_id: &Self::Id,
        event: &FundsDeposited,
        _metadata: &Self::Metadata,
    ) {
        self.total_cents += event.amount_cents;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
    let repository = Repository::new(store);

    let account_id = "ACC-001".to_string();
    let command = DepositFunds { amount_cents: 25_00 };
    repository.execute_command::<Account, DepositFunds>(
        &account_id,
        &command,
        &(),
    )
    .await?;

    let summary = repository
        .build_projection::<AccountBalance>()
        .event::<FundsDeposited>()
        .load()
        .await?;
    assert_eq!(summary.total_cents, 25_00);
    Ok(())
}

See the examples for larger, end-to-end scenarios (composite IDs, CQRS dashboards, versioned events, etc.).

Core concepts

Aggregates

Aggregates rebuild state from events and validate commands. Implement Apply<E> for each event you care about, then add Handle<C> implementations for each command. The #[derive(Aggregate)] macro generates the sum-type that glues everything together.

Projections

Read models that keep their state in sync by replaying events. Projections implement ApplyProjection<E> for the event types they care about and declare their kind, instance ID, aggregate ID, and metadata requirements via the Projection trait (usually with #[derive(Projection)]). Build them by calling Repository::build_projection::<P>(), chaining the relevant .event::<E>() / .event_for::<A, E>() registrations (or .events::<A::Event>() / .events_for::<A>() for aggregate event enums), and finally .load().await.

Event context

The repository loads raw events from the store and dispatches them to projections via the ApplyProjection trait. Each projection handler receives:

  • aggregate_id – the instance identifier
  • event – the deserialized domain event
  • metadata – timestamps, causation IDs, user information, or your own metadata type

Aggregates never see this context—only the pure events.

Repositories & stores

Repository<S> orchestrates aggregate loading, command execution, and appending to the underlying store. The EventStore trait defines the persistence boundary; implement it to back the repository with Postgres, DynamoDB, S3, or anything else that can read/write ordered event streams.

Running the examples

cargo run --example quickstart
cargo run --example inventory_report
cargo run --example subscription_billing
cargo run --example versioned_events
cargo run --example advanced_projection
cargo run --example optimistic_concurrency

Status

The crate is still pre-1.0. Expect APIs to evolve as real-world usage grows. Feedback and contributions are welcome! Submit an issue or pull request if you spot something missing.

📜 Licensing

This project is publicly available under the GNU General Public License v3.0. It may optionally be distributed under the **MIT license by commercial arrangement.


Was this useful? Buy me a coffee

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Sponsor this project

  •  

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •