diff --git a/Dockerfile b/Dockerfile index 593e969..c06d8b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM php:8.0-cli +FROM php:8.2-cli RUN apt-get update diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 0fa09f6..dce9029 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -4,11 +4,15 @@ use ADT\BackgroundQueue\Broker\Producer; use ADT\BackgroundQueue\Entity\BackgroundJob; +use ADT\BackgroundQueue\Entity\Enums\CallbackNameEnum; +use ADT\BackgroundQueue\Entity\Enums\ModeEnum; use ADT\BackgroundQueue\Exception\DieException; +use ADT\BackgroundQueue\Exception\JobNotFoundException; use ADT\BackgroundQueue\Exception\PermanentErrorException; use ADT\BackgroundQueue\Exception\SkipException; use ADT\BackgroundQueue\Exception\WaitingException; -use ADT\Utils\FileSystem; +use DateTime; +use DateTimeImmutable; use DateTimeInterface; use Doctrine\DBAL\Connection; use Doctrine\DBAL\DriverManager; @@ -19,6 +23,7 @@ use Doctrine\DBAL\Types\Types; use Exception; use InvalidArgumentException; +use Nette\Utils\JsonException; use PDOException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -36,7 +41,7 @@ class BackgroundQueue private LoggerInterface $logger; private ?Producer $producer = null; private int $bulkSize = 1; - private array $bulkBrokerCallbacks = []; + private array $entitiesToPublish = []; private array $bulkDatabaseEntities = []; private bool $shouldDie = false; @@ -72,11 +77,12 @@ public function __construct(array $config) if (!isset($config['bulkSize'])) { $config['bulkSize'] = 1; } - if (!isset($config['parametersFormat'])) { - $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_JSON; - } - if (!in_array($config['parametersFormat'], BackgroundJob::PARAMETERS_FORMATS, true)) { - throw new Exception('Unsupported parameters format: ' . $config['parametersFormat']); + if ($config['producer']) { + $config['callbacks'][CallbackNameEnum::PROCESS_WAITING_JOBS->value] = [ + 'callback' => [$this, trim(CallbackNameEnum::PROCESS_WAITING_JOBS->value, '_')], + 'queue' => null, + 'priority' => null + ]; } $this->config = $config; @@ -87,60 +93,6 @@ public function __construct(array $config) } } - /** - * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. - * Pokud ano, připojíme se znovu. - * @throws \Doctrine\DBAL\Exception - */ - private function databaseConnectionCheckAndReconnect(): void - { - $warningHandler = function($errno, $errstr) { - $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (warning): code(' . $errno . ') ' . $errstr, 0)); - $this->connection->close(); - $this->connection->getNativeConnection(); - }; - - set_error_handler($warningHandler, E_WARNING); - - try { - if (!$this->databasePing()) { - $this->connection->close(); - $this->connection->getNativeConnection(); - } - } catch (Exception $e) { - $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); - $this->connection->close(); - $this->connection->getNativeConnection(); - } finally { - restore_error_handler(); - } - } - - /** - * @throws Exception - */ - private function databasePing(): bool - { - set_error_handler(function ($severity, $message) { - throw new PDOException($message, $severity); - }); - - try { - $this->connection->executeQuery($this->connection->getDatabasePlatform()->getDummySelectSQL()); - restore_error_handler(); - - return true; - - } catch (\Doctrine\DBAL\Exception) { - restore_error_handler(); - return false; - - } catch (Exception $e) { - restore_error_handler(); - throw $e; - } - } - /** * @throws Exception|\Doctrine\DBAL\Exception */ @@ -149,33 +101,37 @@ public function publish( ?array $parameters = null, ?string $serialGroup = null, ?string $identifier = null, - bool $isUnique = false, + ModeEnum $mode = ModeEnum::NORMAL, ?int $postponeBy = null, - ?int $priority = null + ?int $priority = null, ): void { if (!$callbackName) { throw new Exception('The job does not have the required parameter "callbackName" set.'); } - if (!isset($this->config['callbacks'][$callbackName])) { + if (!$this->getCallback($callbackName)) { throw new Exception('Callback "' . $callbackName . '" does not exist.'); } - if (!$identifier && $isUnique) { - throw new Exception('Parameter "identifier" has to be set if "isUnique" is true.'); + if (!$identifier && $mode === ModeEnum::UNIQUE) { + throw new Exception('Parameter "identifier" has to be set if "mode" is unique.'); + } + + if (!$identifier && $mode === ModeEnum::RECURRING) { + throw new Exception('Parameter "identifier" has to be set if "mode" is recurring.'); } $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); - $entity->setQueue($this->config['queue']); + $entity->setQueue($this->getQueue($this->config['callbacks'][$callbackName]['queue'] ?? null)); $entity->setPriority($priority); $entity->setCallbackName($callbackName); - $entity->setParameters($parameters, $this->config['parametersFormat']); + $entity->setParameters($parameters); $entity->setSerialGroup($serialGroup); $entity->setIdentifier($identifier); - $entity->setIsUnique($isUnique); + $entity->setMode($mode); $entity->setPostponedBy($postponeBy); $this->save($entity); @@ -184,6 +140,7 @@ public function publish( /** * @throws Exception + * @throws \Doctrine\DBAL\Exception * @internal */ public function publishToBroker(BackgroundJob $entity): void @@ -192,33 +149,78 @@ public function publishToBroker(BackgroundJob $entity): void return; } - $this->bulkBrokerCallbacks[] = function () use ($entity) { + $this->entitiesToPublish[] = $entity; + + if (!$this->connection->isTransactionActive() && count($this->entitiesToPublish) >= $this->bulkSize) { + $this->doPublishToBroker(); + } + } + + /** + * @throws \Doctrine\DBAL\Exception + * @internal + */ + public function doPublishToBroker(): void + { + foreach ($this->entitiesToPublish as $_entity) { try { - // Pokud mám u callbacku nastavenou frontu, v DB zůstává původní, ale do brokera použiju tu z konfigu. - // Tedy řadím do různých front pro různé consumery, ale z pohledu záznamů v DB se jedná o stejnou frontu. - $this->producer->publish((string)$entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); + $this->producer->publish((string)$_entity->getId(), $_entity->getQueue(), $_entity->getPriority(), $_entity->getPostponedBy()); } catch (Exception $e) { - $entity->setState(BackgroundJob::STATE_BROKER_FAILED) + $_entity->setState(BackgroundJob::STATE_BROKER_FAILED) ->setErrorMessage($e->getMessage()); - $this->save($entity); + $this->save($_entity); - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); + $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $_entity, $e); } - }; - if (!$this->connection->isTransactionActive() && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { - $this->doPublishToBroker(); } + $this->entitiesToPublish = []; } /** - * @internal + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + * @throws Exception */ - public function doPublishToBroker(): void + public function process(): void { - foreach ($this->bulkBrokerCallbacks as $_closure) { - $_closure(); + $states = BackgroundJob::READY_TO_PROCESS_STATES; + if ($this->getConfig()['producer']) { + unset ($states[BackgroundJob::STATE_READY]); + unset ($states[BackgroundJob::STATE_TEMPORARILY_FAILED]); + unset ($states[BackgroundJob::STATE_WAITING]); + + if (!$this->getUnfinishedJobIdentifiers([CallbackNameEnum::PROCESS_WAITING_JOBS->value])) { + $this->publish(CallbackNameEnum::PROCESS_WAITING_JOBS->value, identifier: CallbackNameEnum::PROCESS_WAITING_JOBS->value, mode: ModeEnum::RECURRING); + } + + } else { + // Nemáme producera + + unset ($states[BackgroundJob::STATE_BACK_TO_BROKER]); + } + + $qb = $this->createQueryBuilder() + ->andWhere('state IN (:state)') + ->setParameter('state', $states); + + /** @var BackgroundJob $_entity */ + foreach ($this->fetchAll($qb) as $_entity) { + if ( + $this->getConfig()['producer'] + && + $_entity->getState() !== BackgroundJob::STATE_BROKER_FAILED + ) { + $_entity->setState(BackgroundJob::STATE_READY); + $this->save($_entity); + $this->publishToBroker($_entity); + } else { + if (!$_entity->getProcessedByBroker() && $_entity->getAvailableFrom() > new DateTime()) { + continue; + } + $_entity->setProcessedByBroker(false); + $this->processJob($_entity->getId()); + } } - $this->bulkBrokerCallbacks = []; } /** @@ -227,16 +229,12 @@ public function doPublishToBroker(): void * @throws Exception * @internal */ - public function process(int|BackgroundJob $entity, string $queue, int $priority): void + public function processJob(int $id): void { // U publishera chceme transakci stejnou s flush, proto používáme stejné connection jako je v aplikaci. Ale u consumera chceme vlastní connection, aby když se revertne aplikační transakce, tak aby consumer mohl zapsat chybový stav k BackgroundJob. $this->createConnection(); - if (!$entity instanceof BackgroundJob) { - if (!$entity = $this->getEntity($entity, $queue, $priority)) { - return; - } - } + $entity = $this->getEntity($id); // Zpráva není ke zpracování v případě, že nemá stav READY nebo ERROR_REPEATABLE // Pokud při zpracování zprávy nastane chyba, zpráva zůstane ve stavu PROCESSING a consumer se ukončí. @@ -343,8 +341,7 @@ public function process(int|BackgroundJob $entity, string $queue, int $priority) $state = BackgroundJob::STATE_PERMANENTLY_FAILED; break; case $e instanceof WaitingException: - $state = BackgroundJob::STATE_WAITING; - $entity->setPostponedBy($this->config['waitingJobExpiration']); + $this->cloneAndPublish($entity); break; default: $state = BackgroundJob::STATE_TEMPORARILY_FAILED; @@ -358,29 +355,120 @@ public function process(int|BackgroundJob $entity, string $queue, int $priority) $entity->setState($state) ->setErrorMessage($e ? $e->getMessage() : null); $this->save($entity); - if (in_array($state, [BackgroundJob::STATE_TEMPORARILY_FAILED, BackgroundJob::STATE_WAITING], true)) { + if ($state === BackgroundJob::STATE_FINISHED) { + if ($entity->isModeRecurring()) { + $this->cloneAndPublish($entity); + } + } elseif ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { $this->publishToBroker($entity); - } - if ($state === BackgroundJob::STATE_PERMANENTLY_FAILED) { - // odeslání emailu o chybě v callbacku - $this->logException('Permanent error occured.', $entity, $e); - } elseif ($state === BackgroundJob::STATE_TEMPORARILY_FAILED) { // pri urcitem mnozstvi neuspesnych pokusu posilat email if ($this->config['notifyOnNumberOfAttempts'] && $this->config['notifyOnNumberOfAttempts'] === $entity->getNumberOfAttempts()) { $this->logException('Number of attempts reached "' . $entity->getNumberOfAttempts() . '".', $entity, $e); } + } elseif ($state === BackgroundJob::STATE_PERMANENTLY_FAILED) { + // odeslání emailu o chybě v callbacku + $this->logException('Permanent error occured.', $entity, $e); } } catch (Exception $innerEx) { $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $innerEx); } } - private function createConnection(): void + public function getConfig(): array { - if (!$this->connectionCreated) { - $this->connection = DriverManager::getConnection($this->connection->getParams()); - $this->connectionCreated = true; + return $this->config; + } + + public function startBulk(): void + { + $this->bulkSize = $this->config['bulkSize']; + } + + /** + * @throws \Doctrine\DBAL\Exception + * @throws SchemaException + */ + public function endBulk(): void + { + $this->doPublishToDatabase(); + $this->doPublishToBroker(); + $this->bulkSize = 1; + } + + /** + * @throws \Doctrine\DBAL\Exception + */ + public function clearFinishedJobs(?int $days = null): void + { + $qb = $this->createQueryBuilder() + ->delete($this->getConfig()['tableName']) + ->andWhere('state = :state') + ->setParameter('state', BackgroundJob::STATE_FINISHED); + + if ($days) { + $qb->andWhere('created_at <= :ago') + ->setParameter('ago', (new DateTime('midnight'))->modify('-' . $days . ' days')->format('Y-m-d H:i:s')); + } + + $qb->executeStatement(); + } + + /** + * @throws \Doctrine\DBAL\Exception + * @throws SchemaException* + * @throws Exception + * @internal + */ + public function updateSchema(): void + { + $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig()); + + $table = $schema->createTable($this->config['tableName']); + + $table->addColumn('id', Types::BIGINT, ['unsigned' => true])->setAutoincrement(true)->setNotnull(true); + $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); + $table->addColumn('priority', Types::INTEGER)->setNotnull(false); + $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); + $table->addColumn('parameters', Types::BLOB)->setNotnull(false); + $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); + $table->addColumn('state', Types::SMALLINT)->setNotnull(true); + $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); + $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); + $table->addColumn('number_of_attempts', Types::INTEGER)->setNotnull(true)->setDefault(0); + $table->addColumn('error_message', Types::TEXT)->setNotnull(false); + $table->addColumn('serial_group', Types::STRING, ['length' => 255])->setNotnull(false); + $table->addColumn('identifier', Types::STRING, ['length' => 255])->setNotnull(false); + $table->addColumn('postponed_by', Types::INTEGER)->setNotnull(false); + $table->addColumn('processed_by_broker', Types::BOOLEAN)->setNotnull(true)->setDefault(0); + $table->addColumn('execution_time', Types::INTEGER)->setNotnull(false); + $table->addColumn('finished_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); + $table->addColumn('pid', Types::INTEGER)->setNotnull(false); + $table->addColumn('metadata', Types::JSON)->setNotnull(false); + $table->addColumn('memory', Types::JSON)->setNotnull(false); + $table->addColumn('mode', Types::STRING, ['length' => 255])->setNotnull(true)->setDefault(ModeEnum::NORMAL->value); + $table->addColumn('updated_at', Types::DATETIME_IMMUTABLE)->setNotnull(true)->setDefault('CURRENT_TIMESTAMP'); + + $table->setPrimaryKey(['id']); + $table->addIndex(['identifier']); + $table->addIndex(['state']); + + $schemaManager = $this->createSchemaManager(); + if ($schemaManager->tablesExist([$this->config['tableName']])) { + $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); + $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); + } else { + $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); + } + foreach ($sqls as $sql) { + $this->executeStatement($sql); + } + } + + public function dieIfNecessary(): void + { + if ($this->shouldDie) { + die(); } } @@ -417,89 +505,57 @@ public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $exclu return $unfinishedJobIdentifiers; } - public function getConfig(): array + public static function parseDsn($dsn): array { - return $this->config; - } + // Parse the DSN string + $parsedDsn = parse_url($dsn); + if ($parsedDsn === false || !isset($parsedDsn['scheme'], $parsedDsn['host'], $parsedDsn['path'])) { + throw new RuntimeException("Invalid DSN: " . $dsn); + } - /** - * @internal - * @throws \Doctrine\DBAL\Exception - * @throws SchemaException - */ - public function createQueryBuilder(): QueryBuilder - { - return $this->connection->createQueryBuilder() - ->select('*') - ->from($this->config['tableName']) - ->andWhere('queue = :queue') - ->setParameter('queue', $this->config['queue']); - } + // Convert DSN scheme to Doctrine DBAL driver name + $driversMap = [ + 'mysql' => 'pdo_mysql', + 'pgsql' => 'pdo_pgsql', + 'sqlsrv' => 'pdo_sqlsrv', + ]; - /** - * @throws Exception|\Doctrine\DBAL\Exception - * @internal - */ - public function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array - { - $sql = $qb->setMaxResults($maxResults)->getSQL(); - $parameters = $qb->getParameters(); - foreach ($parameters as $key => &$_value) { - if (is_array($_value)) { - $sql = str_replace(':' . $key, self::bindParamArray($key, $_value, $parameters), $sql); - } elseif ($_value instanceof DateTimeInterface) { - $_value->format('Y-m-d H:i:s'); - } + if (!isset($driversMap[$parsedDsn['scheme']])) { + throw new RuntimeException("Unknown DSN scheme: " . $parsedDsn['scheme']); } - $entities = []; - foreach ($this->fetchAllAssociative($sql, $parameters) as $_row) { - $entities[] = $toEntity ? BackgroundJob::createEntity($_row) : $_row; + // Convert DSN components to Doctrine DBAL connection parameters + $dbParams = [ + 'driver' => $driversMap[$parsedDsn['scheme']], + 'user' => $parsedDsn['user'] ?? null, + 'password' => $parsedDsn['pass'] ?? null, + 'host' => $parsedDsn['host'], + 'dbname' => ltrim($parsedDsn['path'], '/'), + ]; + + if (isset($parsedDsn['port'])) { + $dbParams['port'] = $parsedDsn['port']; } - return $entities; + return $dbParams; } - public function startBulk(): void + private function createConnection(): void { - $this->bulkSize = $this->config['bulkSize']; + if (!$this->connectionCreated) { + $this->connection = DriverManager::getConnection($this->connection->getParams()); + $this->connectionCreated = true; + } } /** + * @throws Exception * @throws \Doctrine\DBAL\Exception - * @throws SchemaException */ - public function endBulk(): void + private function isRedundant(BackgroundJob $entity): bool { - $this->doPublishToDatabase(); - $this->doPublishToBroker(); - $this->bulkSize = 1; - } - - /** - * @throws Exception|\Doctrine\DBAL\Exception - */ - private function fetch(QueryBuilder $qb): ?BackgroundJob - { - return ($entities = $this->fetchAll($qb, 1)) ? $entities[0]: null; - } - - /** - * @throws Exception|\Doctrine\DBAL\Exception - */ - private function count(QueryBuilder $qb): int - { - return count($this->fetchAll($qb, 1, false)); - } - - /** - * @throws Exception - * @throws \Doctrine\DBAL\Exception - */ - private function isRedundant(BackgroundJob $entity): bool - { - if (!$entity->isUnique()) { + if (!$entity->isModeUnique()) { return false; } @@ -511,42 +567,57 @@ private function isRedundant(BackgroundJob $entity): bool $qb->andWhere('id < :id') ->setParameter('id', $entity->getId()); - return (bool) $this->fetch($qb); + return (bool) $this->fetchAll($qb, 1); } /** - * @throws SchemaException * @throws \Doctrine\DBAL\Exception + * @throws SchemaException */ - private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob + private function getPreviousUnfinishedJobId(BackgroundJob $entity): ?int { - if (!$entity->getSerialGroup()) { - return null; + foreach ($this->findOldestUnfinishedJobIdsByGroup(array_merge(BackgroundJob::READY_TO_PROCESS_STATES, [BackgroundJob::STATE_PROCESSING]), $entity) as $id) { + return $id; } + return null; + } + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function findOldestUnfinishedJobIdsByGroup(array|string $state, ?BackgroundJob $entity = null): iterable + { $qb = $this->createQueryBuilder(); - $qb->andWhere('state NOT IN (:state)') - ->setParameter('state', BackgroundJob::FINISHED_STATES); + $qb->select('MIN(id) as id'); - $qb->andWhere('serial_group = :serial_group') - ->setParameter('serial_group', $entity->getSerialGroup()); + $qb->andWhere('state IN (:state)') + ->setParameter('state', $state); + + if ($entity) { + $qb->andWhere('serial_group = :serialGroup') + ->setParameter('serialGroup', $entity->getSerialGroup()); - if ($entity->getId()) { $qb->andWhere('id < :id') ->setParameter('id', $entity->getId()); } - $qb->orderBy('id'); + $qb->groupBy('serial_group'); + + $qb->orderBy('id', 'ASC'); + + foreach ($this->fetchAll($qb, toEntity: false) as $row) { + yield $row['id']; + } - return $this->fetch($qb); + return []; } /** - * @internal * @throws \Doctrine\DBAL\Exception */ - public function save(BackgroundJob $entity): void + private function save(BackgroundJob $entity): void { $this->databaseConnectionCheckAndReconnect(); @@ -561,10 +632,169 @@ public function save(BackgroundJob $entity): void $this->doPublishToDatabase(); } } else { + if ($entity->getState() === BackgroundJob::STATE_READY) { + $entity->setUpdatedAt(new DateTimeImmutable()); + } $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } + /** + * @throws \Doctrine\DBAL\Exception + */ + private function createSchemaManager(): AbstractSchemaManager + { + return method_exists($this->connection, 'createSchemaManager') + ? $this->connection->createSchemaManager() + : $this->connection->getSchemaManager(); + } + + /** + * @throws \Doctrine\DBAL\Exception + */ + private function executeStatement(string $sql): void + { + method_exists($this->connection, 'executeStatement') + ? $this->connection->executeStatement($sql) + : $this->connection->exec($sql); + } + + /** + * @throws \Doctrine\DBAL\Exception + */ + private function fetchAllAssociative(string $sql, array $parameters): array + { + return method_exists($this->connection, 'fetchAllAssociative') + ? $this->connection->fetchAllAssociative($sql, $parameters) + : $this->connection->fetchAll($sql, $parameters); + } + + private static function getPostponement(int $numberOfAttempts): int + { + return min(16, 2 ** ($numberOfAttempts -1)) * 1000 * 60; + } + + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + * @throws Exception + */ + private function getEntity(int $id): BackgroundJob + { + $this->databaseConnectionCheckAndReconnect(); + + $qb = $this->createQueryBuilder() + ->andWhere('id = :id') + ->setParameter('id', $id); + + if (!$entities = $this->fetchAll($qb, 1)) { + throw new JobNotFoundException(); + } + + return $entities[0]; + } + + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function checkUnfinishedJobs(BackgroundJob $entity): bool + { + if (!$entity->getSerialGroup()) { + return true; + } + + if ($previousEntityId = $this->getPreviousUnfinishedJobId($entity)) { + try { + $entity->setState(BackgroundJob::STATE_WAITING); + $entity->setErrorMessage('Waiting for job ID ' . $previousEntityId); + $entity->setPostponedBy($this->config['waitingJobExpiration']); + $this->save($entity); + } catch (Exception $e) { + $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); + } + return false; + } + + return true; + } + + private function getPriority(?int $priority, string $callbackName): int + { + if (is_null($priority)) { + $priority = $this->config['callbacks'][$callbackName]['priority'] ?? array_values($this->config['priorities'])[0]; + } + + if (!in_array($priority, $this->config['priorities'])) { + throw new InvalidArgumentException("Priority $priority for callback $callbackName is not in available priorities: " . implode(',', $this->config['priorities'])); + } + + return $priority; + } + + private function getCallback($callback): callable + { + return $this->config['callbacks'][$callback]['callback'] ?? $this->config['callbacks'][$callback]; + } + + private function getMemories(): array + { + return [ + 'notRealActual' => memory_get_usage(), + 'realActual' => memory_get_usage(true), + 'notRealPeak' => memory_get_peak_usage(), + 'realPeak' => memory_get_peak_usage(true), + ]; + } + + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + * @throws Exception + */ + private function processWaitingJobs(): void + { + foreach ($this->findOldestUnfinishedJobIdsByGroup(BackgroundJob::STATE_WAITING) as $id) { + $_entity = $this->getEntity($id); + + $_entity->setState(BackgroundJob::STATE_READY); + $this->save($_entity); + $this->publishToBroker($_entity); + } + } + + private function createQueryBuilder(): QueryBuilder + { + return $this->connection->createQueryBuilder() + ->select('*') + ->from($this->config['tableName']) + ->andWhere('queue LIKE :queue') + ->setParameter('queue', $this->config['queue'] . '%'); + } + + /** + * @throws Exception|\Doctrine\DBAL\Exception + */ + private function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array + { + $sql = $qb->setMaxResults($maxResults)->getSQL(); + $parameters = $qb->getParameters(); + foreach ($parameters as $key => &$_value) { + if (is_array($_value)) { + $sql = str_replace(':' . $key, self::bindParamArray($key, $_value, $parameters), $sql); + } elseif ($_value instanceof DateTimeInterface) { + $_value->format('Y-m-d H:i:s'); + } + } + + $entities = []; + foreach ($this->fetchAllAssociative($sql, $parameters) as $_row) { + $entities[] = $toEntity ? BackgroundJob::createEntity($_row) : $_row; + } + + return $entities; + } + /** * @throws SchemaException * @throws \Doctrine\DBAL\Exception @@ -661,231 +891,78 @@ private static function bindParamArray(string $prefix, array $values, array &$bi } /** + * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. + * Pokud ano, připojíme se znovu. * @throws \Doctrine\DBAL\Exception - * @throws SchemaException* - * @throws Exception - * @internal */ - public function updateSchema(): void + private function databaseConnectionCheckAndReconnect(): void { - $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig()); - - $table = $schema->createTable($this->config['tableName']); - - $table->addColumn('id', Types::BIGINT, ['unsigned' => true])->setAutoincrement(true)->setNotnull(true); - $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('priority', Types::INTEGER)->setNotnull(false); - $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('parameters', Types::BLOB)->setNotnull(false); - $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); - $table->addColumn('state', Types::SMALLINT)->setNotnull(true); - $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); - $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); - $table->addColumn('number_of_attempts', Types::INTEGER)->setNotnull(true)->setDefault(0); - $table->addColumn('error_message', Types::TEXT)->setNotnull(false); - $table->addColumn('serial_group', Types::STRING, ['length' => 255])->setNotnull(false); - $table->addColumn('identifier', Types::STRING, ['length' => 255])->setNotnull(false); - $table->addColumn('is_unique', Types::BOOLEAN)->setNotnull(true)->setDefault(0); - $table->addColumn('postponed_by', Types::INTEGER)->setNotnull(false); - $table->addColumn('processed_by_broker', Types::BOOLEAN)->setNotnull(true)->setDefault(0); - $table->addColumn('execution_time', Types::INTEGER)->setNotnull(false); - $table->addColumn('finished_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); - $table->addColumn('pid', Types::INTEGER)->setNotnull(false); - $table->addColumn('metadata', Types::JSON)->setNotnull(false); - $table->addColumn('memory', Types::JSON)->setNotnull(false); + $warningHandler = function($errno, $errstr) { + $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (warning): code(' . $errno . ') ' . $errstr, 0)); + $this->connection->close(); + $this->connection->getNativeConnection(); + }; - $table->setPrimaryKey(['id']); - $table->addIndex(['identifier']); - $table->addIndex(['state']); + set_error_handler($warningHandler, E_WARNING); - $schemaManager = $this->createSchemaManager(); - if ($schemaManager->tablesExist([$this->config['tableName']])) { - $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); - $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); - } else { - $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); - } - foreach ($sqls as $sql) { - $this->executeStatement($sql); + try { + if (!$this->databasePing()) { + $this->connection->close(); + $this->connection->getNativeConnection(); + } + } catch (Exception $e) { + $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); + $this->connection->close(); + $this->connection->getNativeConnection(); + } finally { + restore_error_handler(); } } /** - * @throws \Doctrine\DBAL\Exception - */ - private function createSchemaManager(): AbstractSchemaManager - { - return method_exists($this->connection, 'createSchemaManager') - ? $this->connection->createSchemaManager() - : $this->connection->getSchemaManager(); - } - - /** - * @throws \Doctrine\DBAL\Exception - */ - private function executeStatement(string $sql): void - { - method_exists($this->connection, 'executeStatement') - ? $this->connection->executeStatement($sql) - : $this->connection->exec($sql); - } - - /** - * @throws \Doctrine\DBAL\Exception + * @throws Exception */ - private function fetchAllAssociative(string $sql, array $parameters): array - { - return method_exists($this->connection, 'fetchAllAssociative') - ? $this->connection->fetchAllAssociative($sql, $parameters) - : $this->connection->fetchAll($sql, $parameters); - } - - private static function getPostponement(int $numberOfAttempts): int - { - return min(16, 2 ** ($numberOfAttempts -1)) * 1000 * 60; - } - - public static function parseDsn($dsn): array + private function databasePing(): bool { - // Parse the DSN string - $parsedDsn = parse_url($dsn); - - if ($parsedDsn === false || !isset($parsedDsn['scheme'], $parsedDsn['host'], $parsedDsn['path'])) { - throw new RuntimeException("Invalid DSN: " . $dsn); - } - - // Convert DSN scheme to Doctrine DBAL driver name - $driversMap = [ - 'mysql' => 'pdo_mysql', - 'pgsql' => 'pdo_pgsql', - 'sqlsrv' => 'pdo_sqlsrv', - ]; - - if (!isset($driversMap[$parsedDsn['scheme']])) { - throw new RuntimeException("Unknown DSN scheme: " . $parsedDsn['scheme']); - } - - // Convert DSN components to Doctrine DBAL connection parameters - $dbParams = [ - 'driver' => $driversMap[$parsedDsn['scheme']], - 'user' => $parsedDsn['user'] ?? null, - 'password' => $parsedDsn['pass'] ?? null, - 'host' => $parsedDsn['host'], - 'dbname' => ltrim($parsedDsn['path'], '/'), - ]; - - if (isset($parsedDsn['port'])) { - $dbParams['port'] = $parsedDsn['port']; - } + set_error_handler(function ($severity, $message) { + throw new PDOException($message, $severity); + }); - return $dbParams; - } + try { + $this->connection->executeQuery($this->connection->getDatabasePlatform()->getDummySelectSQL()); + restore_error_handler(); - /** - * @throws SchemaException - * @throws \Doctrine\DBAL\Exception - * @throws Exception - */ - private function getEntity(int $id, string $queue, int $priority): ?BackgroundJob - { - $this->databaseConnectionCheckAndReconnect(); + return true; - $entity = $this->fetch( - $this->createQueryBuilder() - ->andWhere('id = :id') - ->setParameter('id', $id) - ); + } catch (\Doctrine\DBAL\Exception) { + restore_error_handler(); + return false; - if (!$entity) { - if ($this->producer) { - // pokud je to rabbit fungujici v clusteru na vice serverech, - // tak jeste nemusi byt syncnuta master master databaze - - // coz si overime tak, ze v db neexistuje vetsi id - if (!$this->count($this->createQueryBuilder()->select('id')->andWhere('id > :id')->setParameter('id', $id))) { - // pridame znovu do queue - try { - // Zde máme $queue a $priority, ze kterých bylo vytaženo z brokera, tedy dáváme znovu do té stejné fronty a priority - $this->producer->publish((string) $id, $queue, $priority, $this->config['waitingJobExpiration']); - } catch (Exception $e) { - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); - - $entity->setState(BackgroundJob::STATE_BROKER_FAILED); - $this->save($entity); - } - } else { - // zalogovat - $this->logException('Job "' . $id . '" not found.'); - } - } else { - // zalogovat - $this->logException('Job "' . $id . '" not found.'); - } - return null; + } catch (Exception $e) { + restore_error_handler(); + throw $e; } - - return $entity; } /** - * @throws SchemaException * @throws \Doctrine\DBAL\Exception + * @throws JsonException */ - private function checkUnfinishedJobs(BackgroundJob $entity): bool + private function cloneAndPublish(BackgroundJob $entity): void { - if ($previousEntity = $this->getPreviousUnfinishedJob($entity)) { - try { - $entity->setState(BackgroundJob::STATE_WAITING); - $entity->setErrorMessage('Waiting for job ID ' . $previousEntity->getId()); - $entity->setPostponedBy($this->config['waitingJobExpiration']); - $this->save($entity); - $this->publishToBroker($entity); - } catch (Exception $e) { - $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); - } - return false; + if (!$this->getUnfinishedJobIdentifiers([$entity->getIdentifier()])) { + $this->publish($entity->getCallbackName(), $entity->getParameters(), $entity->getSerialGroup(), $entity->getIdentifier(), $entity->getMode(), $this->config['waitingJobExpiration'], $entity->getPriority()); } - - return true; } - private function getPriority(?int $priority, string $callbackName): int + public function getQueue(?string $queue): string { - if (is_null($priority)) { - $priority = $this->config['callbacks'][$callbackName]['priority'] ?? array_values($this->config['priorities'])[0]; - } + $result = $this->config['queue']; - if (!in_array($priority, $this->config['priorities'])) { - throw new InvalidArgumentException("Priority $priority for callback $callbackName is not in available priorities: " . implode(',' , $this->config['priorities'])); + if ($queue !== null) { + $result .= '_' . $queue; } - return $priority; - } - - private function getCallback($callback): callable - { - return $this->config['callbacks'][$callback]['callback'] ?? $this->config['callbacks'][$callback]; - } - - public function getQueueForEntityIncludeCallback(BackgroundJob $entity): string - { - return $this->config['callbacks'][$entity->getCallbackName()]['queue'] ?? $entity->getQueue(); - } - - private function getMemories(): array - { - return [ - 'notRealActual' => memory_get_usage(), - 'realActual' => memory_get_usage(true), - 'notRealPeak' => memory_get_peak_usage(), - 'realPeak' => memory_get_peak_usage(true), - ]; - } - - public function dieIfNecessary(): void - { - if ($this->shouldDie) { - die(); - } + return $result; } } diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index ead4d9a..e81c293 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -4,18 +4,12 @@ use ADT\BackgroundQueue\BackgroundQueue; use Exception; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; -class Consumer implements \ADT\BackgroundQueue\Broker\Consumer +readonly class Consumer implements \ADT\BackgroundQueue\Broker\Consumer { - private BackgroundQueue $backgroundQueue; - private Manager $manager; - - public function __construct(Manager $manager, BackgroundQueue $backgroundQueue) + public function __construct(private Manager $manager, private BackgroundQueue $backgroundQueue) { - $this->manager = $manager; - $this->backgroundQueue = $backgroundQueue; } /** @@ -56,9 +50,7 @@ public function consume(string $queue, array $priorities): void die(); } - $queuesWithPriority = $msg->getConsumerTag(); - list($queue, $priority) = $this->manager->parseQueueAndPriority($queuesWithPriority); - $this->backgroundQueue->process((int)$msg->getBody(), $queue, $priority); + $this->backgroundQueue->processJob((int)$msg->getBody()); }); } diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 9a16836..620186e 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -37,6 +37,9 @@ private function getConnection(): AMQPStreamConnection return $this->connection; } + /** + * @throws Exception + */ public function closeConnection(bool $hard = false): void { $this->closeChannel($hard); @@ -82,7 +85,7 @@ public function closeChannel(bool $hard = false): void $this->initQos = false; } - public function createExchange(string $exchange) + public function createExchange(string $exchange): void { if (isset($this->initExchanges[$exchange])) { return; @@ -99,7 +102,7 @@ public function createExchange(string $exchange) $this->initExchanges[$exchange] = true; } - public function createQueue(string $queue, ?string $exchange = null, array $additionalArguments = []) + public function createQueue(string $queue, ?string $exchange = null, array $additionalArguments = []): void { if (isset($this->initQueues[$queue])) { return; @@ -126,7 +129,7 @@ public function createQueue(string $queue, ?string $exchange = null, array $addi $this->initQueues[$queue] = true; } - public function setupQos() + public function setupQos(): void { if ($this->initQos) { return; @@ -145,25 +148,4 @@ public function getQueueWithPriority(string $queue, int $priority): string { return $queue . '_' . $priority; } - - public function parseQueueAndPriority(string $queueWithPriority): array - { - $parts = explode('_', $queueWithPriority); - - if (count($parts) === 2) { - return [$parts[0], $parts[1]]; - } elseif (count($parts) > 2) { - $nameParts = []; - while (true) { - $part = array_shift($parts); - if (is_numeric($part)) { - return [implode('_', $nameParts), $part]; - } else { - $nameParts[] = $part; - } - } - } else { - throw new \Exception('Missing priority in queue name.'); - } - } } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index 4495d11..824f715 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -2,22 +2,22 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; -use ADT\BackgroundQueue\BackgroundQueue; +use Exception; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Message\AMQPMessage; -class Producer implements \ADT\BackgroundQueue\Broker\Producer +readonly class Producer implements \ADT\BackgroundQueue\Broker\Producer { const DIE = 'die'; - private Manager $manager; - - public function __construct( Manager $manager) + public function __construct(private Manager $manager) { - $this->manager = $manager; } + /** + * @throws Exception + */ public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void { $queue = $this->manager->getQueueWithPriority($queue, $priority); @@ -46,6 +46,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati } + /** + * @throws Exception + */ public function publishDie(string $queue): void { $this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY); diff --git a/src/Console/ClearFinishedCommand.php b/src/Console/ClearFinishedCommand.php index 5101848..ae31326 100644 --- a/src/Console/ClearFinishedCommand.php +++ b/src/Console/ClearFinishedCommand.php @@ -3,62 +3,37 @@ namespace ADT\BackgroundQueue\Console; use ADT\BackgroundQueue\BackgroundQueue; -use ADT\BackgroundQueue\Entity\BackgroundJob; -use DateTime; use Doctrine\DBAL\Exception; -use Doctrine\DBAL\Schema\SchemaException; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Input\InputArgument; -#[AsCommand(name: 'background-queue:clear-finished')] +#[AsCommand(name: 'background-queue:clear-finished', description: 'Delete finished records.')] class ClearFinishedCommand extends Command { - protected static $defaultName = 'background-queue:clear-finished'; - - private BackgroundQueue $backgroundQueue; - - /** - * @throws Exception - */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; } - protected function configure() + protected function configure(): void { - $this->setName('background-queue:clear-finished'); $this->addArgument( "days", InputArgument::OPTIONAL, 'Deletes finished records older than the specified number of days.', 1 ); - $this->setDescription('Delete finished records.'); } /** * @throws Exception - * @throws SchemaException - * @throws \Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $qb = $this->backgroundQueue->createQueryBuilder() - ->delete($this->backgroundQueue->getConfig()['tableName']) - ->andWhere('state = :state') - ->setParameter('state', BackgroundJob::STATE_FINISHED); - - if ($input->getArgument("days")) { - $qb->andWhere('created_at <= :ago') - ->setParameter('ago', (new DateTime('midnight'))->modify('-' . $input->getArgument("days") . ' days')->format('Y-m-d H:i:s')); - } - - $qb->executeStatement(); + $this->backgroundQueue->clearFinishedJobs($input->getArgument("days")); - return 0; + return self::SUCCESS; } } diff --git a/src/Console/Command.php b/src/Console/Command.php index 36fcd7c..98d4132 100644 --- a/src/Console/Command.php +++ b/src/Console/Command.php @@ -16,7 +16,7 @@ abstract class Command extends \Symfony\Component\Console\Command\Command abstract protected function executeCommand(InputInterface $input, OutputInterface $output): int; - public function setLocksDir(string $locksDir) + public function setLocksDir(string $locksDir): void { $this->locksDir = $locksDir; } diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 6f54608..263f57f 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -4,35 +4,33 @@ use ADT\BackgroundQueue\BackgroundQueue; use ADT\BackgroundQueue\Broker\Consumer; +use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:consume')] +#[AsCommand(name: 'background-queue:consume', description: 'Start consumer.')] class ConsumeCommand extends \Symfony\Component\Console\Command\Command { - protected static $defaultName = 'background-queue:consume'; - private Consumer $consumer; - private BackgroundQueue $backgroundQueue; - - public function __construct(Consumer $consumer, BackgroundQueue $backgroundQueue) - { + public function __construct( + private readonly Consumer $consumer, + private readonly BackgroundQueue $backgroundQueue + ) { parent::__construct(); - $this->consumer = $consumer; - $this->backgroundQueue = $backgroundQueue; } - protected function configure() + protected function configure(): void { - $this->setName('background-queue:consume'); - $this->addArgument('queue', InputArgument::REQUIRED); + $this->addArgument('queue', InputArgument::OPTIONAL); $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1); $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)'); - $this->setDescription('Start consumer.'); } + /** + * @throws Exception + */ protected function execute(InputInterface $input, OutputInterface $output): int { $jobs = $input->getOption('jobs'); @@ -40,17 +38,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int if (!is_numeric($jobs)) { $output->writeln("Option --jobs has to be integer"); - return 1; + return self::FAILURE; } for ($i = 0; $i < (int)$jobs; $i++) { $this->backgroundQueue->dieIfNecessary(); - $this->consumer->consume($input->getArgument('queue'), $priorities); + $this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities); } - return 0; + return self::SUCCESS; } + /** + * @throws Exception + */ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): array { $prioritiesAvailable = $this->backgroundQueue->getConfig()['priorities']; @@ -59,10 +60,10 @@ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): a return $prioritiesAvailable; } - if (strpos($prioritiesText, '-') === false) { + if (!str_contains($prioritiesText, '-')) { $priority = (int)$prioritiesText; if (!in_array($priority, $prioritiesAvailable)) { - throw new \Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]"); + throw new Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]"); } return [$priority]; } @@ -83,7 +84,7 @@ private function getPrioritiesListBasedConfig(?string $prioritiesText = null): a } if (!count($priorities)) { - throw new \Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]"); + throw new Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]"); } return $priorities; diff --git a/src/Console/ProcessCommand.php b/src/Console/ProcessCommand.php index 53ba37b..82318ad 100644 --- a/src/Console/ProcessCommand.php +++ b/src/Console/ProcessCommand.php @@ -3,77 +3,30 @@ namespace ADT\BackgroundQueue\Console; use ADT\BackgroundQueue\BackgroundQueue; -use ADT\BackgroundQueue\Entity\BackgroundJob; -use DateTime; use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:process')] +#[AsCommand(name: 'background-queue:process', description: 'Processes all records in the READY or TEMPORARILY_FAILED state.')] class ProcessCommand extends Command { - protected static $defaultName = 'background-queue:process'; - - private BackgroundQueue $backgroundQueue; - /** * @throws Exception */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; - } - - protected function configure() - { - $this->setName('background-queue:process'); - $this->setDescription('Processes all records in the READY or TEMPORARILY_FAILED state.'); } /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $states = BackgroundJob::READY_TO_PROCESS_STATES; - if ($this->backgroundQueue->getConfig()['producer']) { - unset ($states[BackgroundJob::STATE_READY]); - unset ($states[BackgroundJob::STATE_TEMPORARILY_FAILED]); - unset ($states[BackgroundJob::STATE_WAITING]); - - } else { - // Nemáme producera - - unset ($states[BackgroundJob::STATE_BACK_TO_BROKER]); - } - - $qb = $this->backgroundQueue->createQueryBuilder() - ->andWhere('state IN (:state)') - ->setParameter('state', $states); - - /** @var BackgroundJob $_entity */ - foreach ($this->backgroundQueue->fetchAll($qb) as $_entity) { - if ( - $this->backgroundQueue->getConfig()['producer'] - && - $_entity->getState() !== BackgroundJob::STATE_BROKER_FAILED - ) { - $_entity->setState(BackgroundJob::STATE_READY); - $this->backgroundQueue->save($_entity); - $this->backgroundQueue->publishToBroker($_entity); - } else { - if (!$_entity->getProcessedByBroker() && $_entity->getAvailableFrom() > new DateTime()) { - continue; - } - $_entity->setProcessedByBroker(false); - // Chceme použít prioritu na entitě. Může být využito na přeřazení do jiné priority. - // Chceme zařadit do stejné fronty jako původně bylo. Tedy musíme zohlednit co je nastaveno u callbacku. - $this->backgroundQueue->process($_entity, $this->backgroundQueue->getQueueForEntityIncludeCallback($_entity), $_entity->getPriority()); - } - } + $this->backgroundQueue->process(); - return 0; + return self::SUCCESS; } } diff --git a/src/Console/ReloadConsumersCommand.php b/src/Console/ReloadConsumersCommand.php index 6a38715..0eb405d 100644 --- a/src/Console/ReloadConsumersCommand.php +++ b/src/Console/ReloadConsumersCommand.php @@ -2,46 +2,41 @@ namespace ADT\BackgroundQueue\Console; +use ADT\BackgroundQueue\BackgroundQueue; use ADT\BackgroundQueue\Broker\Producer; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:reload-consumers')] +#[AsCommand(name: 'background-queue:reload-consumers', description: 'Creates the specified number of noop messages to reload consumers consuming specified queue.')] class ReloadConsumersCommand extends Command { - protected static $defaultName = 'background-queue:reload-consumers'; - - protected Producer $producer; - - public function __construct(Producer $producer) + public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Producer $producer) { parent::__construct(); - $this->producer = $producer; } - protected function configure() + protected function configure(): void { - $this->addArgument( - "queue", - InputArgument::REQUIRED, - 'A queue whose consumers are to reload.' - ); $this->addArgument( "number", InputArgument::REQUIRED, 'Number of consumers to reload.' ); - $this->setDescription('Creates the specified number of noop messages to reload consumers consuming specified queue.'); + $this->addArgument( + "queue", + InputArgument::OPTIONAL, + 'A queue whose consumers are to reload.' + ); } protected function executeCommand(InputInterface $input, OutputInterface $output): int { for ($i = 0; $i < $input->getArgument("number"); $i++) { - $this->producer->publishDie($input->getArgument("queue")); + $this->producer->publishDie($this->backgroundQueue->getQueue($input->getArgument("queue"))); } - return 0; + return self::SUCCESS; } } diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 9bb055b..8a65613 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -7,30 +7,14 @@ use Exception; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -#[AsCommand(name: 'background-queue:update-schema')] +#[AsCommand(name: 'background-queue:update-schema', description: 'Update table schema if needed.')] class UpdateSchemaCommand extends Command { - protected static $defaultName = 'background-queue:update-schema'; - - private BackgroundQueue $backgroundQueue; - - /** - * @throws Exception - */ - public function __construct(BackgroundQueue $backgroundQueue) + public function __construct(private readonly BackgroundQueue $backgroundQueue) { parent::__construct(); - $this->backgroundQueue = $backgroundQueue; - } - - protected function configure() - { - $this->setName('background-queue:update-schema'); - $this->setDescription('Update table schema if needed.'); - $this->addOption('force', 'f', InputOption::VALUE_NONE, 'Force schema update'); } /** @@ -40,8 +24,8 @@ protected function configure() */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $this->backgroundQueue->updateSchema($input->getOption('force')); + $this->backgroundQueue->updateSchema(); - return 0; + return self::SUCCESS; } } diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 1599072..d9a6ed2 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -2,10 +2,13 @@ namespace ADT\BackgroundQueue\Entity; +use ADT\BackgroundQueue\Entity\Enums\ModeEnum; use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; use Exception; +use Nette\Utils\Json; +use Nette\Utils\JsonException; use ReflectionClass; final class BackgroundJob @@ -33,19 +36,12 @@ final class BackgroundJob self::STATE_REDUNDANT => self::STATE_REDUNDANT, ]; - const PARAMETERS_FORMAT_SERIALIZE = 'serialize'; - const PARAMETERS_FORMAT_JSON = 'json'; - const PARAMETERS_FORMATS = [ - self::PARAMETERS_FORMAT_SERIALIZE, - self::PARAMETERS_FORMAT_JSON, - ]; - private ?int $id = null; private string $queue; private ?int $priority; private string $callbackName; - private $parameters; /** @see self::setParameters() */ - private $parameters_json = null; /** @see self::setParameters() */ + private ?string $parameters = null; /** @see self::setParameters() */ + private ?string $parametersJson = null; /** @see self::setParameters() */ private int $state = self::STATE_READY; private DateTimeImmutable $createdAt; private ?DateTimeImmutable $lastAttemptAt = null; @@ -53,7 +49,6 @@ final class BackgroundJob private ?string $errorMessage = null; private ?string $serialGroup = null; private ?string $identifier = null; - private bool $isUnique = false; private ?int $postponedBy = null; private bool $processedByBroker = false; private ?int $executionTime = null; @@ -61,10 +56,13 @@ final class BackgroundJob private ?int $pid = null; // PID supervisor consumera uvintř docker kontejneru private ?string $metadata = null; // ukládá ve formátu JSON private ?string $memory = null; // ukládá ve formátu JSON + private ModeEnum $mode = ModeEnum::NORMAL; + private DateTimeImmutable $updatedAt; public function __construct() { $this->createdAt = new DateTimeImmutable(); + $this->updatedAt = new DateTimeImmutable(); } public function __clone() @@ -128,63 +126,39 @@ public function setSerialGroup(?string $serialGroup): self } /** - * Při získávání parametrů detekujeme automaticky formát uložení. - * Tedy při přechodu z self::PARAMETERS_FORMAT_SERIALIZE na self::PARAMETERS_FORMAT_JSON nedojde k výpadku. - * Také je možné v případě ladění chyb data v DB ručně upravit ze serializovaného pole do json formátu, i když pro ukládání používáme self::PARAMETERS_FORMAT_SERIALIZE * @return array + * @throws JsonException */ public function getParameters(): array { + if ($this->parametersJson) { + return array_map(function ($value) { + return Utils::getDateTimeFromArray($value, true); + }, Json::decode($this->parametersJson, forceArrays: true)); + } + $this->parameters = is_resource($this->parameters) ? stream_get_contents($this->parameters) : $this->parameters; if (!is_null($this->parameters)) { return unserialize($this->parameters); } - $parametersJson = json_decode($this->parameters_json, true); - $parameters = []; - foreach ($parametersJson as $key => $value) { - $parameters[$key] = Utils::getDateTimeFromArray($value, true); - } - return $parameters; + return []; } /** - * Parametry ukládá jako serializované pole nebo jako json. - * Formát určuje parametr v BackgroundQueue `parametersFormat`. - * - `serialize` => ukládá jako serializované pole a je bez omezení - * - `json` => ukládá jako json - * - parametry mohou obsahovat pouze skalární typy, pole, NULL a \DateTimeInterface - * - pokud je nějaký z parametrů objekt, automaticky se použije "serialize" - * - * @param object|array|string|int|float|bool|null $parameters - * @param string $parametersFormat - * @throws Exception + * @throws JsonException */ - public function setParameters($parameters, string $parametersFormat): self + public function setParameters(?array $parameters): self { - $parameters = is_array($parameters) ? $parameters : [$parameters]; - - if ($parametersFormat == self::PARAMETERS_FORMAT_JSON) { - foreach ($parameters as $parameter) { - if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof \DateTimeInterface)) { - $parametersFormat = self::PARAMETERS_FORMAT_SERIALIZE; - break; - } - } + if (!$parameters) { + return $this; } - switch ($parametersFormat) { - case self::PARAMETERS_FORMAT_SERIALIZE: - $this->parameters = serialize($parameters); - $this->parameters_json = null; - break; - case self::PARAMETERS_FORMAT_JSON: - $this->parameters = null; - $this->parameters_json = json_encode($parameters); - break; - default: - throw new Exception("Unsupported parameters format: $parametersFormat"); + if ($this->isJsonable($parameters)) { + $this->parametersJson = Json::encode($parameters); + } else { + $this->parameters = serialize($parameters); } return $this; @@ -249,17 +223,6 @@ public function setIdentifier(?string $identifier): self return $this; } - public function isUnique(): bool - { - return $this->isUnique; - } - - public function setIsUnique(bool $isUnique): self - { - $this->isUnique = $isUnique; - return $this; - } - public function getPostponedBy(): ?int { return $this->postponedBy; @@ -342,7 +305,7 @@ public static function createEntity(array $values): self $entity->priority = $values['priority']; $entity->callbackName = $values['callback_name']; $entity->parameters = $values['parameters']; - $entity->parameters_json = $values['parameters_json']; + $entity->parametersJson = $values['parameters_json']; $entity->state = $values['state']; $entity->createdAt = new DateTimeImmutable($values['created_at']); $entity->lastAttemptAt = $values['last_attempt_at'] ? new DateTimeImmutable($values['last_attempt_at']) : null; @@ -350,7 +313,7 @@ public static function createEntity(array $values): self $entity->errorMessage = $values['error_message']; $entity->serialGroup = $values['serial_group']; $entity->identifier = $values['identifier']; - $entity->isUnique = $values['is_unique']; + $entity->mode = ModeEnum::from($values['mode']); $entity->postponedBy = $values['postponed_by']; $entity->processedByBroker = $values['processed_by_broker']; $entity->executionTime = $values['execution_time']; @@ -358,6 +321,7 @@ public static function createEntity(array $values): self $entity->pid = $values['pid']; $entity->metadata = $values['metadata']; $entity->memory = $values['memory']; + $entity->updatedAt = new DateTimeImmutable($values['updated_at']); return $entity; } @@ -369,22 +333,23 @@ public function getDatabaseValues(): array 'priority' => $this->priority, 'callback_name' => $this->callbackName, 'parameters' => $this->parameters, - 'parameters_json' => $this->parameters_json, + 'parameters_json' => $this->parametersJson, 'state' => $this->state, 'created_at' => $this->createdAt->format('Y-m-d H:i:s'), - 'last_attempt_at' => $this->lastAttemptAt ? $this->lastAttemptAt->format('Y-m-d H:i:s') : null, + 'last_attempt_at' => $this->lastAttemptAt?->format('Y-m-d H:i:s'), 'number_of_attempts' => $this->numberOfAttempts, 'error_message' => $this->errorMessage, 'serial_group' => $this->serialGroup, 'identifier' => $this->identifier, - 'is_unique' => (int) $this->isUnique, + 'mode' => $this->mode->value, 'postponed_by' => $this->postponedBy, 'processed_by_broker' => (int) $this->processedByBroker, 'execution_time' => (int) $this->executionTime, - 'finished_at' => $this->finishedAt ? $this->finishedAt->format('Y-m-d H:i:s') : null, + 'finished_at' => $this->finishedAt?->format('Y-m-d H:i:s'), 'pid' => $this->pid, 'metadata' => $this->metadata, 'memory' => $this->memory, + 'updated_at' => $this->updatedAt->format('Y-m-d H:i:s') ]; } @@ -406,4 +371,49 @@ public function setExecutionTime(?int $executionTime): self $this->executionTime = $executionTime; return $this; } + + public function getMode(): ModeEnum + { + return $this->mode; + } + + public function setMode(ModeEnum $mode): void + { + $this->mode = $mode; + } + + public function isModeUnique(): bool + { + return $this->mode === ModeEnum::UNIQUE; + } + + public function isModeRecurring(): bool + { + return $this->mode === ModeEnum::RECURRING; + } + + private function isJsonable(array $value): bool + { + foreach ($value as $item) { + if (is_array($item)) { + if (!$this->isJsonable($item)) { + return false; + } + } elseif (is_object($item) && !$item instanceof \DateTimeInterface) { + return false; + } + } + + return true; + } + + public function getUpdatedAt(): DateTimeImmutable + { + return $this->updatedAt; + } + + public function setUpdatedAt(DateTimeImmutable $updatedAt): void + { + $this->updatedAt = $updatedAt; + } } diff --git a/src/Entity/Enums/CallbackNameEnum.php b/src/Entity/Enums/CallbackNameEnum.php new file mode 100644 index 0000000..4dd219b --- /dev/null +++ b/src/Entity/Enums/CallbackNameEnum.php @@ -0,0 +1,8 @@ +transactionNestingLevel--; } + /** + * @throws Exception + * @throws \Doctrine\DBAL\Driver\Exception + */ public function commit(): void { parent::commit(); diff --git a/tests/Integration/BackgroundQueueTest.php b/tests/Integration/BackgroundQueueTest.php index a84677b..ddc459a 100644 --- a/tests/Integration/BackgroundQueueTest.php +++ b/tests/Integration/BackgroundQueueTest.php @@ -203,7 +203,7 @@ public function testProcess(string $callback, int $expectedState) /** @var BackgroundJob[] $backgroundJobs */ $backgroundJobs = $backgroundQueue->fetchAll($backgroundQueue->createQueryBuilder()); - $backgroundQueue->process($backgroundJobs[0]); + $backgroundQueue->processJob($backgroundJobs[0]->getId()); $this->tester->assertEquals($expectedState, $backgroundJobs[0]->getState()); }