diff --git a/composer.json b/composer.json index d16c1ad..52a6610 100644 --- a/composer.json +++ b/composer.json @@ -35,7 +35,8 @@ "Thesis\\Pgmq\\": "src/" }, "files": [ - "src/pgmq.php" + "src/pgmq.php", + "src/Internal/functions.php" ] }, "autoload-dev": { diff --git a/src/ConsumeController.php b/src/ConsumeController.php index aa2eb68..a98ceda 100644 --- a/src/ConsumeController.php +++ b/src/ConsumeController.php @@ -5,6 +5,7 @@ namespace Thesis\Pgmq; use Amp\Postgres\PostgresTransaction; +use Thesis\Pgmq; use Thesis\Time\TimeSpan; /** @@ -12,9 +13,12 @@ */ final readonly class ConsumeController { + /** + * @param non-empty-string $queue + */ public function __construct( public PostgresTransaction $tx, - private Queue $queue, + private string $queue, private ConsumeContext $context, ) {} @@ -28,10 +32,11 @@ public function stop(): void */ public function ack(array $messages): void { - $this->queue->deleteBatch(array_map( - static fn(Message $message): int => $message->id, - $messages, - )); + Pgmq\deleteBatch( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + ); } /** @@ -39,12 +44,11 @@ public function ack(array $messages): void */ public function nack(array $messages, TimeSpan $delay): void { - $this->queue->setVisibilityTimeout( - array_map( - static fn(Message $message): int => $message->id, - $messages, - ), - $delay, + Pgmq\setVisibilityTimeout( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + visibilityTimeout: $delay, ); } @@ -53,9 +57,10 @@ public function nack(array $messages, TimeSpan $delay): void */ public function term(array $messages): void { - $this->queue->archiveBatch(array_map( - static fn(Message $message): int => $message->id, - $messages, - )); + Pgmq\archiveBatch( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + ); } } diff --git a/src/Consumer.php b/src/Consumer.php index 5e09f3b..a6bd5de 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -8,13 +8,14 @@ use Amp\Pipeline; use Amp\Postgres\PostgresConnection; use function Amp\async; +use function Thesis\Pgmq\Internal\consume; /** * @api */ final class Consumer { - /** @var list */ + /** @var list */ private array $consumers = []; public function __construct( @@ -56,7 +57,7 @@ public function consume( ); } - $handle = new Internal\ConsumeHandler( + $context = consume( $this->pg, $config, $handler, @@ -64,9 +65,7 @@ public function consume( $polls, ); - $this->consumers[] = $handle; - - return $handle->context; + return $this->consumers[] = $context; } public function stop(): void diff --git a/src/Internal/ConsumeHandler.php b/src/Internal/ConsumeHandler.php deleted file mode 100644 index 6b3ee20..0000000 --- a/src/Internal/ConsumeHandler.php +++ /dev/null @@ -1,96 +0,0 @@ - */ - private DeferredFuture $completionMarker; - - /** - * @param callable(non-empty-list, Pgmq\ConsumeController): void $handler - * @param Pipeline\Queue $polls - */ - public function __construct( - PostgresConnection $pg, - Pgmq\ConsumeConfig $config, - callable $handler, - PollWatcher $watcher, - private Pipeline\Queue $polls, - ) { - $iterator = $polls->iterate(); - $this->completionMarker = $completionMarker = new DeferredFuture(); - - $this->context = $context = new Pgmq\ConsumeContext( - $this->stop(...), - $this->completionMarker->getFuture(), - ); - - EventLoop::queue(static function () use ( - $pg, - $config, - $handler, - $iterator, - $watcher, - $completionMarker, - $context, - ): void { - $watcher->watch(); - - foreach ($iterator as $_) { - $tx = $pg->beginTransaction(); - - try { - $messages = Pgmq\readBatch( - pg: $tx, - queue: $config->queue, - count: $config->batch, - visibilityTimeout: $config->visibilityTimeout, - ); - - $messages = [...$messages]; - - if (\count($messages) > 0) { - $handler( - $messages, - new Pgmq\ConsumeController($tx, new Pgmq\Queue($config->queue, $tx), $context), - ); - } - - $tx->commit(); - } catch (\Throwable $e) { - $tx->rollback(); - $completionMarker->error($e); - $iterator->dispose(); - break; - } - } - - $watcher->cancel(); - - if (!$completionMarker->isComplete()) { - $completionMarker->complete(); - } - }); - } - - public function stop(): void - { - if (!$this->polls->isComplete()) { - $this->polls->complete(); - } - } -} diff --git a/src/Internal/functions.php b/src/Internal/functions.php new file mode 100644 index 0000000..e62fe1b --- /dev/null +++ b/src/Internal/functions.php @@ -0,0 +1,84 @@ +, Pgmq\ConsumeController): void $handler + * @param Pipeline\Queue $polls + */ +function consume( + PostgresConnection $pg, + Pgmq\ConsumeConfig $config, + callable $handler, + PollWatcher $watcher, + Pipeline\Queue $polls, +): Pgmq\ConsumeContext { + $iterator = $polls->iterate(); + $completionMarker = new DeferredFuture(); + + $context = new Pgmq\ConsumeContext( + stop: static function () use ($polls): void { + if (!$polls->isComplete()) { + $polls->complete(); + } + }, + completionMarker: $completionMarker->getFuture(), + ); + + EventLoop::queue(static function () use ( + $pg, + $config, + $handler, + $iterator, + $watcher, + $completionMarker, + $context, + ): void { + $watcher->watch(); + + foreach ($iterator as $_) { + $tx = $pg->beginTransaction(); + + try { + $messages = Pgmq\readBatch( + pg: $tx, + queue: $config->queue, + count: $config->batch, + visibilityTimeout: $config->visibilityTimeout, + ); + + $messages = [...$messages]; + + if (\count($messages) > 0) { + $handler( + $messages, + new Pgmq\ConsumeController($tx, $config->queue, $context), + ); + } + + $tx->commit(); + } catch (\Throwable $e) { + $tx->rollback(); + $completionMarker->error($e); + $iterator->dispose(); + break; + } + } + + $watcher->cancel(); + + if (!$completionMarker->isComplete()) { + $completionMarker->complete(); + } + }); + + return $context; +}