From 471c94bf620e829177bde9eceb310469a2c51506 Mon Sep 17 00:00:00 2001 From: Viktor Masicek Date: Mon, 27 Oct 2025 19:59:25 +0100 Subject: [PATCH] Enable to set consumer labels for their specific restart On PhpAmpq broker realized by specific top priority queue for each consumers --- README.md | 4 +++- src/Broker/Consumer.php | 2 +- src/Broker/PhpAmqpLib/Consumer.php | 6 ++--- src/Broker/PhpAmqpLib/Manager.php | 33 ++++++++++++++++++++++---- src/Broker/PhpAmqpLib/Producer.php | 6 ++--- src/Broker/Producer.php | 4 ++-- src/Console/ConsumeCommand.php | 4 +++- src/Console/ReloadConsumersCommand.php | 19 ++++++++++----- 8 files changed, 57 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 388a79d..019a630 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ Ve všech ostatních případech se záznam uloží jako úspěšně dokončený `background-queue:clear-finished 14` Smaže všechny úspěšně zpracované záznamy starší 14 dní. -`background-queue:reload-consumers QUEUE NUMBER` Reloadne NUMBER consumerů pro danou QUEUE. +`background-queue:reload-consumers QUEUE [LABEL1,LABEL2,...]` Reloadne consumery označené jedním z labelů, viz `background-queue:consume`. `background-queue:update-schema` Aktualizuje databázové schéma, pokud je potřeba. @@ -282,6 +282,8 @@ Ale pokud by se vyskytlo více požadavků na zasílání emailů, po nějaké d Dále máme možnost prioritu nastavenou pro callback přetížit při vkládání záznamu v metodě `publish`. Například víme, že se jedná o rozesílání newsletterů. Tedy se jedná o zasílání emailů, ale s nízkou prioritou zpracování. +Příkaz `background-queue:consume` má také volitelný parametr `-l`, kterým konzumerovi nastavíme příslušný label. Při restartu konzumerů pomocí `background-queue:reload-consumers` pak máme možnost temito labely zvolit, které konzumery chceme restartovat. + ``` $priority = null; // aplikuje se priorita 10 z nastavení pro callback if ($isNewsletter) { diff --git a/src/Broker/Consumer.php b/src/Broker/Consumer.php index 3229a39..d6a3231 100644 --- a/src/Broker/Consumer.php +++ b/src/Broker/Consumer.php @@ -4,5 +4,5 @@ interface Consumer { - public function consume(string $queue, array $priorities): void; + public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void; } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index ead4d9a..2fdeb31 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -3,6 +3,7 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; use ADT\BackgroundQueue\BackgroundQueue; +use ADT\BackgroundQueue\Console\ReloadConsumersCommand; use Exception; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; @@ -21,13 +22,12 @@ public function __construct(Manager $manager, BackgroundQueue $backgroundQueue) /** * @throws Exception */ - public function consume(string $queue, array $priorities): void + public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void { // TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet. // Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL. - // Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu. - array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY); + $priorities = $this->manager->includeTopPriority($priorities, $consumerLabel); // Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme $queuesWithPriorities = []; diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 9a16836..957137e 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -10,6 +10,7 @@ class Manager { const QUEUE_TOP_PRIORITY = 0; + const QUEUE_NAME_PARTS_DELIMITER = '_'; private array $connectionParams; private array $queueParams; @@ -141,14 +142,38 @@ public function setupQos() $this->initQos = true; } - public function getQueueWithPriority(string $queue, int $priority): string + public function getQueueWithPriority(string $queue, string $priority): string { - return $queue . '_' . $priority; + if (strpos($queue, self::QUEUE_NAME_PARTS_DELIMITER) !== false) { + throw new \Exception('Priority cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".'); + } + return $queue . self::QUEUE_NAME_PARTS_DELIMITER . $priority; + } + + public function includeTopPriority(array $priorities, ?string $label = null): array + { + array_unshift($priorities, $this->getTopPriorityName($label)); + return $priorities; + } + + public function getTopPriorityName(?string $label = null): string + { + $topPriority = self::QUEUE_TOP_PRIORITY; + if (!is_null($label)) { + if (strpos($label, self::QUEUE_NAME_PARTS_DELIMITER) !== false) { + throw new \Exception('Label cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".'); + } + + $topPriority .= self::QUEUE_NAME_PARTS_DELIMITER . $label; + } + + return $topPriority; } + public function parseQueueAndPriority(string $queueWithPriority): array { - $parts = explode('_', $queueWithPriority); + $parts = explode(self::QUEUE_NAME_PARTS_DELIMITER, $queueWithPriority); if (count($parts) === 2) { return [$parts[0], $parts[1]]; @@ -157,7 +182,7 @@ public function parseQueueAndPriority(string $queueWithPriority): array while (true) { $part = array_shift($parts); if (is_numeric($part)) { - return [implode('_', $nameParts), $part]; + return [implode(self::QUEUE_NAME_PARTS_DELIMITER, $nameParts), $part]; } else { $nameParts[] = $part; } diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index 4495d11..15322d2 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -18,7 +18,7 @@ public function __construct( Manager $manager) $this->manager = $manager; } - public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void + public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void { $queue = $this->manager->getQueueWithPriority($queue, $priority); $exchange = $queue; @@ -46,9 +46,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati } - public function publishDie(string $queue): void + public function publishDie(string $queue, ?string $consumerLabel = null): void { - $this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY); + $this->publish(self::DIE, $queue, $this->manager->getTopPriorityName($consumerLabel)); } private function createMessage(string $body): AMQPMessage diff --git a/src/Broker/Producer.php b/src/Broker/Producer.php index a1cc19d..68d16f0 100644 --- a/src/Broker/Producer.php +++ b/src/Broker/Producer.php @@ -4,6 +4,6 @@ interface Producer { - public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void; - public function publishDie(string $queue): void; + public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void; + public function publishDie(string $queue, ?string $consumerLabel = null): void; } \ No newline at end of file diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index db0bf05..8815de2 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -28,6 +28,7 @@ protected function configure() $this->addArgument('queue', InputArgument::REQUIRED); $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1); $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)'); + $this->addOption('label', 'l', InputOption::VALUE_OPTIONAL, 'Consumer label for restart it by reload command'); $this->setDescription('Start consumer.'); } @@ -35,6 +36,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int { $jobs = $input->getOption('jobs'); $priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities')); + $label = $input->getOption('label'); if (!is_numeric($jobs)) { $output->writeln("Option --jobs has to be integer"); @@ -43,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int for ($i = 0; $i < (int)$jobs; $i++) { $this->backgroundQueue->dieIfNecessary(); - $this->consumer->consume($input->getArgument('queue'), $priorities); + $this->consumer->consume($input->getArgument('queue'), $priorities, $label); } return 0; diff --git a/src/Console/ReloadConsumersCommand.php b/src/Console/ReloadConsumersCommand.php index 9ef700f..bde0f34 100644 --- a/src/Console/ReloadConsumersCommand.php +++ b/src/Console/ReloadConsumersCommand.php @@ -27,17 +27,24 @@ protected function configure() 'A queue whose consumers are to reload.' ); $this->addArgument( - "number", - InputArgument::REQUIRED, - 'Number of consumers to reload.' + "consumers-labels", + InputArgument::OPTIONAL, + 'Labels of consumers to restart separated by comma.' ); - $this->setDescription('Creates the specified number of noop messages to reload consumers consuming specified queue.'); + $this->setDescription('Restart specified consumers by lables on specified queue.'); } protected function executeCommand(InputInterface $input, OutputInterface $output): int { - for ($i = 0; $i < $input->getArgument("number"); $i++) { - $this->producer->publishDie($input->getArgument("queue")); + $consumersLabels = $input->getArgument("consumers-labels"); + if ($consumersLabels) { + $consumersLabels = explode(',', $consumersLabels); + } else { + $consumersLabels = [null]; + } + + foreach ($consumersLabels as $consumerLabel) { + $this->producer->publishDie($input->getArgument("queue"), $consumerLabel); } return 0;