eventsourcing is a generic, type-safe event sourcing framework for Go.
It provides the building blocks for event-driven architectures, including command handling, query handling, event buses, envelopes, and a flexible iterator for read models.
This library focuses on simplicity, modern Go patterns, and full generics support, making it easy to build event-sourced systems with strong type guarantees.
- Commands & Command Handlers – Strongly typed command routing.
- Events & Event Handlers – Publish and consume events safely.
- Event Bus – Supports multiple subscribers with typed handlers.
- Queries & Query Handlers – Request data through a type-safe query bus.
- Query Gateway – Simple façade to dispatch typed queries.
- Generic Iterator – Lazy, paginated, or buffered read model iteration.
- Revision Management – Built-in support for aggregate and stream revisions.
- Metadata & Envelopes – Rich event metadata included by default.
- Type-Safe Generics Everywhere – Commands, events, queries, handlers, results.
Install with go get:

    go get github.com/terraskye/eventsourcing

The otel subpackage provides built-in observability for your event-sourced application using OpenTelemetry standards.
Event-sourced systems are inherently distributed and asynchronous. Without proper observability:
- Command failures are hard to diagnose
- Event handler latency goes unnoticed
- Concurrency conflicts are invisible
- Performance bottlenecks in the event store remain hidden
The otel package wraps your handlers and stores with tracing spans and metrics, giving you full visibility into your system's behavior without modifying business logic.

| Component | Spans | Metrics |
|---|---|---|
| Command Handlers | command.handle <Type> | duration, in-flight, handled, failed, conflicts |
| Event Handlers | events.handle <Type> | duration, handled |
| Event Store | EventStore.Save, EventStore.LoadStream, etc. | duration, saves, loads, errors, events appended/loaded |

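
The wrapping approach described above is a decorator: a function takes a handler and returns a handler of the same shape that records telemetry around the call. The following is a minimal sketch of that idea using only the standard library. `CommandHandler`, `Stats`, `WithTiming`, and `CreateOrder` are hypothetical names chosen for illustration; they are not the library's types, and a plain struct stands in for real OpenTelemetry spans and instruments.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// CommandHandler is a hypothetical generic handler shape (not the library's type).
type CommandHandler[C any] func(ctx context.Context, cmd C) error

// Stats stands in for a metrics backend. It is not safe for concurrent use.
type Stats struct {
	Handled  int
	Failed   int
	Duration time.Duration
}

// WithTiming returns a handler that times and counts every call to next.
// The business logic inside next is left untouched.
func WithTiming[C any](next CommandHandler[C], stats *Stats) CommandHandler[C] {
	return func(ctx context.Context, cmd C) error {
		start := time.Now()
		err := next(ctx, cmd)
		stats.Duration += time.Since(start)
		if err != nil {
			stats.Failed++
			return err
		}
		stats.Handled++
		return nil
	}
}

// CreateOrder is an example command.
type CreateOrder struct{ ID string }

// handleCreateOrder is plain business logic with no telemetry code in it.
func handleCreateOrder(_ context.Context, cmd CreateOrder) error {
	if cmd.ID == "" {
		return errors.New("missing order id")
	}
	return nil
}

// demo sends one valid and one invalid command through the wrapped handler.
func demo() Stats {
	stats := &Stats{}
	h := WithTiming[CreateOrder](handleCreateOrder, stats)
	_ = h(context.Background(), CreateOrder{ID: "o-1"})
	_ = h(context.Background(), CreateOrder{})
	return *stats
}

func main() {
	s := demo()
	fmt.Printf("handled=%d failed=%d\n", s.Handled, s.Failed)
}
```

Because the wrapper has the same type as the handler it wraps, it can be registered wherever the original handler was, and several wrappers can be stacked.
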
Wrap your handlers and stores with the telemetry decorators:

    import "github.com/terraskye/eventsourcing/otel"

    // Wrap a command handler
    handler := otel.WithCommandTelemetry(myCommandHandler)

    // Wrap an event handler
    eventHandler := otel.WithEventTelemetry(myEventHandler)

    // Wrap an event store
    store := otel.WithEventStoreTelemetry(myEventStore)

Customize span names and attributes using options:

    // In these snippets, attribute is go.opentelemetry.io/otel/attribute;
    // TenantFromContext and UserFromContext are application-defined helpers.

    // Static operation name
    handler := otel.WithCommandTelemetry(myHandler,
        otel.WithOperation("order.create"),
    )

    // Add static attributes to all spans
    handler := otel.WithCommandTelemetry(myHandler,
        otel.WithAttributes(
            attribute.String("service.name", "orders"),
            attribute.String("service.version", "1.0.0"),
        ),
    )

    // Dynamic operation name based on context
    handler := otel.WithCommandTelemetry(myHandler,
        otel.WithOperationGetter(func(ctx context.Context, defaultOp string) string {
            if tenant := TenantFromContext(ctx); tenant != "" {
                return fmt.Sprintf("%s [%s]", defaultOp, tenant)
            }
            return defaultOp
        }),
    )

    // Extract dynamic attributes from context
    handler := otel.WithCommandTelemetry(myHandler,
        otel.WithAttributeGetter(func(ctx context.Context) []attribute.KeyValue {
            return []attribute.KeyValue{
                attribute.String("tenant.id", TenantFromContext(ctx)),
                attribute.String("user.id", UserFromContext(ctx)),
            }
        }),
    )

Commands:
- eventsourcing.commands.handled - total successful commands
- eventsourcing.commands.failed - total failed commands
- eventsourcing.commands.duration - histogram of handling time (ms)
- eventsourcing.commands.in_flight - currently processing
- eventsourcing.concurrency.conflicts - optimistic locking conflicts
Events:
- eventsourcing.eventbus.handled - events processed by handlers
- eventsourcing.eventbus.duration - handler execution time (ms)
- eventsourcing.events.appended - events written to store
- eventsourcing.events.loaded - events read from store
Event Store:
- eventsourcing.eventstore.saves - save operations
- eventsourcing.eventstore.duration - operation time (ms)
- eventsourcing.eventstore.errors - failed operations
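
To make the relationship between the command metrics listed above concrete, here is a rough sketch of how such counters could be updated around a single command. It uses plain atomic counters rather than OpenTelemetry instruments. `ErrConflict`, `CommandMetrics`, and `Observe` are hypothetical names, and counting a conflict as both a conflict and a failure is an assumption made for the example, not documented behavior of the otel package.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// ErrConflict is a hypothetical sentinel for an optimistic locking failure.
var ErrConflict = errors.New("stream revision conflict")

// CommandMetrics mirrors the command metric names with atomic counters.
type CommandMetrics struct {
	Handled   atomic.Int64 // eventsourcing.commands.handled
	Failed    atomic.Int64 // eventsourcing.commands.failed
	InFlight  atomic.Int64 // eventsourcing.commands.in_flight
	Conflicts atomic.Int64 // eventsourcing.concurrency.conflicts
}

// Observe runs fn and updates the counters around it.
func (m *CommandMetrics) Observe(fn func() error) error {
	m.InFlight.Add(1)        // the command is now being processed
	defer m.InFlight.Add(-1) // and is no longer in flight once fn returns

	err := fn()
	switch {
	case err == nil:
		m.Handled.Add(1)
	case errors.Is(err, ErrConflict):
		// Assumption: a conflict is recorded as a conflict and as a failure.
		m.Conflicts.Add(1)
		m.Failed.Add(1)
	default:
		m.Failed.Add(1)
	}
	return err
}

// demo records one success, one wrapped conflict, and one other failure.
func demo() *CommandMetrics {
	m := &CommandMetrics{}
	_ = m.Observe(func() error { return nil })
	_ = m.Observe(func() error { return fmt.Errorf("save order: %w", ErrConflict) })
	_ = m.Observe(func() error { return errors.New("validation failed") })
	return m
}

func main() {
	m := demo()
	fmt.Printf("handled=%d failed=%d conflicts=%d in_flight=%d\n",
		m.Handled.Load(), m.Failed.Load(), m.Conflicts.Load(), m.InFlight.Load())
}
```

Using `errors.Is` means a conflict is still recognized when the store wraps it with extra context, which is why the second call in `demo` counts as a conflict.
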
Spans include the following attributes where applicable, named following OpenTelemetry conventions:
- eventsourcing.command.type - the command type name
- eventsourcing.aggregate.id - target aggregate ID
- eventsourcing.stream.id - event stream identifier
- eventsourcing.stream.version - stream version after operation
- eventsourcing.event.type - event type name
- eventsourcing.event.id - unique event ID
- eventsourcing.event.global_position - global ordering position
- eventsourcing.event.stream_position - position within stream
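
The built-in attributes above sit alongside whatever static or context-derived attributes are supplied through options. The sketch below shows one way such options could be resolved for a single call, using the functional options pattern. It is an illustration under assumptions, not the otel package's implementation: `Attr` stands in for OpenTelemetry's `attribute.KeyValue`, and `config`, `Option`, `resolve`, and `tenantKey` are hypothetical. The option constructors reuse the names from the configuration snippets only to make the correspondence easy to follow.

```go
package main

import (
	"context"
	"fmt"
)

// Attr stands in for OpenTelemetry's attribute.KeyValue in this sketch.
type Attr struct{ Key, Value string }

// config collects what the options set (hypothetical, not the library's type).
type config struct {
	operation string
	static    []Attr
	getters   []func(ctx context.Context) []Attr
}

// Option mutates the config; this is the functional options pattern.
type Option func(*config)

// WithOperation overrides the default span name.
func WithOperation(name string) Option {
	return func(c *config) { c.operation = name }
}

// WithAttributes adds attributes that are the same for every call.
func WithAttributes(attrs ...Attr) Option {
	return func(c *config) { c.static = append(c.static, attrs...) }
}

// WithAttributeGetter adds attributes computed from the call's context.
func WithAttributeGetter(g func(ctx context.Context) []Attr) Option {
	return func(c *config) { c.getters = append(c.getters, g) }
}

// resolve applies the options and returns the span name and attributes for
// one call: static attributes first, then context-derived ones.
func resolve(ctx context.Context, defaultOp string, opts ...Option) (string, []Attr) {
	cfg := config{operation: defaultOp}
	for _, opt := range opts {
		opt(&cfg)
	}
	attrs := append([]Attr{}, cfg.static...)
	for _, g := range cfg.getters {
		attrs = append(attrs, g(ctx)...)
	}
	return cfg.operation, attrs
}

// tenantKey is a private context key used only by this example.
type tenantKey struct{}

// demo resolves the options for a context that carries a tenant.
func demo() (string, []Attr) {
	ctx := context.WithValue(context.Background(), tenantKey{}, "acme")
	return resolve(ctx, "command.handle CreateOrder",
		WithOperation("order.create"),
		WithAttributes(Attr{Key: "service.name", Value: "orders"}),
		WithAttributeGetter(func(ctx context.Context) []Attr {
			tenant, _ := ctx.Value(tenantKey{}).(string)
			return []Attr{{Key: "tenant.id", Value: tenant}}
		}),
	)
}

func main() {
	name, attrs := demo()
	fmt.Println(name, attrs)
}
```

Static attributes are fixed when the handler is wrapped, while getters run on every call, which is what lets per-request values such as a tenant ID reach the span.
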