Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ Ve všech ostatních případech se záznam uloží jako úspěšně dokončený

`background-queue:clear-finished 14` Smaže všechny úspěšně zpracované záznamy starší 14 dní.

`background-queue:reload-consumers QUEUE NUMBER` Reloadne NUMBER consumerů pro danou QUEUE.
`background-queue:reload-consumers QUEUE [LABEL1,LABEL2,...]` Reloadne consumery označené jedním z labelů, viz `background-queue:consume`.

`background-queue:update-schema` Aktualizuje databázové schéma, pokud je potřeba.

Expand Down Expand Up @@ -282,6 +282,8 @@ Ale pokud by se vyskytlo více požadavků na zasílání emailů, po nějaké d
Dále máme možnost prioritu nastavenou pro callback přetížit při vkládání záznamu v metodě `publish`. Například víme, že se jedná o rozesílání newsletterů.
Tedy se jedná o zasílání emailů, ale s nízkou prioritou zpracování.

Příkaz `background-queue:consume` má také volitelný parametr `-l`, kterým konzumerovi nastavíme příslušný label. Při restartu konzumerů pomocí `background-queue:reload-consumers` pak máme možnost temito labely zvolit, které konzumery chceme restartovat.

```
$priority = null; // aplikuje se priorita 10 z nastavení pro callback
if ($isNewsletter) {
Expand Down
2 changes: 1 addition & 1 deletion src/Broker/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

interface Consumer
{
public function consume(string $queue, array $priorities): void;
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void;
}
6 changes: 3 additions & 3 deletions src/Broker/PhpAmqpLib/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace ADT\BackgroundQueue\Broker\PhpAmqpLib;

use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Console\ReloadConsumersCommand;
use Exception;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Message\AMQPMessage;
Expand All @@ -21,13 +22,12 @@ public function __construct(Manager $manager, BackgroundQueue $backgroundQueue)
/**
* @throws Exception
*/
public function consume(string $queue, array $priorities): void
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void
{
// TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet.
// Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL.

// Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu.
array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY);
$priorities = $this->manager->includeTopPriority($priorities, $consumerLabel);

// Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme
$queuesWithPriorities = [];
Expand Down
33 changes: 29 additions & 4 deletions src/Broker/PhpAmqpLib/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class Manager
{
const QUEUE_TOP_PRIORITY = 0;
const QUEUE_NAME_PARTS_DELIMITER = '_';

private array $connectionParams;
private array $queueParams;
Expand Down Expand Up @@ -141,14 +142,38 @@ public function setupQos()
$this->initQos = true;
}

public function getQueueWithPriority(string $queue, int $priority): string
public function getQueueWithPriority(string $queue, string $priority): string
{
return $queue . '_' . $priority;
if (strpos($queue, self::QUEUE_NAME_PARTS_DELIMITER) !== false) {
throw new \Exception('Priority cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".');
}
return $queue . self::QUEUE_NAME_PARTS_DELIMITER . $priority;
}

public function includeTopPriority(array $priorities, ?string $label = null): array
{
array_unshift($priorities, $this->getTopPriorityName($label));
return $priorities;
}

public function getTopPriorityName(?string $label = null): string
{
$topPriority = self::QUEUE_TOP_PRIORITY;
if (!is_null($label)) {
if (strpos($label, self::QUEUE_NAME_PARTS_DELIMITER) !== false) {
throw new \Exception('Label cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".');
}

$topPriority .= self::QUEUE_NAME_PARTS_DELIMITER . $label;
}

return $topPriority;
}


public function parseQueueAndPriority(string $queueWithPriority): array
{
$parts = explode('_', $queueWithPriority);
$parts = explode(self::QUEUE_NAME_PARTS_DELIMITER, $queueWithPriority);

if (count($parts) === 2) {
return [$parts[0], $parts[1]];
Expand All @@ -157,7 +182,7 @@ public function parseQueueAndPriority(string $queueWithPriority): array
while (true) {
$part = array_shift($parts);
if (is_numeric($part)) {
return [implode('_', $nameParts), $part];
return [implode(self::QUEUE_NAME_PARTS_DELIMITER, $nameParts), $part];
} else {
$nameParts[] = $part;
}
Expand Down
6 changes: 3 additions & 3 deletions src/Broker/PhpAmqpLib/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function __construct( Manager $manager)
$this->manager = $manager;
}

public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void
{
$queue = $this->manager->getQueueWithPriority($queue, $priority);
$exchange = $queue;
Expand Down Expand Up @@ -46,9 +46,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati

}

public function publishDie(string $queue): void
public function publishDie(string $queue, ?string $consumerLabel = null): void
{
$this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY);
$this->publish(self::DIE, $queue, $this->manager->getTopPriorityName($consumerLabel));
}

private function createMessage(string $body): AMQPMessage
Expand Down
4 changes: 2 additions & 2 deletions src/Broker/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

interface Producer
{
public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void;
public function publishDie(string $queue): void;
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void;
public function publishDie(string $queue, ?string $consumerLabel = null): void;
}
4 changes: 3 additions & 1 deletion src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ protected function configure()
$this->addArgument('queue', InputArgument::REQUIRED);
$this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1);
$this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)');
$this->addOption('label', 'l', InputOption::VALUE_OPTIONAL, 'Consumer label for restart it by reload command');
$this->setDescription('Start consumer.');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$jobs = $input->getOption('jobs');
$priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities'));
$label = $input->getOption('label');

if (!is_numeric($jobs)) {
$output->writeln("<error>Option --jobs has to be integer</error>");
Expand All @@ -43,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

for ($i = 0; $i < (int)$jobs; $i++) {
$this->backgroundQueue->dieIfNecessary();
$this->consumer->consume($input->getArgument('queue'), $priorities);
$this->consumer->consume($input->getArgument('queue'), $priorities, $label);
}

return 0;
Expand Down
19 changes: 13 additions & 6 deletions src/Console/ReloadConsumersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@ protected function configure()
'A queue whose consumers are to reload.'
);
$this->addArgument(
"number",
InputArgument::REQUIRED,
'Number of consumers to reload.'
"consumers-labels",
InputArgument::OPTIONAL,
'Labels of consumers to restart separated by comma.'
);
$this->setDescription('Creates the specified number of noop messages to reload consumers consuming specified queue.');
$this->setDescription('Restart specified consumers by lables on specified queue.');
}

protected function executeCommand(InputInterface $input, OutputInterface $output): int
{
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishDie($input->getArgument("queue"));
$consumersLabels = $input->getArgument("consumers-labels");
if ($consumersLabels) {
$consumersLabels = explode(',', $consumersLabels);
} else {
$consumersLabels = [null];
}

foreach ($consumersLabels as $consumerLabel) {
$this->producer->publishDie($input->getArgument("queue"), $consumerLabel);
}

return 0;
Expand Down