Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 205 additions & 181 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions deptrac-baseline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ deptrac:
- Patchlevel\EventSourcing\Subscription\RunMode
Patchlevel\EventSourcing\Attribute\Projector:
- Patchlevel\EventSourcing\Subscription\RunMode
Patchlevel\EventSourcing\Attribute\SharedApplyContext:
- Patchlevel\EventSourcing\Aggregate\AggregateRoot
Patchlevel\EventSourcing\Attribute\Stream:
- Patchlevel\EventSourcing\Aggregate\AggregateRoot
Patchlevel\EventSourcing\Attribute\Subscriber:
Expand Down
46 changes: 46 additions & 0 deletions docs/pages/aggregate.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ final class Profile extends BasicAggregateRoot
}
}
```
!!! tip

You don't necessarily need to define multiple `Apply` attributes with the event class
if you define the event types in the method using a union type.

## Suppress missing apply methods

Sometimes you have events that do not change the state of the aggregate itself,
Expand Down Expand Up @@ -358,6 +363,38 @@ final class Profile extends BasicAggregateRoot

When all events are suppressed, debugging becomes more difficult if you forget an apply method.

## Shared apply context

When working with [micro-aggregates](./aggregate.md#micro-aggregates),
it’s common that events are applied by different aggregates.
As a result, an aggregate may receive events it does not handle, which can lead to multiple “missing apply” warnings.

The `SharedApplyContext` attribute allows you to declare that several aggregates share the same apply context.
With this configuration, a missing apply is only reported if none of the shared aggregates handle the event.

```php
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\SharedApplyContext;
use Patchlevel\EventSourcing\Attribute\Stream;

#[Aggregate('profile')]
#[SharedApplyContext([PersonalInformation::class])]
final class Profile extends BasicAggregateRoot
{
}

#[Aggregate('personal_information')]
#[Stream(Profile::class)]
#[SharedApplyContext([Profile::class])]
final class PersonalInformation extends BasicAggregateRoot
{
}
```
!!! warning

You need to define the `SharedApplyContext` attribute on all aggregates that share the apply context.

## Stream Name

!!! warning
Expand Down Expand Up @@ -675,8 +712,10 @@ use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\SharedApplyContext;

#[Aggregate('order')]
#[SharedApplyContext([Shipping::class])]
final class Order extends BasicAggregateRoot
{
#[Id]
Expand Down Expand Up @@ -706,10 +745,12 @@ use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\SharedApplyContext;
use Patchlevel\EventSourcing\Attribute\Stream;

#[Aggregate('shipping')]
#[Stream(Order::class)]
#[SharedApplyContext([Order::class])]
final class Shipping extends BasicAggregateRoot
{
#[Id]
Expand Down Expand Up @@ -740,6 +781,11 @@ final class Shipping extends BasicAggregateRoot
}
}
```
!!! tip

