From 16ad2017f9b3d503de69ad5c36b93f759194c2fb Mon Sep 17 00:00:00 2001 From: klimick Date: Tue, 23 Dec 2025 10:31:15 +0300 Subject: [PATCH 1/2] fewer allocations in consumer --- src/ConsumeController.php | 35 +++++++++-------- src/Consumer.php | 8 ++-- ...onsumeHandler.php => ConsumeScheduler.php} | 38 +++++++++---------- 3 files changed, 40 insertions(+), 41 deletions(-) rename src/Internal/{ConsumeHandler.php => ConsumeScheduler.php} (68%) diff --git a/src/ConsumeController.php b/src/ConsumeController.php index aa2eb68..a98ceda 100644 --- a/src/ConsumeController.php +++ b/src/ConsumeController.php @@ -5,6 +5,7 @@ namespace Thesis\Pgmq; use Amp\Postgres\PostgresTransaction; +use Thesis\Pgmq; use Thesis\Time\TimeSpan; /** @@ -12,9 +13,12 @@ */ final readonly class ConsumeController { + /** + * @param non-empty-string $queue + */ public function __construct( public PostgresTransaction $tx, - private Queue $queue, + private string $queue, private ConsumeContext $context, ) {} @@ -28,10 +32,11 @@ public function stop(): void */ public function ack(array $messages): void { - $this->queue->deleteBatch(array_map( - static fn(Message $message): int => $message->id, - $messages, - )); + Pgmq\deleteBatch( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + ); } /** @@ -39,12 +44,11 @@ public function ack(array $messages): void */ public function nack(array $messages, TimeSpan $delay): void { - $this->queue->setVisibilityTimeout( - array_map( - static fn(Message $message): int => $message->id, - $messages, - ), - $delay, + Pgmq\setVisibilityTimeout( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + visibilityTimeout: $delay, ); } @@ -53,9 +57,10 @@ public function nack(array $messages, TimeSpan $delay): void */ public function term(array $messages): void { - $this->queue->archiveBatch(array_map( - static fn(Message $message): int => $message->id, - $messages, - )); + Pgmq\archiveBatch( + pg: $this->tx, + queue: $this->queue, + messageIds: array_map(static fn(Message $m) => $m->id, $messages), + ); } } diff --git a/src/Consumer.php b/src/Consumer.php index 5e09f3b..b31d0ab 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -14,7 +14,7 @@ */ final class Consumer { - /** @var list */ + /** @var list */ private array $consumers = []; public function __construct( @@ -56,7 +56,7 @@ public function consume( ); } - $handle = new Internal\ConsumeHandler( + $context = Internal\ConsumeScheduler::schedule( $this->pg, $config, $handler, @@ -64,9 +64,7 @@ public function consume( $polls, ); - $this->consumers[] = $handle; - - return $handle->context; + return $this->consumers[] = $context; } public function stop(): void diff --git a/src/Internal/ConsumeHandler.php b/src/Internal/ConsumeScheduler.php similarity index 68% rename from src/Internal/ConsumeHandler.php rename to src/Internal/ConsumeScheduler.php index 6b3ee20..31bb90d 100644 --- a/src/Internal/ConsumeHandler.php +++ b/src/Internal/ConsumeScheduler.php @@ -9,34 +9,35 @@ use Amp\Postgres\PostgresConnection; use Revolt\EventLoop; use Thesis\Pgmq; +use Thesis\Pgmq\ConsumeController; +use Thesis\Pgmq\Message; /** * @internal */ -final readonly class ConsumeHandler +final readonly class ConsumeScheduler { - public Pgmq\ConsumeContext $context; - - /** @var DeferredFuture<*> */ - private DeferredFuture $completionMarker; - /** - * @param callable(non-empty-list, Pgmq\ConsumeController): void $handler + * @param callable(non-empty-list, ConsumeController): void $handler * @param Pipeline\Queue $polls */ - public function __construct( + public static function schedule( PostgresConnection $pg, Pgmq\ConsumeConfig $config, callable $handler, PollWatcher $watcher, - private Pipeline\Queue $polls, - ) { + Pipeline\Queue $polls, + ): Pgmq\ConsumeContext { $iterator = $polls->iterate(); - $this->completionMarker = $completionMarker = new DeferredFuture(); + $completionMarker = new DeferredFuture(); - $this->context = $context = new Pgmq\ConsumeContext( - $this->stop(...), - $this->completionMarker->getFuture(), + $context = new Pgmq\ConsumeContext( + stop: static function () use ($polls): void { + if (!$polls->isComplete()) { + $polls->complete(); + } + }, + completionMarker: $completionMarker->getFuture(), ); EventLoop::queue(static function () use ( @@ -66,7 +67,7 @@ public function __construct( if (\count($messages) > 0) { $handler( $messages, - new Pgmq\ConsumeController($tx, new Pgmq\Queue($config->queue, $tx), $context), + new ConsumeController($tx, $config->queue, $context), ); } @@ -85,12 +86,7 @@ public function __construct( $completionMarker->complete(); } }); - } - public function stop(): void - { - if (!$this->polls->isComplete()) { - $this->polls->complete(); - } + return $context; } } From dea823fec3d09708d9c9a18e87e74acf37836924 Mon Sep 17 00:00:00 2001 From: klimick Date: Thu, 25 Dec 2025 16:38:14 +0300 Subject: [PATCH 2/2] consume functions instead ConsumeScheduler --- composer.json | 3 +- src/Consumer.php | 3 +- src/Internal/ConsumeScheduler.php | 92 ------------------------------- src/Internal/functions.php | 84 ++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 94 deletions(-) delete mode 100644 src/Internal/ConsumeScheduler.php create mode 100644 src/Internal/functions.php diff --git a/composer.json b/composer.json index d16c1ad..52a6610 100644 --- a/composer.json +++ b/composer.json @@ -35,7 +35,8 @@ "Thesis\\Pgmq\\": "src/" }, "files": [ - "src/pgmq.php" + "src/pgmq.php", + "src/Internal/functions.php" ] }, "autoload-dev": { diff --git a/src/Consumer.php b/src/Consumer.php index b31d0ab..a6bd5de 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -8,6 +8,7 @@ use Amp\Pipeline; use Amp\Postgres\PostgresConnection; use function Amp\async; +use function Thesis\Pgmq\Internal\consume; /** * @api @@ -56,7 +57,7 @@ public function consume( ); } - $context = Internal\ConsumeScheduler::schedule( + $context = consume( $this->pg, $config, $handler, diff --git a/src/Internal/ConsumeScheduler.php b/src/Internal/ConsumeScheduler.php deleted file mode 100644 index 31bb90d..0000000 --- a/src/Internal/ConsumeScheduler.php +++ /dev/null @@ -1,92 +0,0 @@ -, ConsumeController): void $handler - * @param Pipeline\Queue $polls - */ - public static function schedule( - PostgresConnection $pg, - Pgmq\ConsumeConfig $config, - callable $handler, - PollWatcher $watcher, - Pipeline\Queue $polls, - ): Pgmq\ConsumeContext { - $iterator = $polls->iterate(); - $completionMarker = new DeferredFuture(); - - $context = new Pgmq\ConsumeContext( - stop: static function () use ($polls): void { - if (!$polls->isComplete()) { - $polls->complete(); - } - }, - completionMarker: $completionMarker->getFuture(), - ); - - EventLoop::queue(static function () use ( - $pg, - $config, - $handler, - $iterator, - $watcher, - $completionMarker, - $context, - ): void { - $watcher->watch(); - - foreach ($iterator as $_) { - $tx = $pg->beginTransaction(); - - try { - $messages = Pgmq\readBatch( - pg: $tx, - queue: $config->queue, - count: $config->batch, - visibilityTimeout: $config->visibilityTimeout, - ); - - $messages = [...$messages]; - - if (\count($messages) > 0) { - $handler( - $messages, - new ConsumeController($tx, $config->queue, $context), - ); - } - - $tx->commit(); - } catch (\Throwable $e) { - $tx->rollback(); - $completionMarker->error($e); - $iterator->dispose(); - break; - } - } - - $watcher->cancel(); - - if (!$completionMarker->isComplete()) { - $completionMarker->complete(); - } - }); - - return $context; - } -} diff --git a/src/Internal/functions.php b/src/Internal/functions.php new file mode 100644 index 0000000..e62fe1b --- /dev/null +++ b/src/Internal/functions.php @@ -0,0 +1,84 @@ +, Pgmq\ConsumeController): void $handler + * @param Pipeline\Queue $polls + */ +function consume( + PostgresConnection $pg, + Pgmq\ConsumeConfig $config, + callable $handler, + PollWatcher $watcher, + Pipeline\Queue $polls, +): Pgmq\ConsumeContext { + $iterator = $polls->iterate(); + $completionMarker = new DeferredFuture(); + + $context = new Pgmq\ConsumeContext( + stop: static function () use ($polls): void { + if (!$polls->isComplete()) { + $polls->complete(); + } + }, + completionMarker: $completionMarker->getFuture(), + ); + + EventLoop::queue(static function () use ( + $pg, + $config, + $handler, + $iterator, + $watcher, + $completionMarker, + $context, + ): void { + $watcher->watch(); + + foreach ($iterator as $_) { + $tx = $pg->beginTransaction(); + + try { + $messages = Pgmq\readBatch( + pg: $tx, + queue: $config->queue, + count: $config->batch, + visibilityTimeout: $config->visibilityTimeout, + ); + + $messages = [...$messages]; + + if (\count($messages) > 0) { + $handler( + $messages, + new Pgmq\ConsumeController($tx, $config->queue, $context), + ); + } + + $tx->commit(); + } catch (\Throwable $e) { + $tx->rollback(); + $completionMarker->error($e); + $iterator->dispose(); + break; + } + } + + $watcher->cancel(); + + if (!$completionMarker->isComplete()) { + $completionMarker->complete(); + } + }); + + return $context; +}