From 7f49f6498c2910fd8b25b1cd125e6a3c921fcdd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 11:37:41 +0100 Subject: [PATCH 01/27] Refactors background queue processing This commit refactors the background queue processing to improve reliability and efficiency. It introduces a dedicated `processJob` method and incorporates broker publishing within the main processing loop. It also adds a mechanism to handle waiting jobs and ensures proper prioritization. Additionally, the changes include database connection checks, schema updates, and the removal of redundant code. --- src/BackgroundQueue.php | 792 ++++++++++++---------- src/Broker/PhpAmqpLib/Consumer.php | 5 +- src/Console/ClearFinishedCommand.php | 35 +- src/Console/Command.php | 2 +- src/Console/ConsumeCommand.php | 35 +- src/Console/ProcessCommand.php | 57 +- src/Console/ReloadConsumersCommand.php | 14 +- src/Console/UpdateSchemaCommand.php | 24 +- src/Entity/Enums/CallbackNameEnum.php | 8 + src/Exception/JobNotFoundException.php | 7 + tests/Integration/BackgroundQueueTest.php | 2 +- 11 files changed, 483 insertions(+), 498 deletions(-) create mode 100644 src/Entity/Enums/CallbackNameEnum.php create mode 100644 src/Exception/JobNotFoundException.php diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 0fa09f6..656e933 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -4,11 +4,13 @@ use ADT\BackgroundQueue\Broker\Producer; use ADT\BackgroundQueue\Entity\BackgroundJob; +use ADT\BackgroundQueue\Entity\Enums\CallbackNameEnum; use ADT\BackgroundQueue\Exception\DieException; +use ADT\BackgroundQueue\Exception\JobNotFoundException; use ADT\BackgroundQueue\Exception\PermanentErrorException; use ADT\BackgroundQueue\Exception\SkipException; use ADT\BackgroundQueue\Exception\WaitingException; -use ADT\Utils\FileSystem; +use DateTime; use DateTimeInterface; use Doctrine\DBAL\Connection; use Doctrine\DBAL\DriverManager; @@ -36,7 +38,7 @@ class BackgroundQueue private LoggerInterface $logger; private ?Producer $producer = null; private int $bulkSize = 1; - private array $bulkBrokerCallbacks = []; + private array $entitiesToPublish = []; private array $bulkDatabaseEntities = []; private bool $shouldDie = false; @@ -78,6 +80,13 @@ public function __construct(array $config) if (!in_array($config['parametersFormat'], BackgroundJob::PARAMETERS_FORMATS, true)) { throw new Exception('Unsupported parameters format: ' . $config['parametersFormat']); } + if ($config['producer']) { + $config['callbacks'][CallbackNameEnum::PROCESS_WAITING_JOBS->value] = [ + 'callback' => [$this, trim(CallbackNameEnum::PROCESS_WAITING_JOBS->value, '_')], + 'queue' => null, + 'priority' => null + ]; + } $this->config = $config; $this->connection = $config['connection']; @@ -87,60 +96,6 @@ public function __construct(array $config) } } - /** - * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. - * Pokud ano, připojíme se znovu. - * @throws \Doctrine\DBAL\Exception - */ - private function databaseConnectionCheckAndReconnect(): void - { - $warningHandler = function($errno, $errstr) { - $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (warning): code(' . $errno . ') ' . $errstr, 0)); - $this->connection->close(); - $this->connection->getNativeConnection(); - }; - - set_error_handler($warningHandler, E_WARNING); - - try { - if (!$this->databasePing()) { - $this->connection->close(); - $this->connection->getNativeConnection(); - } - } catch (Exception $e) { - $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); - $this->connection->close(); - $this->connection->getNativeConnection(); - } finally { - restore_error_handler(); - } - } - - /** - * @throws Exception - */ - private function databasePing(): bool - { - set_error_handler(function ($severity, $message) { - throw new PDOException($message, $severity); - }); - - try { - $this->connection->executeQuery($this->connection->getDatabasePlatform()->getDummySelectSQL()); - restore_error_handler(); - - return true; - - } catch (\Doctrine\DBAL\Exception) { - restore_error_handler(); - return false; - - } catch (Exception $e) { - restore_error_handler(); - throw $e; - } - } - /** * @throws Exception|\Doctrine\DBAL\Exception */ @@ -158,7 +113,7 @@ public function publish( throw new Exception('The job does not have the required parameter "callbackName" set.'); } - if (!isset($this->config['callbacks'][$callbackName])) { + if (!$this->getCallback($callbackName)) { throw new Exception('Callback "' . $callbackName . '" does not exist.'); } @@ -169,7 +124,7 @@ public function publish( $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); - $entity->setQueue($this->config['queue']); + $entity->setQueue($this->config['callbacks'][$callbackName]['queue'] ?? $this->config['queue']); $entity->setPriority($priority); $entity->setCallbackName($callbackName); $entity->setParameters($parameters, $this->config['parametersFormat']); @@ -184,6 +139,7 @@ public function publish( /** * @throws Exception + * @throws \Doctrine\DBAL\Exception * @internal */ public function publishToBroker(BackgroundJob $entity): void @@ -192,33 +148,77 @@ public function publishToBroker(BackgroundJob $entity): void return; } - $this->bulkBrokerCallbacks[] = function () use ($entity) { + $this->entitiesToPublish[] = $entity; + + if (!$this->connection->isTransactionActive() && count($this->entitiesToPublish) >= $this->bulkSize) { + $this->doPublishToBroker(); + } + } + + /** + * @throws \Doctrine\DBAL\Exception + * @internal + */ + public function doPublishToBroker(): void + { + foreach ($this->entitiesToPublish as $_entity) { try { - // Pokud mám u callbacku nastavenou frontu, v DB zůstává původní, ale do brokera použiju tu z konfigu. - // Tedy řadím do různých front pro různé consumery, ale z pohledu záznamů v DB se jedná o stejnou frontu. - $this->producer->publish((string)$entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); + $this->producer->publish((string)$_entity->getId(), $_entity->getQueue(), $_entity->getPriority(), $_entity->getPostponedBy()); } catch (Exception $e) { - $entity->setState(BackgroundJob::STATE_BROKER_FAILED) + $_entity->setState(BackgroundJob::STATE_BROKER_FAILED) ->setErrorMessage($e->getMessage()); - $this->save($entity); + $this->save($_entity); - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); + $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $_entity, $e); } - }; - if (!$this->connection->isTransactionActive() && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { - $this->doPublishToBroker(); } + $this->entitiesToPublish = []; } /** - * @internal + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ - public function doPublishToBroker(): void + public function process(): void { - foreach ($this->bulkBrokerCallbacks as $_closure) { - $_closure(); + $states = BackgroundJob::READY_TO_PROCESS_STATES; + if ($this->getConfig()['producer']) { + unset ($states[BackgroundJob::STATE_READY]); + unset ($states[BackgroundJob::STATE_TEMPORARILY_FAILED]); + unset ($states[BackgroundJob::STATE_WAITING]); + + if (!$this->getUnfinishedJobIdentifiers([CallbackNameEnum::PROCESS_WAITING_JOBS->value])) { + $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value); + } + + } else { + // Nemáme producera + + unset ($states[BackgroundJob::STATE_BACK_TO_BROKER]); + } + + $qb = $this->createQueryBuilder() + ->andWhere('state IN (:state)') + ->setParameter('state', $states); + + /** @var BackgroundJob $_entity */ + foreach ($this->fetchAll($qb) as $_entity) { + if ( + $this->getConfig()['producer'] + && + $_entity->getState() !== BackgroundJob::STATE_BROKER_FAILED + ) { + $_entity->setState(BackgroundJob::STATE_READY); + $this->save($_entity); + $this->publishToBroker($_entity); + } else { + if (!$_entity->getProcessedByBroker() && $_entity->getAvailableFrom() > new DateTime()) { + continue; + } + $_entity->setProcessedByBroker(false); + $this->processJob($_entity->getId()); + } } - $this->bulkBrokerCallbacks = []; } /** @@ -227,16 +227,12 @@ public function doPublishToBroker(): void * @throws Exception * @internal */ - public function process(int|BackgroundJob $entity, string $queue, int $priority): void + public function processJob(int $id): void { // U publishera chceme transakci stejnou s flush, proto používáme stejné connection jako je v aplikaci. Ale u consumera chceme vlastní connection, aby když se revertne aplikační transakce, tak aby consumer mohl zapsat chybový stav k BackgroundJob. $this->createConnection(); - if (!$entity instanceof BackgroundJob) { - if (!$entity = $this->getEntity($entity, $queue, $priority)) { - return; - } - } + $entity = $this->getEntity($id); // Zpráva není ke zpracování v případě, že nemá stav READY nebo ERROR_REPEATABLE // Pokud při zpracování zprávy nastane chyba, zpráva zůstane ve stavu PROCESSING a consumer se ukončí. @@ -358,7 +354,7 @@ public function process(int|BackgroundJob $entity, string $queue, int $priority) $entity->setState($state) ->setErrorMessage($e ? $e->getMessage() : null); $this->save($entity); - if (in_array($state, [BackgroundJob::STATE_TEMPORARILY_FAILED, BackgroundJob::STATE_WAITING], true)) { + if (in_array($state, [BackgroundJob::STATE_TEMPORARILY_FAILED], true)) { $this->publishToBroker($entity); } @@ -376,6 +372,138 @@ public function process(int|BackgroundJob $entity, string $queue, int $priority) } } + public function getConfig(): array + { + return $this->config; + } + + public function startBulk(): void + { + $this->bulkSize = $this->config['bulkSize']; + } + + /** + * @throws \Doctrine\DBAL\Exception + * @throws SchemaException + */ + public function endBulk(): void + { + $this->doPublishToDatabase(); + $this->doPublishToBroker(); + $this->bulkSize = 1; + } + + /** + * @throws \Doctrine\DBAL\Exception + */ + public function clearFinishedJobs(?int $days = null): void + { + $qb = $this->createQueryBuilder() + ->delete($this->getConfig()['tableName']) + ->andWhere('state = :state') + ->setParameter('state', BackgroundJob::STATE_FINISHED); + + if ($days) { + $qb->andWhere('created_at <= :ago') + ->setParameter('ago', (new DateTime('midnight'))->modify('-' . $days . ' days')->format('Y-m-d H:i:s')); + } + + $qb->executeStatement(); + } + + /** + * @throws \Doctrine\DBAL\Exception + * @throws SchemaException* + * @throws Exception + * @internal + */ + public function updateSchema(): void + { + $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig()); + + $table = $schema->createTable($this->config['tableName']); + + $table->addColumn('id', Types::BIGINT, ['unsigned' => true])->setAutoincrement(true)->setNotnull(true); + $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); + $table->addColumn('priority', Types::INTEGER)->setNotnull(false); + $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); + $table->addColumn('parameters', Types::BLOB)->setNotnull(false); + $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); + $table->addColumn('state', Types::SMALLINT)->setNotnull(true); + $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); + $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); + $table->addColumn('number_of_attempts', Types::INTEGER)->setNotnull(true)->setDefault(0); + $table->addColumn('error_message', Types::TEXT)->setNotnull(false); + $table->addColumn('serial_group', Types::STRING, ['length' => 255])->setNotnull(false); + $table->addColumn('identifier', Types::STRING, ['length' => 255])->setNotnull(false); + $table->addColumn('is_unique', Types::BOOLEAN)->setNotnull(true)->setDefault(0); + $table->addColumn('postponed_by', Types::INTEGER)->setNotnull(false); + $table->addColumn('processed_by_broker', Types::BOOLEAN)->setNotnull(true)->setDefault(0); + $table->addColumn('execution_time', Types::INTEGER)->setNotnull(false); + $table->addColumn('finished_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); + $table->addColumn('pid', Types::INTEGER)->setNotnull(false); + $table->addColumn('metadata', Types::JSON)->setNotnull(false); + $table->addColumn('memory', Types::JSON)->setNotnull(false); + + $table->setPrimaryKey(['id']); + $table->addIndex(['identifier']); + $table->addIndex(['state']); + + $schemaManager = $this->createSchemaManager(); + if ($schemaManager->tablesExist([$this->config['tableName']])) { + $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); + $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); + } else { + $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); + } + foreach ($sqls as $sql) { + $this->executeStatement($sql); + } + } + + public function dieIfNecessary(): void + { + if ($this->shouldDie) { + die(); + } + } + + public static function parseDsn($dsn): array + { + // Parse the DSN string + $parsedDsn = parse_url($dsn); + + if ($parsedDsn === false || !isset($parsedDsn['scheme'], $parsedDsn['host'], $parsedDsn['path'])) { + throw new RuntimeException("Invalid DSN: " . $dsn); + } + + // Convert DSN scheme to Doctrine DBAL driver name + $driversMap = [ + 'mysql' => 'pdo_mysql', + 'pgsql' => 'pdo_pgsql', + 'sqlsrv' => 'pdo_sqlsrv', + ]; + + if (!isset($driversMap[$parsedDsn['scheme']])) { + throw new RuntimeException("Unknown DSN scheme: " . $parsedDsn['scheme']); + } + + // Convert DSN components to Doctrine DBAL connection parameters + $dbParams = [ + 'driver' => $driversMap[$parsedDsn['scheme']], + 'user' => $parsedDsn['user'] ?? null, + 'password' => $parsedDsn['pass'] ?? null, + 'host' => $parsedDsn['host'], + 'dbname' => ltrim($parsedDsn['path'], '/'), + ]; + + if (isset($parsedDsn['port'])) { + $dbParams['port'] = $parsedDsn['port']; + } + + return $dbParams; + } + private function createConnection(): void { if (!$this->connectionCreated) { @@ -388,7 +516,7 @@ private function createConnection(): void * @throws Exception * @throws \Doctrine\DBAL\Exception */ - public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array + private function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array { $qb = $this->createQueryBuilder(); @@ -417,152 +545,265 @@ public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $exclu return $unfinishedJobIdentifiers; } - public function getConfig(): array + /** + * @throws Exception|\Doctrine\DBAL\Exception + */ + private function fetch(QueryBuilder $qb): BackgroundJob { - return $this->config; - } + if (!$entities = $this->fetchAll($qb, 1)) { + throw new JobNotFoundException(); + } + return $entities[0]; + } /** - * @internal + * @throws Exception * @throws \Doctrine\DBAL\Exception - * @throws SchemaException */ - public function createQueryBuilder(): QueryBuilder + private function isRedundant(BackgroundJob $entity): bool { - return $this->connection->createQueryBuilder() - ->select('*') - ->from($this->config['tableName']) - ->andWhere('queue = :queue') - ->setParameter('queue', $this->config['queue']); + if (!$entity->isUnique()) { + return false; + } + + $qb = $this->createQueryBuilder(); + + $qb->andWhere('identifier = :identifier') + ->setParameter('identifier', $entity->getIdentifier()); + + $qb->andWhere('id < :id') + ->setParameter('id', $entity->getId()); + + return (bool) $this->fetch($qb); } /** - * @throws Exception|\Doctrine\DBAL\Exception - * @internal + * @throws \Doctrine\DBAL\Exception + * @throws SchemaException */ - public function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array + private function getPreviousUnfinishedJobId(BackgroundJob $entity): ?int { - $sql = $qb->setMaxResults($maxResults)->getSQL(); - $parameters = $qb->getParameters(); - foreach ($parameters as $key => &$_value) { - if (is_array($_value)) { - $sql = str_replace(':' . $key, self::bindParamArray($key, $_value, $parameters), $sql); - } elseif ($_value instanceof DateTimeInterface) { - $_value->format('Y-m-d H:i:s'); - } + foreach ($this->findOldestUnfinishedJobIdsByGroup($entity) as $id) { + return $id; } + return null; + } - $entities = []; - foreach ($this->fetchAllAssociative($sql, $parameters) as $_row) { - $entities[] = $toEntity ? BackgroundJob::createEntity($_row) : $_row; + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function findOldestUnfinishedJobIdsByGroup(?BackgroundJob $entity = null): iterable + { + $qb = $this->createQueryBuilder(); + + $qb->select('MIN(id) as id'); + + $qb->andWhere('state NOT IN (:state)') + ->setParameter('state', BackgroundJob::FINISHED_STATES); + + if ($entity) { + $qb->andWhere('serial_group = :serialGroup') + ->setParameter('serialGroup', $entity->getSerialGroup()); + + $qb->andWhere('id < :id') + ->setParameter('id', $entity->getId()); } - return $entities; + $qb->groupBy('serial_group'); + + $qb->orderBy('id', 'ASC'); + + foreach ($this->fetchAll($qb, toEntity: false) as $row) { + yield $row['id']; + } + + return []; } - public function startBulk(): void + /** + * @throws \Doctrine\DBAL\Exception + */ + private function save(BackgroundJob $entity): void { - $this->bulkSize = $this->config['bulkSize']; + $this->databaseConnectionCheckAndReconnect(); + + if (!$entity->getId()) { + if ($this->producer) { + $entity->setProcessedByBroker(true); + } + + $this->bulkDatabaseEntities[] = $entity; + + if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { + $this->doPublishToDatabase(); + } + } else { + $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); + } } /** * @throws \Doctrine\DBAL\Exception - * @throws SchemaException */ - public function endBulk(): void + private function createSchemaManager(): AbstractSchemaManager { - $this->doPublishToDatabase(); - $this->doPublishToBroker(); - $this->bulkSize = 1; + return method_exists($this->connection, 'createSchemaManager') + ? $this->connection->createSchemaManager() + : $this->connection->getSchemaManager(); } /** - * @throws Exception|\Doctrine\DBAL\Exception + * @throws \Doctrine\DBAL\Exception */ - private function fetch(QueryBuilder $qb): ?BackgroundJob + private function executeStatement(string $sql): void { - return ($entities = $this->fetchAll($qb, 1)) ? $entities[0]: null; + method_exists($this->connection, 'executeStatement') + ? $this->connection->executeStatement($sql) + : $this->connection->exec($sql); } /** - * @throws Exception|\Doctrine\DBAL\Exception + * @throws \Doctrine\DBAL\Exception */ - private function count(QueryBuilder $qb): int + private function fetchAllAssociative(string $sql, array $parameters): array { - return count($this->fetchAll($qb, 1, false)); + return method_exists($this->connection, 'fetchAllAssociative') + ? $this->connection->fetchAllAssociative($sql, $parameters) + : $this->connection->fetchAll($sql, $parameters); + } + + private static function getPostponement(int $numberOfAttempts): int + { + return min(16, 2 ** ($numberOfAttempts -1)) * 1000 * 60; } /** - * @throws Exception + * @throws SchemaException * @throws \Doctrine\DBAL\Exception + * @throws Exception */ - private function isRedundant(BackgroundJob $entity): bool + private function getEntity(int $id): BackgroundJob { - if (!$entity->isUnique()) { - return false; - } - - $qb = $this->createQueryBuilder(); + $this->databaseConnectionCheckAndReconnect(); - $qb->andWhere('identifier = :identifier') - ->setParameter('identifier', $entity->getIdentifier()); + $qb = $this->createQueryBuilder() + ->andWhere('id = :id') + ->setParameter('id', $id); - $qb->andWhere('id < :id') - ->setParameter('id', $entity->getId()); + if (!$entity = $this->fetch($qb)) { + throw new JobNotFoundException('Job ' . $id . ' not found.'); + } - return (bool) $this->fetch($qb); + return $entity; } /** * @throws SchemaException * @throws \Doctrine\DBAL\Exception */ - private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob + private function checkUnfinishedJobs(BackgroundJob $entity): bool { if (!$entity->getSerialGroup()) { - return null; + return true; } - $qb = $this->createQueryBuilder(); + if ($previousEntityId = $this->getPreviousUnfinishedJobId($entity)) { + try { + $entity->setState(BackgroundJob::STATE_WAITING); + $entity->setErrorMessage('Waiting for job ID ' . $previousEntityId); + $entity->setPostponedBy($this->config['waitingJobExpiration']); + $this->save($entity); + } catch (Exception $e) { + $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); + } + return false; + } - $qb->andWhere('state NOT IN (:state)') - ->setParameter('state', BackgroundJob::FINISHED_STATES); + return true; + } - $qb->andWhere('serial_group = :serial_group') - ->setParameter('serial_group', $entity->getSerialGroup()); + private function getPriority(?int $priority, string $callbackName): int + { + if (is_null($priority)) { + $priority = $this->config['callbacks'][$callbackName]['priority'] ?? array_values($this->config['priorities'])[0]; + } - if ($entity->getId()) { - $qb->andWhere('id < :id') - ->setParameter('id', $entity->getId()); + if (!in_array($priority, $this->config['priorities'])) { + throw new InvalidArgumentException("Priority $priority for callback $callbackName is not in available priorities: " . implode(',', $this->config['priorities'])); } - $qb->orderBy('id'); + return $priority; + } - return $this->fetch($qb); + private function getCallback($callback): callable + { + return $this->config['callbacks'][$callback]['callback'] ?? $this->config['callbacks'][$callback]; + } + + private function getMemories(): array + { + return [ + 'notRealActual' => memory_get_usage(), + 'realActual' => memory_get_usage(true), + 'notRealPeak' => memory_get_peak_usage(), + 'realPeak' => memory_get_peak_usage(true), + ]; } /** - * @internal + * @throws SchemaException * @throws \Doctrine\DBAL\Exception + * @throws Exception */ - public function save(BackgroundJob $entity): void + private function processWaitingJobs(): void { - $this->databaseConnectionCheckAndReconnect(); + foreach ($this->findOldestUnfinishedJobIdsByGroup() as $id) { + $_entity = $this->getEntity($id); - if (!$entity->getId()) { - if ($this->producer) { - $entity->setProcessedByBroker(true); + if ($_entity->getState() === BackgroundJob::STATE_PROCESSING) { + continue; } - $this->bulkDatabaseEntities[] = $entity; + $this->publishToBroker($_entity); + } - if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { - $this->doPublishToDatabase(); + if (!$this->getUnfinishedJobIdentifiers([CallbackNameEnum::PROCESS_WAITING_JOBS->value], excludeProcessing: true)) { + $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value); + } + } + + private function createQueryBuilder(): QueryBuilder + { + return $this->connection->createQueryBuilder() + ->select('*') + ->from($this->config['tableName']) + ->andWhere('queue LIKE :queue') + ->setParameter('queue', $this->config['queue'] . '%'); + } + + /** + * @throws Exception|\Doctrine\DBAL\Exception + */ + private function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array + { + $sql = $qb->setMaxResults($maxResults)->getSQL(); + $parameters = $qb->getParameters(); + foreach ($parameters as $key => &$_value) { + if (is_array($_value)) { + $sql = str_replace(':' . $key, self::bindParamArray($key, $_value, $parameters), $sql); + } elseif ($_value instanceof DateTimeInterface) { + $_value->format('Y-m-d H:i:s'); } - } else { - $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } + + $entities = []; + foreach ($this->fetchAllAssociative($sql, $parameters) as $_row) { + $entities[] = $toEntity ? BackgroundJob::createEntity($_row) : $_row; + } + + return $entities; } /** @@ -661,231 +902,56 @@ private static function bindParamArray(string $prefix, array $values, array &$bi } /** - * @throws \Doctrine\DBAL\Exception - * @throws SchemaException* - * @throws Exception - * @internal - */ - public function updateSchema(): void - { - $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig()); - - $table = $schema->createTable($this->config['tableName']); - - $table->addColumn('id', Types::BIGINT, ['unsigned' => true])->setAutoincrement(true)->setNotnull(true); - $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('priority', Types::INTEGER)->setNotnull(false); - $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('parameters', Types::BLOB)->setNotnull(false); - $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); - $table->addColumn('state', Types::SMALLINT)->setNotnull(true); - $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); - $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); - $table->addColumn('number_of_attempts', Types::INTEGER)->setNotnull(true)->setDefault(0); - $table->addColumn('error_message', Types::TEXT)->setNotnull(false); - $table->addColumn('serial_group', Types::STRING, ['length' => 255])->setNotnull(false); - $table->addColumn('identifier', Types::STRING, ['length' => 255])->setNotnull(false); - $table->addColumn('is_unique', Types::BOOLEAN)->setNotnull(true)->setDefault(0); - $table->addColumn('postponed_by', Types::INTEGER)->setNotnull(false); - $table->addColumn('processed_by_broker', Types::BOOLEAN)->setNotnull(true)->setDefault(0); - $table->addColumn('execution_time', Types::INTEGER)->setNotnull(false); - $table->addColumn('finished_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); - $table->addColumn('pid', Types::INTEGER)->setNotnull(false); - $table->addColumn('metadata', Types::JSON)->setNotnull(false); - $table->addColumn('memory', Types::JSON)->setNotnull(false); - - $table->setPrimaryKey(['id']); - $table->addIndex(['identifier']); - $table->addIndex(['state']); - - $schemaManager = $this->createSchemaManager(); - if ($schemaManager->tablesExist([$this->config['tableName']])) { - $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); - $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); - } else { - $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); - } - foreach ($sqls as $sql) { - $this->executeStatement($sql); - } - } - - /** - * @throws \Doctrine\DBAL\Exception - */ - private function createSchemaManager(): AbstractSchemaManager - { - return method_exists($this->connection, 'createSchemaManager') - ? $this->connection->createSchemaManager() - : $this->connection->getSchemaManager(); - } - - /** - * @throws \Doctrine\DBAL\Exception - */ - private function executeStatement(string $sql): void - { - method_exists($this->connection, 'executeStatement') - ? $this->connection->executeStatement($sql) - : $this->connection->exec($sql); - } - - /** + * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. + * Pokud ano, připojíme se znovu. * @throws \Doctrine\DBAL\Exception */ - private function fetchAllAssociative(string $sql, array $parameters): array - { - return method_exists($this->connection, 'fetchAllAssociative') - ? $this->connection->fetchAllAssociative($sql, $parameters) - : $this->connection->fetchAll($sql, $parameters); - } - - private static function getPostponement(int $numberOfAttempts): int - { - return min(16, 2 ** ($numberOfAttempts -1)) * 1000 * 60; - } - - public static function parseDsn($dsn): array + private function databaseConnectionCheckAndReconnect(): void { - // Parse the DSN string - $parsedDsn = parse_url($dsn); - - if ($parsedDsn === false || !isset($parsedDsn['scheme'], $parsedDsn['host'], $parsedDsn['path'])) { - throw new RuntimeException("Invalid DSN: " . $dsn); - } - - // Convert DSN scheme to Doctrine DBAL driver name - $driversMap = [ - 'mysql' => 'pdo_mysql', - 'pgsql' => 'pdo_pgsql', - 'sqlsrv' => 'pdo_sqlsrv', - ]; - - if (!isset($driversMap[$parsedDsn['scheme']])) { - throw new RuntimeException("Unknown DSN scheme: " . $parsedDsn['scheme']); - } + $warningHandler = function($errno, $errstr) { + $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (warning): code(' . $errno . ') ' . $errstr, 0)); + $this->connection->close(); + $this->connection->getNativeConnection(); + }; - // Convert DSN components to Doctrine DBAL connection parameters - $dbParams = [ - 'driver' => $driversMap[$parsedDsn['scheme']], - 'user' => $parsedDsn['user'] ?? null, - 'password' => $parsedDsn['pass'] ?? null, - 'host' => $parsedDsn['host'], - 'dbname' => ltrim($parsedDsn['path'], '/'), - ]; + set_error_handler($warningHandler, E_WARNING); - if (isset($parsedDsn['port'])) { - $dbParams['port'] = $parsedDsn['port']; + try { + if (!$this->databasePing()) { + $this->connection->close(); + $this->connection->getNativeConnection(); + } + } catch (Exception $e) { + $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); + $this->connection->close(); + $this->connection->getNativeConnection(); + } finally { + restore_error_handler(); } - - return $dbParams; } /** - * @throws SchemaException - * @throws \Doctrine\DBAL\Exception * @throws Exception */ - private function getEntity(int $id, string $queue, int $priority): ?BackgroundJob + private function databasePing(): bool { - $this->databaseConnectionCheckAndReconnect(); - - $entity = $this->fetch( - $this->createQueryBuilder() - ->andWhere('id = :id') - ->setParameter('id', $id) - ); + set_error_handler(function ($severity, $message) { + throw new PDOException($message, $severity); + }); - if (!$entity) { - if ($this->producer) { - // pokud je to rabbit fungujici v clusteru na vice serverech, - // tak jeste nemusi byt syncnuta master master databaze - - // coz si overime tak, ze v db neexistuje vetsi id - if (!$this->count($this->createQueryBuilder()->select('id')->andWhere('id > :id')->setParameter('id', $id))) { - // pridame znovu do queue - try { - // Zde máme $queue a $priority, ze kterých bylo vytaženo z brokera, tedy dáváme znovu do té stejné fronty a priority - $this->producer->publish((string) $id, $queue, $priority, $this->config['waitingJobExpiration']); - } catch (Exception $e) { - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); - - $entity->setState(BackgroundJob::STATE_BROKER_FAILED); - $this->save($entity); - } - } else { - // zalogovat - $this->logException('Job "' . $id . '" not found.'); - } - } else { - // zalogovat - $this->logException('Job "' . $id . '" not found.'); - } - return null; - } + try { + $this->connection->executeQuery($this->connection->getDatabasePlatform()->getDummySelectSQL()); + restore_error_handler(); - return $entity; - } + return true; - /** - * @throws SchemaException - * @throws \Doctrine\DBAL\Exception - */ - private function checkUnfinishedJobs(BackgroundJob $entity): bool - { - if ($previousEntity = $this->getPreviousUnfinishedJob($entity)) { - try { - $entity->setState(BackgroundJob::STATE_WAITING); - $entity->setErrorMessage('Waiting for job ID ' . $previousEntity->getId()); - $entity->setPostponedBy($this->config['waitingJobExpiration']); - $this->save($entity); - $this->publishToBroker($entity); - } catch (Exception $e) { - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); - } + } catch (\Doctrine\DBAL\Exception) { + restore_error_handler(); return false; - } - - return true; - } - - private function getPriority(?int $priority, string $callbackName): int - { - if (is_null($priority)) { - $priority = $this->config['callbacks'][$callbackName]['priority'] ?? array_values($this->config['priorities'])[0]; - } - - if (!in_array($priority, $this->config['priorities'])) { - throw new InvalidArgumentException("Priority $priority for callback $callbackName is not in available priorities: " . implode(',' , $this->config['priorities'])); - } - - return $priority; - } - - private function getCallback($callback): callable - { - return $this->config['callbacks'][$callback]['callback'] ?? $this->config['callbacks'][$callback]; - } - - public function getQueueForEntityIncludeCallback(BackgroundJob $entity): string - { - return $this->config['callbacks'][$entity->getCallbackName()]['queue'] ?? $entity->getQueue(); - } - private function getMemories(): array - { - return [ - 'notRealActual' => memory_get_usage(), - 'realActual' => memory_get_usage(true), - 'notRealPeak' => memory_get_peak_usage(), - 'realPeak' => memory_get_peak_usage(true), - ]; - } - - public function dieIfNecessary(): void - { - if ($this->shouldDie) { - die(); + } catch (Exception $e) { + restore_error_handler(); + throw $e; } } } diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index ead4d9a..bb66120 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -4,7 +4,6 @@ use ADT\BackgroundQueue\BackgroundQueue; use Exception; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; class Consumer implements \ADT\BackgroundQueue\Broker\Consumer @@ -56,9 +55,7 @@ public function consume(string $queue, array $priorities): void die(); } - $queuesWithPriority = $msg->getConsumerTag(); - list($queue, $priority) = $this->manager->parseQueueAndPriority($queuesWithPriority); - $this->backgroundQueue->process((int)$msg->getBody(), $queue, $priority); + $this->backgroundQueue->processJob((int)$msg->getBody()); }); } diff --git a/src/Console/ClearFinishedCommand.php b/src/Console/ClearFinishedCommand.php index 5101848..ae31326 100644 --- a/src/Console/ClearFinishedCommand.php +++ b/src/Console/ClearFinishedCommand.php @@ -3,62 +3,37 @@ namespace ADT\BackgroundQueue\Console; use ADT\BackgroundQueue\BackgroundQueue; -use ADT\BackgroundQueue\Entity\BackgroundJob; -use DateTime; use Doctrine\DBAL\Exception; -use Doctrine\DBAL\Schema\SchemaException; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Input\InputArgument; -#[AsCommand(name: 'background-queue:clear-finished')] +#[AsCommand(name: 'background-queue:clear-finished', description: 'Delete finished records.')] class ClearFinishedCommand extends Command { - protected static $defaultName = 'background-queue:clear-finished'; - - private BackgroundQueue $backgroundQueue; - - /** - * @throws Exception - */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; } - protected function configure() + protected function configure(): void { - $this->setName('background-queue:clear-finished'); $this->addArgument( "days", InputArgument::OPTIONAL, 'Deletes finished records older than the specified number of days.', 1 ); - $this->setDescription('Delete finished records.'); } /** * @throws Exception - * @throws SchemaException - * @throws \Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $qb = $this->backgroundQueue->createQueryBuilder() - ->delete($this->backgroundQueue->getConfig()['tableName']) - ->andWhere('state = :state') - ->setParameter('state', BackgroundJob::STATE_FINISHED); - - if ($input->getArgument("days")) { - $qb->andWhere('created_at <= :ago') - ->setParameter('ago', (new DateTime('midnight'))->modify('-' . $input->getArgument("days") . ' days')->format('Y-m-d H:i:s')); - } - - $qb->executeStatement(); + $this->backgroundQueue->clearFinishedJobs($input->getArgument("days")); - return 0; + return self::SUCCESS; } } diff --git a/src/Console/Command.php b/src/Console/Command.php index 36fcd7c..98d4132 100644 --- a/src/Console/Command.php +++ b/src/Console/Command.php @@ -16,7 +16,7 @@ abstract class Command extends \Symfony\Component\Console\Command\Command abstract protected function executeCommand(InputInterface $input, OutputInterface $output): int; - public function setLocksDir(string $locksDir) + public function setLocksDir(string $locksDir): void { $this->locksDir = $locksDir; } diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 6f54608..ba249f0 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -4,35 +4,33 @@ use ADT\BackgroundQueue\BackgroundQueue; use ADT\BackgroundQueue\Broker\Consumer; +use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:consume')] +#[AsCommand(name: 'background-queue:consume', description: 'Start consumer.')] class ConsumeCommand extends \Symfony\Component\Console\Command\Command { - protected static $defaultName = 'background-queue:consume'; - private Consumer $consumer; - private BackgroundQueue $backgroundQueue; - - public function __construct(Consumer $consumer, BackgroundQueue $backgroundQueue) - { + public function __construct( + private readonly Consumer $consumer, + private readonly BackgroundQueue $backgroundQueue + ) { parent::__construct(); - $this->consumer = $consumer; - $this->backgroundQueue = $backgroundQueue; } - protected function configure() + protected function configure(): void { - $this->setName('background-queue:consume'); $this->addArgument('queue', InputArgument::REQUIRED); $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1); $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)'); - $this->setDescription('Start consumer.'); } + /** + * @throws Exception + */ protected function execute(InputInterface $input, OutputInterface $output): int { $jobs = $input->getOption('jobs'); @@ -40,7 +38,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int if (!is_numeric($jobs)) { $output->writeln("Option --jobs has to be integer"); - return 1; + return self::FAILURE; } for ($i = 0; $i < (int)$jobs; $i++) { @@ -48,9 +46,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int $this->consumer->consume($input->getArgument('queue'), $priorities); } - return 0; + return self::SUCCESS; } + /** + * @throws Exception + */ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): array { $prioritiesAvailable = $this->backgroundQueue->getConfig()['priorities']; @@ -59,10 +60,10 @@ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): a return $prioritiesAvailable; } - if (strpos($prioritiesText, '-') === false) { + if (!str_contains($prioritiesText, '-')) { $priority = (int)$prioritiesText; if (!in_array($priority, $prioritiesAvailable)) { - throw new \Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]"); + throw new Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]"); } return [$priority]; } @@ -83,7 +84,7 @@ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): a } if (!count($priorities)) { - throw new \Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]"); + throw new Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]"); } return $priorities; diff --git a/src/Console/ProcessCommand.php b/src/Console/ProcessCommand.php index 53ba37b..82318ad 100644 --- a/src/Console/ProcessCommand.php +++ b/src/Console/ProcessCommand.php @@ -3,77 +3,30 @@ namespace ADT\BackgroundQueue\Console; use ADT\BackgroundQueue\BackgroundQueue; -use ADT\BackgroundQueue\Entity\BackgroundJob; -use DateTime; use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:process')] +#[AsCommand(name: 'background-queue:process', description: 'Processes all records in the READY or TEMPORARILY_FAILED state.')] class ProcessCommand extends Command { - protected static $defaultName = 'background-queue:process'; - - private BackgroundQueue $backgroundQueue; - /** * @throws Exception */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; - } - - protected function configure() - { - $this->setName('background-queue:process'); - $this->setDescription('Processes all records in the READY or TEMPORARILY_FAILED state.'); } /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $states = BackgroundJob::READY_TO_PROCESS_STATES; - if ($this->backgroundQueue->getConfig()['producer']) { - unset ($states[BackgroundJob::STATE_READY]); - unset ($states[BackgroundJob::STATE_TEMPORARILY_FAILED]); - unset ($states[BackgroundJob::STATE_WAITING]); - - } else { - // Nemáme producera - - unset ($states[BackgroundJob::STATE_BACK_TO_BROKER]); - } - - $qb = $this->backgroundQueue->createQueryBuilder() - ->andWhere('state IN (:state)') - ->setParameter('state', $states); - - /** @var BackgroundJob $_entity */ - foreach ($this->backgroundQueue->fetchAll($qb) as $_entity) { - if ( - $this->backgroundQueue->getConfig()['producer'] - && - $_entity->getState() !== BackgroundJob::STATE_BROKER_FAILED - ) { - $_entity->setState(BackgroundJob::STATE_READY); - $this->backgroundQueue->save($_entity); - $this->backgroundQueue->publishToBroker($_entity); - } else { - if (!$_entity->getProcessedByBroker() && $_entity->getAvailableFrom() > new DateTime()) { - continue; - } - $_entity->setProcessedByBroker(false); - // Chceme použít prioritu na entitě. Může být využito na přeřazení do jiné priority. - // Chceme zařadit do stejné fronty jako původně bylo. Tedy musíme zohlednit co je nastaveno u callbacku. - $this->backgroundQueue->process($_entity, $this->backgroundQueue->getQueueForEntityIncludeCallback($_entity), $_entity->getPriority()); - } - } + $this->backgroundQueue->process(); - return 0; + return self::SUCCESS; } } diff --git a/src/Console/ReloadConsumersCommand.php b/src/Console/ReloadConsumersCommand.php index 6a38715..46b5b3e 100644 --- a/src/Console/ReloadConsumersCommand.php +++ b/src/Console/ReloadConsumersCommand.php @@ -8,20 +8,15 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:reload-consumers')] +#[AsCommand(name: 'background-queue:reload-consumers', description: 'Creates the specified number of noop messages to reload consumers consuming specified queue.')] class ReloadConsumersCommand extends Command { - protected static $defaultName = 'background-queue:reload-consumers'; - - protected Producer $producer; - - public function __construct(Producer $producer) + public function __construct(private readonly Producer $producer) { parent::__construct(); - $this->producer = $producer; } - protected function configure() + protected function configure(): void { $this->addArgument( "queue", @@ -33,7 +28,6 @@ protected function configure() InputArgument::REQUIRED, 'Number of consumers to reload.' ); - $this->setDescription('Creates the specified number of noop messages to reload consumers consuming specified queue.'); } protected function executeCommand(InputInterface $input, OutputInterface $output): int @@ -42,6 +36,6 @@ protected function executeCommand(InputInterface $input, OutputInterface $output $this->producer->publishDie($input->getArgument("queue")); } - return 0; + return self::SUCCESS; } } diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 9bb055b..8a65613 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -7,30 +7,14 @@ use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:update-schema')] +#[AsCommand(name: 'background-queue:update-schema', description: 'Update table schema if needed.')] class UpdateSchemaCommand extends Command { - protected static $defaultName = 'background-queue:update-schema'; - - private BackgroundQueue $backgroundQueue; - - /** - * @throws Exception - */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; - } - - protected function configure() - { - $this->setName('background-queue:update-schema'); - $this->setDescription('Update table schema if needed.'); - $this->addOption('force', 'f', InputOption::VALUE_NONE, 'Force schema update'); } /** @@ -40,8 +24,8 @@ protected function configure() */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $this->backgroundQueue->updateSchema($input->getOption('force')); + $this->backgroundQueue->updateSchema(); - return 0; + return self::SUCCESS; } } diff --git a/src/Entity/Enums/CallbackNameEnum.php b/src/Entity/Enums/CallbackNameEnum.php new file mode 100644 index 0000000..4dd219b --- /dev/null +++ b/src/Entity/Enums/CallbackNameEnum.php @@ -0,0 +1,8 @@ +fetchAll($backgroundQueue->createQueryBuilder()); - $backgroundQueue->process($backgroundJobs[0]); + $backgroundQueue->processJob($backgroundJobs[0]->getId()); $this->tester->assertEquals($expectedState, $backgroundJobs[0]->getState()); } From 01b26904d653156cffb4d1bccaeac6a18e485b6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 11:46:20 +0100 Subject: [PATCH 02/27] Improves background job handling and type safety Updates the PHP version in the Dockerfile to 8.2 for improved performance and security. Refactors parameter handling within the BackgroundJob entity for better type safety and efficiency. Uses null-safe operator to simplify conditional formatting of datetime properties. --- Dockerfile | 2 +- src/Entity/BackgroundJob.php | 19 ++++++++----------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/Dockerfile b/Dockerfile index 593e969..c06d8b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM php:8.0-cli +FROM php:8.2-cli RUN apt-get update diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 1599072..00a3a3e 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -5,6 +5,7 @@ use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; +use DateTimeInterface; use Exception; use ReflectionClass; @@ -142,11 +143,9 @@ public function getParameters(): array } $parametersJson = json_decode($this->parameters_json, true); - $parameters = []; - foreach ($parametersJson as $key => $value) { - $parameters[$key] = Utils::getDateTimeFromArray($value, true); - } - return $parameters; + return array_map(function ($value) { + return Utils::getDateTimeFromArray($value, true); + }, $parametersJson); } /** @@ -157,17 +156,15 @@ public function getParameters(): array * - parametry mohou obsahovat pouze skalární typy, pole, NULL a \DateTimeInterface * - pokud je nějaký z parametrů objekt, automaticky se použije "serialize" * - * @param object|array|string|int|float|bool|null $parameters - * @param string $parametersFormat * @throws Exception */ - public function setParameters($parameters, string $parametersFormat): self + public function setParameters(float|object|int|bool|array|string|null $parameters, string $parametersFormat): self { $parameters = is_array($parameters) ? $parameters : [$parameters]; if ($parametersFormat == self::PARAMETERS_FORMAT_JSON) { foreach ($parameters as $parameter) { - if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof \DateTimeInterface)) { + if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof DateTimeInterface)) { $parametersFormat = self::PARAMETERS_FORMAT_SERIALIZE; break; } @@ -372,7 +369,7 @@ public function getDatabaseValues(): array 'parameters_json' => $this->parameters_json, 'state' => $this->state, 'created_at' => $this->createdAt->format('Y-m-d H:i:s'), - 'last_attempt_at' => $this->lastAttemptAt ? $this->lastAttemptAt->format('Y-m-d H:i:s') : null, + 'last_attempt_at' => $this->lastAttemptAt?->format('Y-m-d H:i:s'), 'number_of_attempts' => $this->numberOfAttempts, 'error_message' => $this->errorMessage, 'serial_group' => $this->serialGroup, @@ -381,7 +378,7 @@ public function getDatabaseValues(): array 'postponed_by' => $this->postponedBy, 'processed_by_broker' => (int) $this->processedByBroker, 'execution_time' => (int) $this->executionTime, - 'finished_at' => $this->finishedAt ? $this->finishedAt->format('Y-m-d H:i:s') : null, + 'finished_at' => $this->finishedAt?->format('Y-m-d H:i:s'), 'pid' => $this->pid, 'metadata' => $this->metadata, 'memory' => $this->memory, From 38cfdb8de49ed853015aeea9a9843115c8d4a53a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 11:58:01 +0100 Subject: [PATCH 03/27] Adds exception to process method Adds `Exception` to the `@throws` annotation of the `process` method. This ensures that the method's potential to throw a generic `Exception` is properly documented, improving code clarity and maintainability. --- src/BackgroundQueue.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 656e933..d6e3f72 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -178,6 +178,7 @@ public function doPublishToBroker(): void /** * @throws SchemaException * @throws \Doctrine\DBAL\Exception + * @throws Exception */ public function process(): void { From e08d7c5851e5c7439e1b0304378ec6a1937536a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 12:04:18 +0100 Subject: [PATCH 04/27] Appends queue name for callback jobs Ensures callback jobs are correctly routed to their specified queues by appending the callback queue name to the base queue name. This prevents jobs from being placed in the default queue when a callback-specific queue is defined. --- src/BackgroundQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index d6e3f72..c7fe6d3 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -124,7 +124,7 @@ public function publish( $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); - $entity->setQueue($this->config['callbacks'][$callbackName]['queue'] ?? $this->config['queue']); + $entity->setQueue($this->config['queue'] . trim($this->config['callbacks'][$callbackName]['queue'] ?? '', '_')); $entity->setPriority($priority); $entity->setCallbackName($callbackName); $entity->setParameters($parameters, $this->config['parametersFormat']); From 88a505270a74669d6b4d389b9ab7c5fec3b436bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 12:16:17 +0100 Subject: [PATCH 05/27] Sets job state to ready before publishing Ensures jobs are marked as ready before being published to the broker. This prevents potential race conditions where a job might be processed before its state is properly initialized. --- src/BackgroundQueue.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index c7fe6d3..fa48bed 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -767,6 +767,8 @@ private function processWaitingJobs(): void continue; } + $_entity->setState(BackgroundJob::STATE_READY); + $this->save($_entity); $this->publishToBroker($_entity); } From 140dad697d141efaffab784727e6c76a4deeb001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 14:02:51 +0100 Subject: [PATCH 06/27] Adds recurring background job functionality Implements recurring background jobs by introducing an `isRecurring` flag to the `BackgroundJob` entity. Adds logic to re-publish the job after completion if the flag is set. This allows jobs to be automatically rescheduled, enabling periodic tasks. Also, instead of postponing waiting jobs, it clones and republishes them. --- src/BackgroundQueue.php | 16 +++++++++++++--- src/Entity/BackgroundJob.php | 11 +++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index fa48bed..b24be00 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -106,7 +106,8 @@ public function publish( ?string $identifier = null, bool $isUnique = false, ?int $postponeBy = null, - ?int $priority = null + ?int $priority = null, + bool $isRecurring = false, ): void { if (!$callbackName) { @@ -132,6 +133,7 @@ public function publish( $entity->setIdentifier($identifier); $entity->setIsUnique($isUnique); $entity->setPostponedBy($postponeBy); + $entity->setIsRecurring($isRecurring); $this->save($entity); $this->publishToBroker($entity); @@ -340,8 +342,7 @@ public function processJob(int $id): void $state = BackgroundJob::STATE_PERMANENTLY_FAILED; break; case $e instanceof WaitingException: - $state = BackgroundJob::STATE_WAITING; - $entity->setPostponedBy($this->config['waitingJobExpiration']); + $this->cloneAndPublish($entity); break; default: $state = BackgroundJob::STATE_TEMPORARILY_FAILED; @@ -358,6 +359,9 @@ public function processJob(int $id): void if (in_array($state, [BackgroundJob::STATE_TEMPORARILY_FAILED], true)) { $this->publishToBroker($entity); } + if ($entity->getIsRecurring()) { + $this->cloneAndPublish($entity); + } if ($state === BackgroundJob::STATE_PERMANENTLY_FAILED) { // odeslání emailu o chybě v callbacku @@ -445,6 +449,7 @@ public function updateSchema(): void $table->addColumn('pid', Types::INTEGER)->setNotnull(false); $table->addColumn('metadata', Types::JSON)->setNotnull(false); $table->addColumn('memory', Types::JSON)->setNotnull(false); + $table->addColumn('is_recurring', Types::BOOLEAN)->setNotnull(true); $table->setPrimaryKey(['id']); $table->addIndex(['identifier']); @@ -957,4 +962,9 @@ private function databasePing(): bool throw $e; } } + + private function cloneAndPublish(BackgroundJob $entity): void + { + $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->isUnique(), $this->config['waitingJobExpiration'], $entity->getPriority()); + } } diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 00a3a3e..87620cd 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -62,6 +62,7 @@ final class BackgroundJob private ?int $pid = null; // PID supervisor consumera uvintř docker kontejneru private ?string $metadata = null; // ukládá ve formátu JSON private ?string $memory = null; // ukládá ve formátu JSON + private bool $isRecurring = false; public function __construct() { @@ -403,4 +404,14 @@ public function setExecutionTime(?int $executionTime): self $this->executionTime = $executionTime; return $this; } + + public function getIsRecurring(): bool + { + return $this->isRecurring; + } + + public function setIsRecurring(bool $isRecurring): void + { + $this->isRecurring = $isRecurring; + } } From bb6acd09fa5a6144ee824ee6732f3b771ba17031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 14:12:32 +0100 Subject: [PATCH 07/27] Adds DBAL exception to cloneAndPublish Adds a `@throws \Doctrine\DBAL\Exception` annotation to the `cloneAndPublish` method. This clarifies the method's potential to throw a Doctrine DBAL exception, which improves code maintainability and helps prevent unexpected runtime errors. --- src/BackgroundQueue.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index b24be00..def9099 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -963,6 +963,9 @@ private function databasePing(): bool } } + /** + * @throws \Doctrine\DBAL\Exception + */ private function cloneAndPublish(BackgroundJob $entity): void { $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->isUnique(), $this->config['waitingJobExpiration'], $entity->getPriority()); From a16af6aaace3c31c8d55297a1683c445e334bb5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 14:30:10 +0100 Subject: [PATCH 08/27] Replaces boolean flags with ModeEnum Replaces the boolean flags `isUnique` and `isRecurring` with a `ModeEnum` to represent the job's execution mode. This provides a more structured and extensible way to manage job behavior, allowing for easier addition of new modes in the future. It also removes the need for separate boolean checks, simplifying the code and improving readability. --- src/BackgroundQueue.php | 25 ++++++++++++----------- src/Entity/BackgroundJob.php | 37 +++++++++++++++++------------------ src/Entity/Enums/ModeEnum.php | 10 ++++++++++ 3 files changed, 42 insertions(+), 30 deletions(-) create mode 100644 src/Entity/Enums/ModeEnum.php diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index def9099..ca1b910 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -5,6 +5,7 @@ use ADT\BackgroundQueue\Broker\Producer; use ADT\BackgroundQueue\Entity\BackgroundJob; use ADT\BackgroundQueue\Entity\Enums\CallbackNameEnum; +use ADT\BackgroundQueue\Entity\Enums\ModeEnum; use ADT\BackgroundQueue\Exception\DieException; use ADT\BackgroundQueue\Exception\JobNotFoundException; use ADT\BackgroundQueue\Exception\PermanentErrorException; @@ -104,10 +105,9 @@ public function publish( ?array $parameters = null, ?string $serialGroup = null, ?string $identifier = null, - bool $isUnique = false, + ModeEnum $mode = ModeEnum::NORMAL, ?int $postponeBy = null, ?int $priority = null, - bool $isRecurring = false, ): void { if (!$callbackName) { @@ -118,8 +118,12 @@ public function publish( throw new Exception('Callback "' . $callbackName . '" does not exist.'); } - if (!$identifier && $isUnique) { - throw new Exception('Parameter "identifier" has to be set if "isUnique" is true.'); + if (!$identifier && $mode === ModeEnum::UNIQUE) { + throw new Exception('Parameter "identifier" has to be set if "mode" is unique.'); + } + + if (!$identifier && $mode === ModeEnum::RECURRING) { + throw new Exception('Parameter "identifier" has to be set if "mode" is recurring.'); } $priority = $this->getPriority($priority, $callbackName); @@ -131,9 +135,8 @@ public function publish( $entity->setParameters($parameters, $this->config['parametersFormat']); $entity->setSerialGroup($serialGroup); $entity->setIdentifier($identifier); - $entity->setIsUnique($isUnique); + $entity->setMode($mode); $entity->setPostponedBy($postponeBy); - $entity->setIsRecurring($isRecurring); $this->save($entity); $this->publishToBroker($entity); @@ -191,7 +194,7 @@ public function process(): void unset ($states[BackgroundJob::STATE_WAITING]); if (!$this->getUnfinishedJobIdentifiers([CallbackNameEnum::PROCESS_WAITING_JOBS->value])) { - $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value); + $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value, mode: ModeEnum::RECURRING); } } else { @@ -356,10 +359,10 @@ public function processJob(int $id): void $entity->setState($state) ->setErrorMessage($e ? $e->getMessage() : null); $this->save($entity); - if (in_array($state, [BackgroundJob::STATE_TEMPORARILY_FAILED], true)) { + if ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { $this->publishToBroker($entity); } - if ($entity->getIsRecurring()) { + if ($entity->isModeRecurring()) { $this->cloneAndPublish($entity); } @@ -569,7 +572,7 @@ private function fetch(QueryBuilder $qb): BackgroundJob */ private function isRedundant(BackgroundJob $entity): bool { - if (!$entity->isUnique()) { + if (!$entity->isModeUnique()) { return false; } @@ -968,6 +971,6 @@ private function databasePing(): bool */ private function cloneAndPublish(BackgroundJob $entity): void { - $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->isUnique(), $this->config['waitingJobExpiration'], $entity->getPriority()); + $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); } } diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 87620cd..51f759a 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -2,6 +2,7 @@ namespace ADT\BackgroundQueue\Entity; +use ADT\BackgroundQueue\Entity\Enums\ModeEnum; use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; @@ -54,7 +55,6 @@ final class BackgroundJob private ?string $errorMessage = null; private ?string $serialGroup = null; private ?string $identifier = null; - private bool $isUnique = false; private ?int $postponedBy = null; private bool $processedByBroker = false; private ?int $executionTime = null; @@ -62,7 +62,7 @@ final class BackgroundJob private ?int $pid = null; // PID supervisor consumera uvintř docker kontejneru private ?string $metadata = null; // ukládá ve formátu JSON private ?string $memory = null; // ukládá ve formátu JSON - private bool $isRecurring = false; + private ModeEnum $mode = ModeEnum::NORMAL; public function __construct() { @@ -247,17 +247,6 @@ public function setIdentifier(?string $identifier): self return $this; } - public function isUnique(): bool - { - return $this->isUnique; - } - - public function setIsUnique(bool $isUnique): self - { - $this->isUnique = $isUnique; - return $this; - } - public function getPostponedBy(): ?int { return $this->postponedBy; @@ -348,7 +337,7 @@ public static function createEntity(array $values): self $entity->errorMessage = $values['error_message']; $entity->serialGroup = $values['serial_group']; $entity->identifier = $values['identifier']; - $entity->isUnique = $values['is_unique']; + $entity->mode = ModeEnum::from($values['mode']); $entity->postponedBy = $values['postponed_by']; $entity->processedByBroker = $values['processed_by_broker']; $entity->executionTime = $values['execution_time']; @@ -375,7 +364,7 @@ public function getDatabaseValues(): array 'error_message' => $this->errorMessage, 'serial_group' => $this->serialGroup, 'identifier' => $this->identifier, - 'is_unique' => (int) $this->isUnique, + 'mode' => $this->mode->value, 'postponed_by' => $this->postponedBy, 'processed_by_broker' => (int) $this->processedByBroker, 'execution_time' => (int) $this->executionTime, @@ -405,13 +394,23 @@ public function setExecutionTime(?int $executionTime): self return $this; } - public function getIsRecurring(): bool + public function getMode(): ModeEnum + { + return $this->mode; + } + + public function setMode(ModeEnum $mode): void + { + $this->mode = $mode; + } + + public function isModeUnique(): bool { - return $this->isRecurring; + return $this->mode === ModeEnum::UNIQUE; } - public function setIsRecurring(bool $isRecurring): void + public function isModeRecurring(): bool { - $this->isRecurring = $isRecurring; + return $this->mode === ModeEnum::RECURRING; } } diff --git a/src/Entity/Enums/ModeEnum.php b/src/Entity/Enums/ModeEnum.php new file mode 100644 index 0000000..b85fcac --- /dev/null +++ b/src/Entity/Enums/ModeEnum.php @@ -0,0 +1,10 @@ + Date: Mon, 17 Nov 2025 15:45:25 +0100 Subject: [PATCH 09/27] Refactors background queue table schema. Removes the `is_unique` and `is_recurring` columns from the background queue table. Replaces the `is_recurring` column with a more generic `mode` column, allowing for different execution modes beyond simple recurrence. This simplifies the schema and provides a more flexible way to manage different types of background jobs. --- src/BackgroundQueue.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index ca1b910..7e5ce3b 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -444,7 +444,6 @@ public function updateSchema(): void $table->addColumn('error_message', Types::TEXT)->setNotnull(false); $table->addColumn('serial_group', Types::STRING, ['length' => 255])->setNotnull(false); $table->addColumn('identifier', Types::STRING, ['length' => 255])->setNotnull(false); - $table->addColumn('is_unique', Types::BOOLEAN)->setNotnull(true)->setDefault(0); $table->addColumn('postponed_by', Types::INTEGER)->setNotnull(false); $table->addColumn('processed_by_broker', Types::BOOLEAN)->setNotnull(true)->setDefault(0); $table->addColumn('execution_time', Types::INTEGER)->setNotnull(false); @@ -452,7 +451,7 @@ public function updateSchema(): void $table->addColumn('pid', Types::INTEGER)->setNotnull(false); $table->addColumn('metadata', Types::JSON)->setNotnull(false); $table->addColumn('memory', Types::JSON)->setNotnull(false); - $table->addColumn('is_recurring', Types::BOOLEAN)->setNotnull(true); + $table->addColumn('mode', Types::STRING)->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); $table->setPrimaryKey(['id']); $table->addIndex(['identifier']); From dca38b22568413d9ca2bb7399e8d6af465b9dee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 15:46:31 +0100 Subject: [PATCH 10/27] Specifies length for the mode column. Ensures database compatibility by setting an explicit length for the mode column, preventing potential errors due to exceeded maximum length during data storage. --- src/BackgroundQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 7e5ce3b..910d011 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -451,7 +451,7 @@ public function updateSchema(): void $table->addColumn('pid', Types::INTEGER)->setNotnull(false); $table->addColumn('metadata', Types::JSON)->setNotnull(false); $table->addColumn('memory', Types::JSON)->setNotnull(false); - $table->addColumn('mode', Types::STRING)->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); + $table->addColumn('mode', Types::STRING, ['length' => 255])->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); $table->setPrimaryKey(['id']); $table->addIndex(['identifier']); From f632ef5ec4299ec37744ec81c85f0fe155d4f294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 16:06:40 +0100 Subject: [PATCH 11/27] Simplifies parameter handling in BackgroundJob Removes the parameters format configuration and always stores parameters as JSON, falling back to serialization if JSON encoding fails. This change simplifies the logic and removes the need for format checking and conversion. --- src/BackgroundQueue.php | 8 +---- src/Entity/BackgroundJob.php | 69 ++++++++++-------------------------- 2 files changed, 20 insertions(+), 57 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 910d011..adb1138 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -75,12 +75,6 @@ public function __construct(array $config) if (!isset($config['bulkSize'])) { $config['bulkSize'] = 1; } - if (!isset($config['parametersFormat'])) { - $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_JSON; - } - if (!in_array($config['parametersFormat'], BackgroundJob::PARAMETERS_FORMATS, true)) { - throw new Exception('Unsupported parameters format: ' . $config['parametersFormat']); - } if ($config['producer']) { $config['callbacks'][CallbackNameEnum::PROCESS_WAITING_JOBS->value] = [ 'callback' => [$this, trim(CallbackNameEnum::PROCESS_WAITING_JOBS->value, '_')], @@ -132,7 +126,7 @@ public function publish( $entity->setQueue($this->config['queue'] . trim($this->config['callbacks'][$callbackName]['queue'] ?? '', '_')); $entity->setPriority($priority); $entity->setCallbackName($callbackName); - $entity->setParameters($parameters, $this->config['parametersFormat']); + $entity->setParameters($parameters); $entity->setSerialGroup($serialGroup); $entity->setIdentifier($identifier); $entity->setMode($mode); diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 51f759a..09d140c 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -6,8 +6,9 @@ use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; -use DateTimeInterface; use Exception; +use JsonException; +use Nette\Utils\Json; use ReflectionClass; final class BackgroundJob @@ -35,19 +36,12 @@ final class BackgroundJob self::STATE_REDUNDANT => self::STATE_REDUNDANT, ]; - const PARAMETERS_FORMAT_SERIALIZE = 'serialize'; - const PARAMETERS_FORMAT_JSON = 'json'; - const PARAMETERS_FORMATS = [ - self::PARAMETERS_FORMAT_SERIALIZE, - self::PARAMETERS_FORMAT_JSON, - ]; - private ?int $id = null; private string $queue; private ?int $priority; private string $callbackName; - private $parameters; /** @see self::setParameters() */ - private $parameters_json = null; /** @see self::setParameters() */ + private ?string $parameters = null; /** @see self::setParameters() */ + private ?string $parametersJson = null; /** @see self::setParameters() */ private int $state = self::STATE_READY; private DateTimeImmutable $createdAt; private ?DateTimeImmutable $lastAttemptAt = null; @@ -130,59 +124,34 @@ public function setSerialGroup(?string $serialGroup): self } /** - * Při získávání parametrů detekujeme automaticky formát uložení. - * Tedy při přechodu z self::PARAMETERS_FORMAT_SERIALIZE na self::PARAMETERS_FORMAT_JSON nedojde k výpadku. - * Také je možné v případě ladění chyb data v DB ručně upravit ze serializovaného pole do json formátu, i když pro ukládání používáme self::PARAMETERS_FORMAT_SERIALIZE * @return array + * @throws \Nette\Utils\JsonException */ public function getParameters(): array { + if ($this->parametersJson) { + return array_map(function ($value) { + return Utils::getDateTimeFromArray($value, true); + }, Json::decode($this->parametersJson, forceArrays: true)); + } + $this->parameters = is_resource($this->parameters) ? stream_get_contents($this->parameters) : $this->parameters; if (!is_null($this->parameters)) { return unserialize($this->parameters); } - $parametersJson = json_decode($this->parameters_json, true); - return array_map(function ($value) { - return Utils::getDateTimeFromArray($value, true); - }, $parametersJson); + return []; } - /** - * Parametry ukládá jako serializované pole nebo jako json. - * Formát určuje parametr v BackgroundQueue `parametersFormat`. - * - `serialize` => ukládá jako serializované pole a je bez omezení - * - `json` => ukládá jako json - * - parametry mohou obsahovat pouze skalární typy, pole, NULL a \DateTimeInterface - * - pokud je nějaký z parametrů objekt, automaticky se použije "serialize" - * - * @throws Exception - */ - public function setParameters(float|object|int|bool|array|string|null $parameters, string $parametersFormat): self + public function setParameters(mixed $parameters): self { $parameters = is_array($parameters) ? $parameters : [$parameters]; - if ($parametersFormat == self::PARAMETERS_FORMAT_JSON) { - foreach ($parameters as $parameter) { - if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof DateTimeInterface)) { - $parametersFormat = self::PARAMETERS_FORMAT_SERIALIZE; - break; - } - } - } - - switch ($parametersFormat) { - case self::PARAMETERS_FORMAT_SERIALIZE: - $this->parameters = serialize($parameters); - $this->parameters_json = null; - break; - case self::PARAMETERS_FORMAT_JSON: - $this->parameters = null; - $this->parameters_json = json_encode($parameters); - break; - default: - throw new Exception("Unsupported parameters format: $parametersFormat"); + try { + $this->parametersJson = Json::encode($parameters); + } catch (JsonException) { + $this->parameters = serialize($parameters); } return $this; @@ -329,7 +298,7 @@ public static function createEntity(array $values): self $entity->priority = $values['priority']; $entity->callbackName = $values['callback_name']; $entity->parameters = $values['parameters']; - $entity->parameters_json = $values['parameters_json']; + $entity->parametersJson = $values['parameters_json']; $entity->state = $values['state']; $entity->createdAt = new DateTimeImmutable($values['created_at']); $entity->lastAttemptAt = $values['last_attempt_at'] ? new DateTimeImmutable($values['last_attempt_at']) : null; @@ -356,7 +325,7 @@ public function getDatabaseValues(): array 'priority' => $this->priority, 'callback_name' => $this->callbackName, 'parameters' => $this->parameters, - 'parameters_json' => $this->parameters_json, + 'parameters_json' => $this->parametersJson, 'state' => $this->state, 'created_at' => $this->createdAt->format('Y-m-d H:i:s'), 'last_attempt_at' => $this->lastAttemptAt?->format('Y-m-d H:i:s'), From 33e610038d785bfd974e894ddbbda12305e72c2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 16:50:37 +0100 Subject: [PATCH 12/27] Handles non-JSONable parameters for background jobs Ensures background job parameters can be serialized even when they contain objects that aren't directly JSON-encodable. It achieves this by first checking if the parameters are "JSONable". If not, it serializes the parameters using PHP's serialize function, providing a fallback mechanism for complex data structures. --- src/Entity/BackgroundJob.php | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 09d140c..3c1f29a 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -144,13 +144,11 @@ public function getParameters(): array return []; } - public function setParameters(mixed $parameters): self + public function setParameters(array $parameters): self { - $parameters = is_array($parameters) ? $parameters : [$parameters]; - - try { + if ($this->isJsonable($parameters)) { $this->parametersJson = Json::encode($parameters); - } catch (JsonException) { + } else { $this->parameters = serialize($parameters); } @@ -382,4 +380,19 @@ public function isModeRecurring(): bool { return $this->mode === ModeEnum::RECURRING; } + + private function isJsonable(array $value): bool + { + foreach ($value as $item) { + if (is_array($item)) { + if (!$this->isJsonable($item)) { + return false; + } + } elseif (is_object($item) && !$item instanceof \DateTimeInterface) { + return false; + } + } + + return true; + } } From e8e7050965288cde6ad825faa63da75e53dad4e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 16:55:43 +0100 Subject: [PATCH 13/27] Renames JsonException for clarity Renames the aliased JsonException to its non-aliased version for better clarity and consistency. It also updates the PHPDoc for setParameters method to use the non-aliased exception. --- src/Entity/BackgroundJob.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 3c1f29a..f0bede0 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -7,8 +7,8 @@ use DateTime; use DateTimeImmutable; use Exception; -use JsonException; use Nette\Utils\Json; +use Nette\Utils\JsonException; use ReflectionClass; final class BackgroundJob @@ -125,7 +125,7 @@ public function setSerialGroup(?string $serialGroup): self /** * @return array - * @throws \Nette\Utils\JsonException + * @throws JsonException */ public function getParameters(): array { @@ -144,6 +144,9 @@ public function getParameters(): array return []; } + /** + * @throws JsonException + */ public function setParameters(array $parameters): self { if ($this->isJsonable($parameters)) { From 709f4c561ac50d8c950603c5e69b424c06a8c296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 17:03:23 +0100 Subject: [PATCH 14/27] Exposes method to fetch unfinished job identifiers Makes the `getUnfinishedJobIdentifiers` method public. This method retrieves identifiers of background jobs that are not in a finished state, allowing external processes to check for pending or failed jobs. This functionality is required to enable the processing of jobs stuck in a waiting state. --- src/BackgroundQueue.php | 66 ++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index adb1138..60271a9 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -470,6 +470,39 @@ public function dieIfNecessary(): void } } + /** + * @throws Exception + * @throws \Doctrine\DBAL\Exception + */ + public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array + { + $qb = $this->createQueryBuilder(); + + $states = BackgroundJob::FINISHED_STATES; + if ($excludeProcessing) { + $states[BackgroundJob::STATE_PROCESSING] = BackgroundJob::STATE_PROCESSING; + } + + $qb->andWhere('state NOT IN (:state)') + ->setParameter('state', $states); + + if ($identifiers) { + $qb->andWhere('identifier IN (:identifier)') + ->setParameter('identifier', $identifiers); + } else { + $qb->andWhere('identifier IS NOT NULL'); + } + + $qb->select('identifier')->groupBy('identifier'); + + $unfinishedJobIdentifiers = []; + foreach ($this->fetchAll($qb, null, false) as $_entity) { + $unfinishedJobIdentifiers[] = $_entity['identifier']; + } + + return $unfinishedJobIdentifiers; + } + public static function parseDsn($dsn): array { // Parse the DSN string @@ -514,39 +547,6 @@ private function createConnection(): void } } - /** - * @throws Exception - * @throws \Doctrine\DBAL\Exception - */ - private function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array - { - $qb = $this->createQueryBuilder(); - - $states = BackgroundJob::FINISHED_STATES; - if ($excludeProcessing) { - $states[BackgroundJob::STATE_PROCESSING] = BackgroundJob::STATE_PROCESSING; - } - - $qb->andWhere('state NOT IN (:state)') - ->setParameter('state', $states); - - if ($identifiers) { - $qb->andWhere('identifier IN (:identifier)') - ->setParameter('identifier', $identifiers); - } else { - $qb->andWhere('identifier IS NOT NULL'); - } - - $qb->select('identifier')->groupBy('identifier'); - - $unfinishedJobIdentifiers = []; - foreach ($this->fetchAll($qb, null, false) as $_entity) { - $unfinishedJobIdentifiers[] = $_entity['identifier']; - } - - return $unfinishedJobIdentifiers; - } - /** * @throws Exception|\Doctrine\DBAL\Exception */ From 6357dca143309373bc3f209c0bb65b6e7560f555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 17:22:15 +0100 Subject: [PATCH 15/27] Refactors queue naming and consumption Simplifies queue naming logic by introducing a dedicated `getQueue` method, ensuring consistent queue name generation across the application. This change also allows consumers to process jobs from the default queue without specifying a queue name. --- src/BackgroundQueue.php | 19 +++++++++++++++- src/Broker/Consumer.php | 2 +- src/Broker/PhpAmqpLib/Consumer.php | 4 ++-- src/Broker/PhpAmqpLib/Manager.php | 35 +++++------------------------- src/Broker/PhpAmqpLib/Producer.php | 14 +++++++----- src/Console/ConsumeCommand.php | 2 +- 6 files changed, 37 insertions(+), 39 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 60271a9..b784f9d 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -22,6 +22,7 @@ use Doctrine\DBAL\Types\Types; use Exception; use InvalidArgumentException; +use Nette\Utils\JsonException; use PDOException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -123,7 +124,7 @@ public function publish( $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); - $entity->setQueue($this->config['queue'] . trim($this->config['callbacks'][$callbackName]['queue'] ?? '', '_')); + $entity->setQueue($this->getQueue($this->config['callbacks'][$callbackName]['queue'] ?? null)); $entity->setPriority($priority); $entity->setCallbackName($callbackName); $entity->setParameters($parameters); @@ -961,9 +962,25 @@ private function databasePing(): bool /** * @throws \Doctrine\DBAL\Exception + * @throws JsonException */ private function cloneAndPublish(BackgroundJob $entity): void { $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); } + + public function getQueue(?string $queue, ?int $priority = null): string + { + $result = $this->config['queue']; + + if ($queue !== null) { + $result .= '_' . $queue; + } + + if ($priority !== null) { + $result .= '_' . $priority; + } + + return $result; + } } diff --git a/src/Broker/Consumer.php b/src/Broker/Consumer.php index 3229a39..b671ce1 100644 --- a/src/Broker/Consumer.php +++ b/src/Broker/Consumer.php @@ -4,5 +4,5 @@ interface Consumer { - public function consume(string $queue, array $priorities): void; + public function consume(?string $queue = null, array $priorities = []): void; } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index bb66120..52f07f7 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -20,7 +20,7 @@ public function __construct(Manager $manager, BackgroundQueue $backgroundQueue) /** * @throws Exception */ - public function consume(string $queue, array $priorities): void + public function consume(?string $queue = null, array $priorities = []): void { // TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet. // Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL. @@ -31,7 +31,7 @@ public function consume(string $queue, array $priorities): void // Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme $queuesWithPriorities = []; foreach ($priorities as $priority) { - $queueWithPriority = $this->manager->getQueueWithPriority($queue, $priority); + $queueWithPriority = $this->backgroundQueue->getQueue($queue, $priority); $queuesWithPriorities[] = $queueWithPriority; $this->manager->createExchange($queueWithPriority); $this->manager->createQueue($queueWithPriority, $queueWithPriority); diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 9a16836..5065abc 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -37,6 +37,9 @@ private function getConnection(): AMQPStreamConnection return $this->connection; } + /** + * @throws Exception + */ public function closeConnection(bool $hard = false): void { $this->closeChannel($hard); @@ -82,7 +85,7 @@ public function closeChannel(bool $hard = false): void $this->initQos = false; } - public function createExchange(string $exchange) + public function createExchange(string $exchange): void { if (isset($this->initExchanges[$exchange])) { return; @@ -99,7 +102,7 @@ public function createExchange(string $exchange) $this->initExchanges[$exchange] = true; } - public function createQueue(string $queue, ?string $exchange = null, array $additionalArguments = []) + public function createQueue(string $queue, ?string $exchange = null, array $additionalArguments = []): void { if (isset($this->initQueues[$queue])) { return; @@ -126,7 +129,7 @@ public function createQueue(string $queue, ?string $exchange = null, array $addi $this->initQueues[$queue] = true; } - public function setupQos() + public function setupQos(): void { if ($this->initQos) { return; @@ -140,30 +143,4 @@ public function setupQos() $this->initQos = true; } - - public function getQueueWithPriority(string $queue, int $priority): string - { - return $queue . '_' . $priority; - } - - public function parseQueueAndPriority(string $queueWithPriority): array - { - $parts = explode('_', $queueWithPriority); - - if (count($parts) === 2) { - return [$parts[0], $parts[1]]; - } elseif (count($parts) > 2) { - $nameParts = []; - while (true) { - $part = array_shift($parts); - if (is_numeric($part)) { - return [implode('_', $nameParts), $part]; - } else { - $nameParts[] = $part; - } - } - } else { - throw new \Exception('Missing priority in queue name.'); - } - } } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index 4495d11..d163f7a 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -3,6 +3,7 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; use ADT\BackgroundQueue\BackgroundQueue; +use Exception; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Message\AMQPMessage; @@ -11,16 +12,16 @@ class Producer implements \ADT\BackgroundQueue\Broker\Producer { const DIE = 'die'; - private Manager $manager; - - public function __construct( Manager $manager) + public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Manager $manager) { - $this->manager = $manager; } + /** + * @throws Exception + */ public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void { - $queue = $this->manager->getQueueWithPriority($queue, $priority); + $queue = $this->backgroundQueue->getQueue($queue, $priority); $exchange = $queue; $this->manager->createExchange($exchange); @@ -46,6 +47,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati } + /** + * @throws Exception + */ public function publishDie(string $queue): void { $this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY); diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index ba249f0..10dce3a 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -23,7 +23,7 @@ public function __construct( protected function configure(): void { - $this->addArgument('queue', InputArgument::REQUIRED); + $this->addArgument('queue', InputArgument::OPTIONAL); $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1); $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)'); } From 3ed191dcc3ace124c81f131c7327d2b17b01527e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 17:46:35 +0100 Subject: [PATCH 16/27] Refactors queue naming logic to Manager Moves queue name generation with priority to the Manager class for better consistency. The Manager class now provides `getQueueWithPriority` method which is used in both Producer and Consumer. This ensures that queue names are generated consistently across the application. Also, the Producer and Consumer classes are now readonly and dependencies are injected via constructor promotion. --- src/Broker/PhpAmqpLib/Consumer.php | 11 +++-------- src/Broker/PhpAmqpLib/Manager.php | 5 +++++ src/Broker/PhpAmqpLib/Producer.php | 7 +++---- src/Console/ReloadConsumersCommand.php | 15 ++++++++------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index 52f07f7..849a80d 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -6,15 +6,10 @@ use Exception; use PhpAmqpLib\Message\AMQPMessage; -class Consumer implements \ADT\BackgroundQueue\Broker\Consumer +readonly class Consumer implements \ADT\BackgroundQueue\Broker\Consumer { - private BackgroundQueue $backgroundQueue; - private Manager $manager; - - public function __construct(Manager $manager, BackgroundQueue $backgroundQueue) + public function __construct(private Manager $manager, private BackgroundQueue $backgroundQueue) { - $this->manager = $manager; - $this->backgroundQueue = $backgroundQueue; } /** @@ -31,7 +26,7 @@ public function consume(?string $queue = null, array $priorities = []): void // Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme $queuesWithPriorities = []; foreach ($priorities as $priority) { - $queueWithPriority = $this->backgroundQueue->getQueue($queue, $priority); + $queueWithPriority = $this->manager->getQueueWithPriority($queue, $priority); $queuesWithPriorities[] = $queueWithPriority; $this->manager->createExchange($queueWithPriority); $this->manager->createQueue($queueWithPriority, $queueWithPriority); diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 5065abc..620186e 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -143,4 +143,9 @@ public function setupQos(): void $this->initQos = true; } + + public function getQueueWithPriority(string $queue, int $priority): string + { + return $queue . '_' . $priority; + } } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index d163f7a..824f715 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -2,17 +2,16 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; -use ADT\BackgroundQueue\BackgroundQueue; use Exception; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Message\AMQPMessage; -class Producer implements \ADT\BackgroundQueue\Broker\Producer +readonly class Producer implements \ADT\BackgroundQueue\Broker\Producer { const DIE = 'die'; - public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Manager $manager) + public function __construct(private Manager $manager) { } @@ -21,7 +20,7 @@ public function __construct(private readonly BackgroundQueue $backgroundQueue, p */ public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void { - $queue = $this->backgroundQueue->getQueue($queue, $priority); + $queue = $this->manager->getQueueWithPriority($queue, $priority); $exchange = $queue; $this->manager->createExchange($exchange); diff --git a/src/Console/ReloadConsumersCommand.php b/src/Console/ReloadConsumersCommand.php index 46b5b3e..0eb405d 100644 --- a/src/Console/ReloadConsumersCommand.php +++ b/src/Console/ReloadConsumersCommand.php @@ -2,6 +2,7 @@ namespace ADT\BackgroundQueue\Console; +use ADT\BackgroundQueue\BackgroundQueue; use ADT\BackgroundQueue\Broker\Producer; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputArgument; @@ -11,29 +12,29 @@ #[AsCommand(name: 'background-queue:reload-consumers', description: 'Creates the specified number of noop messages to reload consumers consuming specified queue.')] class ReloadConsumersCommand extends Command { - public function __construct(private readonly Producer $producer) + public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Producer $producer) { parent::__construct(); } protected function configure(): void { - $this->addArgument( - "queue", - InputArgument::REQUIRED, - 'A queue whose consumers are to reload.' - ); $this->addArgument( "number", InputArgument::REQUIRED, 'Number of consumers to reload.' ); + $this->addArgument( + "queue", + InputArgument::OPTIONAL, + 'A queue whose consumers are to reload.' + ); } protected function executeCommand(InputInterface $input, OutputInterface $output): int { for ($i = 0; $i < $input->getArgument("number"); $i++) { - $this->producer->publishDie($input->getArgument("queue")); + $this->producer->publishDie($this->backgroundQueue->getQueue($input->getArgument("queue"))); } return self::SUCCESS; From 27aa26f116e46ece653b398e77ffa33f8a3e4de8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 17:51:12 +0100 Subject: [PATCH 17/27] Removes nullable queue from consumer Ensures that the queue parameter is always required when consuming messages. This prevents potential errors and makes the code more robust by explicitly enforcing the presence of a queue name. --- src/Broker/Consumer.php | 2 +- src/Broker/PhpAmqpLib/Consumer.php | 2 +- src/Console/ConsumeCommand.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Broker/Consumer.php b/src/Broker/Consumer.php index b671ce1..3229a39 100644 --- a/src/Broker/Consumer.php +++ b/src/Broker/Consumer.php @@ -4,5 +4,5 @@ interface Consumer { - public function consume(?string $queue = null, array $priorities = []): void; + public function consume(string $queue, array $priorities): void; } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index 849a80d..e81c293 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -15,7 +15,7 @@ public function __construct(private Manager $manager, private BackgroundQueue $b /** * @throws Exception */ - public function consume(?string $queue = null, array $priorities = []): void + public function consume(string $queue, array $priorities): void { // TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet. // Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL. diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 10dce3a..263f57f 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -43,7 +43,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int for ($i = 0; $i < (int)$jobs; $i++) { $this->backgroundQueue->dieIfNecessary(); - $this->consumer->consume($input->getArgument('queue'), $priorities); + $this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities); } return self::SUCCESS; From b8fbe474c91586285d228feca813dc16441b422a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 18:03:37 +0100 Subject: [PATCH 18/27] Allows null parameters for background jobs Prevents errors when a background job is created without initial parameters. It skips the JSON encoding process if the parameters array is null or empty. --- src/Entity/BackgroundJob.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index f0bede0..526c5e5 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -147,8 +147,12 @@ public function getParameters(): array /** * @throws JsonException */ - public function setParameters(array $parameters): self + public function setParameters(?array $parameters): self { + if (!$parameters) { + return $this; + } + if ($this->isJsonable($parameters)) { $this->parametersJson = Json::encode($parameters); } else { From e4bdc73e2a5cc0a39468c6ead8d6fb6cdcdf11dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 18:28:44 +0100 Subject: [PATCH 19/27] Removes redundant process waiting jobs trigger The check and trigger for processing waiting jobs was removed because it's now handled elsewhere, preventing potential duplicate triggers. --- src/BackgroundQueue.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index b784f9d..663c34b 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -773,10 +773,6 @@ private function processWaitingJobs(): void $this->save($_entity); $this->publishToBroker($_entity); } - - if (!$this->getUnfinishedJobIdentifiers([CallbackNameEnum::PROCESS_WAITING_JOBS->value], excludeProcessing: true)) { - $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value); - } } private function createQueryBuilder(): QueryBuilder From c5f1c9f12771644e4c9a4be6dd144c840eb47f70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 21:28:30 +0100 Subject: [PATCH 20/27] Reschedules recurring jobs on completion Ensures recurring jobs are rescheduled only after successful completion and only if a job with the same identifier doesn't already exist. This prevents duplicate job creation and ensures that recurring tasks are reliably executed. --- src/BackgroundQueue.php | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 663c34b..bc7d383 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -354,21 +354,20 @@ public function processJob(int $id): void $entity->setState($state) ->setErrorMessage($e ? $e->getMessage() : null); $this->save($entity); - if ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { + if ($state === BackgroundJob::STATE_FINISHED) { + if ($entity->isModeRecurring()) { + $this->cloneAndPublish($entity); + } + } elseif ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { $this->publishToBroker($entity); - } - if ($entity->isModeRecurring()) { - $this->cloneAndPublish($entity); - } - if ($state === BackgroundJob::STATE_PERMANENTLY_FAILED) { - // odeslání emailu o chybě v callbacku - $this->logException('Permanent error occured.', $entity, $e); - } elseif ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { // pri urcitem mnozstvi neuspesnych pokusu posilat email if ($this->config['notifyOnNumberOfAttempts'] && $this->config['notifyOnNumberOfAttempts'] === $entity->getNumberOfAttempts()) { $this->logException('Number of attempts reached "' . $entity->getNumberOfAttempts() . '".', $entity, $e); } + } elseif ($state === BackgroundJob::STATE_PERMANENTLY_FAILED) { + // odeslání emailu o chybě v callbacku + $this->logException('Permanent error occured.', $entity, $e); } } catch (Exception $innerEx) { $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $innerEx); @@ -962,7 +961,9 @@ private function databasePing(): bool */ private function cloneAndPublish(BackgroundJob $entity): void { - $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); + if ($this->getUnfinishedJobIdentifiers([$entity->getIdentifier()])) { + $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); + } } public function getQueue(?string $queue, ?int $priority = null): string From f43e642bb006bf22893429e5a5b60ebaa0ef4a3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 21:34:52 +0100 Subject: [PATCH 21/27] Fixes job re-publishing logic Corrects the condition for re-publishing waiting jobs. It now correctly publishes a job only if it is not already unfinished, preventing potential duplicate processing. --- src/BackgroundQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index bc7d383..416d5af 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -961,7 +961,7 @@ private function databasePing(): bool */ private function cloneAndPublish(BackgroundJob $entity): void { - if ($this->getUnfinishedJobIdentifiers([$entity->getIdentifier()])) { + if (!$this->getUnfinishedJobIdentifiers([$entity->getIdentifier()])) { $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); } } From d6b483a4866872a3a714103ef45d36c80b822bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Mon, 17 Nov 2025 22:44:30 +0100 Subject: [PATCH 22/27] Refactors background job processing Simplifies the process of fetching and transitioning waiting background jobs to ready state, improving efficiency. The changes include: - Modifies logic to find oldest unfinished job IDs by group. - Streamlines queue name generation. --- src/BackgroundQueue.php | 20 ++++++-------------- src/Middleware/Connection.php | 5 +++++ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 416d5af..861e6c5 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -586,7 +586,7 @@ private function isRedundant(BackgroundJob $entity): bool */ private function getPreviousUnfinishedJobId(BackgroundJob $entity): ?int { - foreach ($this->findOldestUnfinishedJobIdsByGroup($entity) as $id) { + foreach ($this->findOldestUnfinishedJobIdsByGroup(array_merge(BackgroundJob::READY_TO_PROCESS_STATES, [BackgroundJob::STATE_PROCESSING]), $entity) as $id) { return $id; } return null; @@ -596,14 +596,14 @@ private function getPreviousUnfinishedJobId(BackgroundJob $entity): ?int * @throws SchemaException * @throws \Doctrine\DBAL\Exception */ - private function findOldestUnfinishedJobIdsByGroup(?BackgroundJob $entity = null): iterable + private function findOldestUnfinishedJobIdsByGroup(array|string $state, ?BackgroundJob $entity = null): iterable { $qb = $this->createQueryBuilder(); $qb->select('MIN(id) as id'); - $qb->andWhere('state NOT IN (:state)') - ->setParameter('state', BackgroundJob::FINISHED_STATES); + $qb->andWhere('state IN (:state)') + ->setParameter('state', $state); if ($entity) { $qb->andWhere('serial_group = :serialGroup') @@ -761,13 +761,9 @@ private function getMemories(): array */ private function processWaitingJobs(): void { - foreach ($this->findOldestUnfinishedJobIdsByGroup() as $id) { + foreach ($this->findOldestUnfinishedJobIdsByGroup(BackgroundJob::STATE_WAITING) as $id) { $_entity = $this->getEntity($id); - if ($_entity->getState() === BackgroundJob::STATE_PROCESSING) { - continue; - } - $_entity->setState(BackgroundJob::STATE_READY); $this->save($_entity); $this->publishToBroker($_entity); @@ -966,7 +962,7 @@ private function cloneAndPublish(BackgroundJob $entity): void } } - public function getQueue(?string $queue, ?int $priority = null): string + public function getQueue(?string $queue): string { $result = $this->config['queue']; @@ -974,10 +970,6 @@ public function getQueue(?string $queue, ?int $priority = null): string $result .= '_' . $queue; } - if ($priority !== null) { - $result .= '_' . $priority; - } - return $result; } } diff --git a/src/Middleware/Connection.php b/src/Middleware/Connection.php index 9a3627e..b86a8e5 100644 --- a/src/Middleware/Connection.php +++ b/src/Middleware/Connection.php @@ -7,6 +7,7 @@ use ADT\BackgroundQueue\BackgroundQueue; use Doctrine\DBAL\Driver\Connection as ConnectionInterface; use Doctrine\DBAL\Driver\Middleware\AbstractConnectionMiddleware; +use Doctrine\DBAL\Exception; final class Connection extends AbstractConnectionMiddleware { @@ -31,6 +32,10 @@ public function rollBack(): void $this->transactionNestingLevel--; } + /** + * @throws Exception + * @throws \Doctrine\DBAL\Driver\Exception + */ public function commit(): void { parent::commit(); From 5c36e74af5739910043609da6e1c3b781363ca7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Tue, 18 Nov 2025 21:06:04 +0100 Subject: [PATCH 23/27] Adds updated_at field to BackgroundJob entity Adds an `updated_at` field to the BackgroundJob entity and database table. This ensures that the last modification time of a background job is tracked, allowing for more efficient processing and potentially enabling features based on job update frequency. The update time is automatically updated whenever the entity is persisted. --- src/BackgroundQueue.php | 3 +++ src/Entity/BackgroundJob.php | 13 +++++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 861e6c5..30e1c36 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -12,6 +12,7 @@ use ADT\BackgroundQueue\Exception\SkipException; use ADT\BackgroundQueue\Exception\WaitingException; use DateTime; +use DateTimeImmutable; use DateTimeInterface; use Doctrine\DBAL\Connection; use Doctrine\DBAL\DriverManager; @@ -446,6 +447,7 @@ public function updateSchema(): void $table->addColumn('metadata', Types::JSON)->setNotnull(false); $table->addColumn('memory', Types::JSON)->setNotnull(false); $table->addColumn('mode', Types::STRING, ['length' => 255])->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); + $table->addColumn('updated_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); $table->setPrimaryKey(['id']); $table->addIndex(['identifier']); @@ -642,6 +644,7 @@ private function save(BackgroundJob $entity): void $this->doPublishToDatabase(); } } else { + $entity->setUpdatedAt(new DateTimeImmutable()); $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 526c5e5..1b57c3a 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -57,6 +57,7 @@ final class BackgroundJob private ?string $metadata = null; // ukládá ve formátu JSON private ?string $memory = null; // ukládá ve formátu JSON private ModeEnum $mode = ModeEnum::NORMAL; + private DateTimeImmutable $updatedAt; public function __construct() { @@ -319,6 +320,7 @@ public static function createEntity(array $values): self $entity->pid = $values['pid']; $entity->metadata = $values['metadata']; $entity->memory = $values['memory']; + $entity->updatedAt = new DateTimeImmutable($values['updated_at']); return $entity; } @@ -346,6 +348,7 @@ public function getDatabaseValues(): array 'pid' => $this->pid, 'metadata' => $this->metadata, 'memory' => $this->memory, + 'updated_at' => $this->updatedAt->format('Y-m-d H:i:s') ]; } @@ -402,4 +405,14 @@ private function isJsonable(array $value): bool return true; } + + public function getUpdatedAt(): DateTimeImmutable + { + return $this->updatedAt; + } + + public function setUpdatedAt(DateTimeImmutable $updatedAt): void + { + $this->updatedAt = $updatedAt; + } } From 87226ba53717b5540ba571cf923b2aa369afc8d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Tue, 18 Nov 2025 21:10:11 +0100 Subject: [PATCH 24/27] Sets default value for 'updated_at' column Ensures that the 'updated_at' column in the database table defaults to the current timestamp. This change addresses a potential issue where the 'updated_at' value might not be initialized properly when a new record is created. --- src/BackgroundQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 30e1c36..7cc82b5 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -447,7 +447,7 @@ public function updateSchema(): void $table->addColumn('metadata', Types::JSON)->setNotnull(false); $table->addColumn('memory', Types::JSON)->setNotnull(false); $table->addColumn('mode', Types::STRING, ['length' => 255])->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); - $table->addColumn('updated_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); + $table->addColumn('updated_at', Types::DATETIME_IMMUTABLE)->setNotnull(true)->setDefault('CURRENT_TIMESTAMP'); $table->setPrimaryKey(['id']); $table->addIndex(['identifier']); From 01cabc32300a10f0b26ee60015c95993abf42439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Tue, 18 Nov 2025 21:24:57 +0100 Subject: [PATCH 25/27] Initializes updated at field on job creation Ensures that the `updatedAt` field is properly initialized when a new background job is created. This allows to correctly track when a job was last modified for the process waiting jobs functionality. --- src/Entity/BackgroundJob.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 1b57c3a..d9a6ed2 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -62,6 +62,7 @@ final class BackgroundJob public function __construct() { $this->createdAt = new DateTimeImmutable(); + $this->updatedAt = new DateTimeImmutable(); } public function __clone() From cc5fc1ca308efb5146bc80cf51cb98f06d95ed91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Thu, 20 Nov 2025 05:39:27 +0100 Subject: [PATCH 26/27] Updates entity only if in ready state Updates the entity's update timestamp only when the entity is in the 'ready' state. This prevents unnecessary updates to the database record when the entity is not yet ready for processing. --- src/BackgroundQueue.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 7cc82b5..ab30b27 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -644,7 +644,9 @@ private function save(BackgroundJob $entity): void $this->doPublishToDatabase(); } } else { - $entity->setUpdatedAt(new DateTimeImmutable()); + if ($entity->getState() === BackgroundJob::STATE_READY) { + $entity->setUpdatedAt(new DateTimeImmutable()); + } $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } From 70b6cfc4fd6f5beb08507ab4b21d0c45067a8016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Fri, 19 Dec 2025 12:20:02 +0100 Subject: [PATCH 27/27] Simplifies job fetching logic Refactors the job fetching logic to use a single method for retrieving jobs. This change improves code maintainability and reduces redundancy by consolidating the fetching logic into a single, reusable method. It also addresses potential inconsistencies and improves the overall reliability of job retrieval. --- src/BackgroundQueue.php | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index ab30b27..dce9029 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -549,18 +549,6 @@ private function createConnection(): void } } - /** - * @throws Exception|\Doctrine\DBAL\Exception - */ - private function fetch(QueryBuilder $qb): BackgroundJob - { - if (!$entities = $this->fetchAll($qb, 1)) { - throw new JobNotFoundException(); - } - - return $entities[0]; - } - /** * @throws Exception * @throws \Doctrine\DBAL\Exception @@ -579,7 +567,7 @@ private function isRedundant(BackgroundJob $entity): bool $qb->andWhere('id < :id') ->setParameter('id', $entity->getId()); - return (bool) $this->fetch($qb); + return (bool) $this->fetchAll($qb, 1); } /** @@ -699,11 +687,11 @@ private function getEntity(int $id): BackgroundJob ->andWhere('id = :id') ->setParameter('id', $id); - if (!$entity = $this->fetch($qb)) { - throw new JobNotFoundException('Job ' . $id . ' not found.'); + if (!$entities = $this->fetchAll($qb, 1)) { + throw new JobNotFoundException(); } - return $entity; + return $entities[0]; } /**