From 9d95470c083b55b26c97f820a90fff01f2e5bc15 Mon Sep 17 00:00:00 2001 From: klimick Date: Sun, 21 Dec 2025 11:14:51 +0300 Subject: [PATCH] make pooling required --- src/Consumer.php | 10 ++-------- src/Internal/ChannelWatcher.php | 4 ++-- tests/PgmqTest.php | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 891dc2c..5e09f3b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -42,11 +42,9 @@ public function consume( $timeoutWatcher = $config->pollInterval->isPositive() ? new Internal\TimeoutWatcher($polls, $config->pollInterval) - : null; + : throw new \LogicException('Pooling is required. Set $pollInterval to a positive value.'); - if ($timeoutWatcher !== null) { - $watchers[] = $timeoutWatcher; - } + $watchers[] = $timeoutWatcher; if ($config->listenForInserts) { $channelName = $queue->enableNotifyInsert(); @@ -58,10 +56,6 @@ public function consume( ); } - if (\count($watchers) === 0) { - throw new \LogicException('At least one watcher must be configured. Either set a positive $pollInterval or enable $listenForInserts or both.'); - } - $handle = new Internal\ConsumeHandler( $this->pg, $config, diff --git a/src/Internal/ChannelWatcher.php b/src/Internal/ChannelWatcher.php index 5ad2ce6..14f4794 100644 --- a/src/Internal/ChannelWatcher.php +++ b/src/Internal/ChannelWatcher.php @@ -19,14 +19,14 @@ public function __construct( private Pipeline\Queue $queue, private PostgresListener $listener, - private ?TimeoutWatcher $timeout = null, + private TimeoutWatcher $timeout, ) {} public function watch(): void { EventLoop::queue(function (): void { foreach ($this->listener as $_) { - $this->timeout?->reschedule(); + $this->timeout->reschedule(); $this->queue->pushAsync(null)->ignore(); } }); diff --git a/tests/PgmqTest.php b/tests/PgmqTest.php index 8af2b8d..2176702 100644 --- a/tests/PgmqTest.php +++ b/tests/PgmqTest.php @@ -280,7 +280,7 @@ public function testInvalidConsumerConfiguration(): void $consumer = createConsumer($this->pg); self::expectException(\LogicException::class); - self::expectExceptionMessage('At least one watcher must be configured. Either set a positive $pollInterval or enable $listenForInserts or both.'); + self::expectExceptionMessage('Pooling is required. Set $pollInterval to a positive value.'); $consumer->consume(static fn() => null, new ConsumeConfig( queue: $queue->name, pollInterval: TimeSpan::fromSeconds(0),