With the [SharedApplyContext](./aggregate.md#shared-apply-context) attribute,
you can suppress missing applies for events that are handled by other aggregates.

### Child Aggregates

??? example "Experimental"
Expand Down
12 changes: 12 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,18 @@ foreach ($subscriptions as $subscription) {
echo $subscription->status()->value;
}
```
### Refresh

If you change the metadata of a subscriber in the code (e.g. `runMode`, `group` or `cleanupTasks`),
you can use the `refresh` method to update the existing subscriptions in the store.

```php
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->refresh(new SubscriptionEngineCriteria());
```
## Learn more

* [How to use CLI commands](./cli.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mkdocs-material==9.7.1

# Markdown extensions
Pygments==2.19.2
pymdown-extensions==10.20.1
pymdown-extensions==10.21

# MkDocs plugins
mkdocs-material-extensions==1.3.1
8 changes: 7 additions & 1 deletion phpstan-baseline.neon
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
parameters:
ignoreErrors:
-
message: '#^Cannot unset offset ''url'' on array\{application_name\?\: string, charset\?\: string, dbname\?\: string, defaultTableOptions\?\: array\<string, mixed\>, driver\?\: ''ibm_db2''\|''mysqli''\|''oci8''\|''pdo_mysql''\|''pdo_oci''\|''pdo_pgsql''\|''pdo_sqlite''\|''pdo_sqlsrv''\|''pgsql''\|''sqlite3''\|''sqlsrv'', driverClass\?\: class\-string\<Doctrine\\DBAL\\Driver\>, driverOptions\?\: array\<mixed\>, host\?\: string, \.\.\.\}\.$#'
message: '#^Cannot unset offset ''url'' on array\{application_name\?\: string, charset\?\: string, defaultTableOptions\?\: array\<string, mixed\>, driver\?\: ''ibm_db2''\|''mysqli''\|''oci8''\|''pdo_mysql''\|''pdo_oci''\|''pdo_pgsql''\|''pdo_sqlite''\|''pdo_sqlsrv''\|''pgsql''\|''sqlite3''\|''sqlsrv'', driverClass\?\: class\-string\<Doctrine\\DBAL\\Driver\>, driverOptions\?\: array\<mixed\>, host\?\: string, keepReplica\?\: bool, \.\.\.\}\.$#'
identifier: unset.offset
count: 1
path: src/Console/DoctrineHelper.php
Expand Down Expand Up @@ -444,6 +444,12 @@ parameters:
count: 1
path: tests/Unit/Fixture/ProfileWithHandler.php

-
message: '#^Property Patchlevel\\EventSourcing\\Tests\\Unit\\Fixture\\ProfileWithSharedApplyContext\:\:\$id is unused\.$#'
identifier: property.unused
count: 1
path: tests/Unit/Fixture/ProfileWithSharedApplyContext.php

-
message: '#^Property Patchlevel\\EventSourcing\\Tests\\Unit\\Fixture\\ProfileWithSuppressAll\:\:\$id is unused\.$#'
identifier: property.unused
Expand Down
18 changes: 18 additions & 0 deletions src/Attribute/SharedApplyContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

#[Attribute(Attribute::TARGET_CLASS)]
final readonly class SharedApplyContext
{
/** @param list<class-string<AggregateRoot>> $aggregates */
public function __construct(
public array $aggregates,
) {
}
}
36 changes: 36 additions & 0 deletions src/Console/Command/SubscriptionRefreshCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use LogicException;
use Patchlevel\EventSourcing\Subscription\Engine\CanRefreshSubscriptions;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function sprintf;

#[AsCommand(
'event-sourcing:subscription:refresh',
'Refresh subscriptions (run-mode, group)',
)]
final class SubscriptionRefreshCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->engine instanceof CanRefreshSubscriptions) {
throw new LogicException(sprintf(
'"%s" does not implement "%s" and cannot call refresh.',
$this->engine::class,
CanRefreshSubscriptions::class,
));
}

$criteria = $this->subscriptionEngineCriteria($input);
$this->engine->refresh($criteria);

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\SharedApplyContext;
use Patchlevel\EventSourcing\Attribute\Snapshot as AttributeSnapshot;
use Patchlevel\EventSourcing\Attribute\Stream;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
Expand Down Expand Up @@ -73,25 +74,42 @@ public function metadata(string $aggregate): AggregateRootMetadata
private function findSuppressMissingApply(ReflectionClass $reflector): array
{
$suppressEvents = [];
$suppressAll = false;

$attributes = $reflector->getAttributes(SuppressMissingApply::class);

foreach ($attributes as $attribute) {
$instance = $attribute->newInstance();
if ($attributes !== []) {
$instance = $attributes[0]->newInstance();

if ($instance->suppressAll) {
$suppressAll = true;

continue;
return [[], true];
}

foreach ($instance->suppressEvents as $event) {
$suppressEvents[$event] = true;
}
}

return [$suppressEvents, $suppressAll];
$attributes = $reflector->getAttributes(SharedApplyContext::class);

if ($attributes !== []) {
$instance = $attributes[0]->newInstance();

foreach ($instance->aggregates as $aggregateClass) {
$reflectionClass = new ReflectionClass($aggregateClass);

$applyMethods = $this->findApplyMethods(
$reflectionClass,
$aggregateClass,
$this->findChildAggregates($reflectionClass),
);

foreach ($applyMethods as $eventClass => $method) {
$suppressEvents[$eventClass] = true;
}
}
}

return [$suppressEvents, false];
}

private function findAggregateName(ReflectionClass $reflector): string
Expand Down
10 changes: 10 additions & 0 deletions src/Subscription/Engine/CanRefreshSubscriptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

interface CanRefreshSubscriptions
{
public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result;
}
17 changes: 16 additions & 1 deletion src/Subscription/Engine/CatchUpSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

namespace Patchlevel\EventSourcing\Subscription\Engine;

use LogicException;
use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_merge;
use function sprintf;

use const PHP_INT_MAX;

final class CatchUpSubscriptionEngine implements SubscriptionEngine
final class CatchUpSubscriptionEngine implements SubscriptionEngine, CanRefreshSubscriptions
{
public function __construct(
private readonly SubscriptionEngine $parent,
Expand Down Expand Up @@ -86,6 +88,19 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
return $this->parent->subscriptions($criteria);
}

public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result
{
if (!$this->parent instanceof CanRefreshSubscriptions) {
throw new LogicException(sprintf(
'"%s" does not implement "%s" and cannot call refresh.',
$this->parent::class,
CanRefreshSubscriptions::class,
));
}

return $this->parent->refresh($criteria);
}

private function mergeResult(ProcessedResult ...$results): ProcessedResult
{
$processedMessages = 0;
Expand Down
78 changes: 77 additions & 1 deletion src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
use function count;
use function sprintf;

final class DefaultSubscriptionEngine implements SubscriptionEngine
final class DefaultSubscriptionEngine implements SubscriptionEngine, CanRefreshSubscriptions
{
private SubscriptionManager $subscriptionManager;

Expand Down Expand Up @@ -823,6 +823,82 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
);
}

public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result
{
$criteria ??= new SubscriptionEngineCriteria();

$this->discoverNewSubscriptions();

$subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
));

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

if (!$subscriber) {
continue;
}

$changed = false;

if ($subscription->runMode() !== $subscriber->runMode()) {
$changed = true;
$oldRunMode = $subscription->runMode();
$subscription->changeRunMode($subscriber->runMode());

$this->logger?->info(
sprintf(
'Subscription Engine: Subscription "%s" run mode changed from "%s" to "%s".',
$subscription->id(),
$oldRunMode->value,
$subscription->runMode()->value,
),
);
}

if ($subscription->group() !== $subscriber->group()) {
$changed = true;
$oldGroup = $subscription->group();
$subscription->changeGroup($subscriber->group());

$this->logger?->info(
sprintf(
'Subscription Engine: Subscription "%s" group changed from "%s" to "%s".',
$subscription->id(),
$oldGroup,
$subscription->group(),
),
);
}

$cleanupTasks = $this->cleanupTasks($subscriber);

if ($subscription->cleanupTasks() !== $cleanupTasks) {
$changed = true;
$subscription->replaceCleanupTasks($cleanupTasks);

$this->logger?->info(
sprintf(
'Subscription Engine: Subscription "%s" cleanup tasks changed.',
$subscription->id(),
),
);
}

if (!$changed) {
continue;
}

$this->subscriptionManager->update($subscription);
}

$this->subscriptionManager->flush();

return new Result();
}

private function handleMessage(int $index, Message $message, Subscription $subscription): Error|null
{
$subscriber = $this->subscriber($subscription->id());
Expand Down
Loading
Loading