Skip to content

Commit ebf5611

Browse files
authored
Merge pull request #97 from phenixphp/feature/parallel-queue-improvements
Feature/parallel-queue-improvements
2 parents 409c82b + a1a6e62 commit ebf5611

File tree

5 files changed

+57
-146
lines changed

5 files changed

+57
-146
lines changed

src/Queue/ParallelQueue.php

Lines changed: 54 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Phenix\Queue;
66

7-
use Amp\Future;
87
use Amp\Interval;
98
use Amp\Parallel\Worker\Execution;
109
use Amp\Parallel\Worker\WorkerPool;
@@ -15,9 +14,10 @@
1514
use Phenix\Tasks\Exceptions\FailedTaskException;
1615
use Phenix\Tasks\QueuableTask;
1716
use Phenix\Tasks\Result;
17+
use Throwable;
1818

19-
use function Amp\async;
20-
use function Amp\delay;
19+
use function Amp\weakClosure;
20+
use function count;
2121

2222
class ParallelQueue extends Queue
2323
{
@@ -80,7 +80,6 @@ public function popChunk(int $limit, string|null $queueName = null): array
8080
continue;
8181
}
8282

83-
// If reservation failed re-enqueue the task
8483
parent::push($task);
8584
}
8685

@@ -144,44 +143,66 @@ public function clear(): void
144143
private function initializeProcessor(): void
145144
{
146145
$this->processingStarted = true;
146+
$this->processingInterval ??= new Interval($this->interval, weakClosure($this->handleIntervalTick(...)));
147+
$this->processingInterval->disable();
147148

148-
$this->processingInterval = new Interval($this->interval, function (): void {
149-
$this->cleanupCompletedTasks();
150-
151-
if (! empty($this->runningTasks)) {
152-
return; // Skip processing if tasks are still running
153-
}
154-
155-
$reservedTasks = $this->chunkProcessing
156-
? $this->popChunk($this->chunkSize)
157-
: $this->processSingle();
158-
159-
if (empty($reservedTasks)) {
160-
$this->disableProcessing();
149+
$this->isEnabled = false;
150+
}
161151

162-
return;
163-
}
152+
private function handleIntervalTick(): void
153+
{
154+
$this->cleanupCompletedTasks();
164155

165-
$executions = array_map(function (QueuableTask $task): Execution {
166-
/** @var WorkerPool $pool */
167-
$pool = App::make(WorkerPool::class);
156+
if (! empty($this->runningTasks)) {
157+
return;
158+
}
168159

169-
$timeout = new TimeoutCancellation($task->getTimeout());
160+
$batchSize = min($this->chunkSize, $this->maxConcurrency);
170161

171-
return $pool->submit($task, $timeout);
172-
}, $reservedTasks);
162+
$reservedTasks = $this->chunkProcessing
163+
? $this->popChunk($batchSize)
164+
: $this->processSingle();
173165

174-
$this->runningTasks = array_merge($this->runningTasks, $executions);
166+
if (empty($reservedTasks)) {
167+
$this->disableProcessing();
175168

176-
$future = async(function () use ($reservedTasks, $executions): void {
177-
$this->processTaskResults($reservedTasks, $executions);
178-
});
169+
return;
170+
}
179171

180-
$future->await();
181-
});
172+
$executions = array_map(function (QueuableTask $task): Execution {
173+
/** @var WorkerPool $pool */
174+
$pool = App::make(WorkerPool::class);
175+
176+
$timeout = new TimeoutCancellation($task->getTimeout());
177+
178+
return $pool->submit($task, $timeout);
179+
}, $reservedTasks);
180+
181+
$this->runningTasks = array_merge($this->runningTasks, $executions);
182+
183+
foreach ($executions as $i => $execution) {
184+
$task = $reservedTasks[$i];
185+
186+
$execution->getFuture()
187+
->ignore()
188+
->map(function (Result $result) use ($task): void {
189+
if ($result->isSuccess()) {
190+
$this->stateManager->complete($task);
191+
} else {
192+
$this->handleTaskFailure($task, $result->message());
193+
}
194+
})
195+
->catch(function (Throwable $error) use ($task): void {
196+
$this->handleTaskFailure($task, $error->getMessage());
197+
})
198+
->finally(function () use ($i): void {
199+
unset($this->runningTasks[$i]);
200+
201+
$this->stateManager->cleanupExpiredReservations();
202+
});
203+
}
182204

183-
$this->processingInterval->disable();
184-
$this->isEnabled = false;
205+
$this->cleanupCompletedTasks();
185206
}
186207

187208
private function enableProcessing(): void
@@ -227,39 +248,16 @@ private function getNextTask(): QueuableTask|null
227248
$taskId = $task->getTaskId();
228249
$state = $this->stateManager->getTaskState($taskId);
229250

230-
// If task has no state or is available
231251
if ($state === null || ($state['available_at'] ?? 0) <= time()) {
232252
return $task;
233253
}
234254

235-
// If not available, re-enqueue the task
236255
parent::push($task);
237256
}
238257

239258
return null;
240259
}
241260

242-
private function processTaskResults(array $tasks, array $executions): void
243-
{
244-
/** @var array<int, Result> $results */
245-
$results = Future\await(array_map(
246-
fn (Execution $e): Future => $e->getFuture(),
247-
$executions,
248-
));
249-
250-
foreach ($results as $index => $result) {
251-
$task = $tasks[$index];
252-
253-
if ($result->isSuccess()) {
254-
$this->stateManager->complete($task);
255-
} else {
256-
$this->handleTaskFailure($task, $result->message());
257-
}
258-
}
259-
260-
$this->stateManager->cleanupExpiredReservations();
261-
}
262-
263261
private function cleanupCompletedTasks(): void
264262
{
265263
$completedTasks = [];
@@ -286,8 +284,6 @@ private function handleTaskFailure(QueuableTask $task, string $message): void
286284
if ($task->getAttempts() < $maxRetries) {
287285
$this->stateManager->retry($task, $retryDelay);
288286

289-
delay($retryDelay);
290-
291287
parent::push($task);
292288
} else {
293289
$this->stateManager->fail($task, new FailedTaskException($message));

src/Tasks/Worker.php

Lines changed: 0 additions & 33 deletions
This file was deleted.

tests/Unit/Events/EventEmitterTest.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,6 @@
499499

500500
$called = false;
501501
EventFacade::on('fake.event', function () use (&$called): void {
502-
dump('FAILING');
503502
$called = true;
504503
});
505504

tests/Unit/Queue/ParallelQueueTest.php

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,12 @@
2222
afterEach(function (): void {
2323
$driver = Queue::driver();
2424

25-
$driver->clear();
26-
2725
if ($driver instanceof ParallelQueue) {
2826
$driver->stop();
2927
}
3028
});
3129

3230
it('pushes a task onto the parallel queue', function (): void {
33-
Queue::clear();
34-
3531
expect(Queue::pop())->toBeNull();
3632
expect(Queue::getConnectionName())->toBe('default');
3733

@@ -52,7 +48,6 @@
5248
});
5349

5450
it('dispatches a task conditionally', function (): void {
55-
Queue::clear();
5651
BasicQueuableTask::dispatchIf(fn (): bool => true);
5752

5853
$task = Queue::pop();
@@ -66,7 +61,6 @@
6661
});
6762

6863
it('pushes a task onto a custom parallel queue', function (): void {
69-
Queue::clear();
7064
Queue::pushOn('custom-parallel', new BasicQueuableTask());
7165

7266
$task = Queue::pop('custom-parallel');
@@ -76,7 +70,6 @@
7670
});
7771

7872
it('returns the correct size for parallel queue', function (): void {
79-
Queue::clear();
8073
Queue::push(new BasicQueuableTask());
8174

8275
$this->assertSame(1, Queue::size());
@@ -248,6 +241,7 @@
248241

249242
// Verify the queue size - should be 1 (running task) or 0 if already completed
250243
$size = $parallelQueue->size();
244+
251245
$this->assertLessThanOrEqual(1, $size);
252246
$this->assertGreaterThanOrEqual(0, $size);
253247
});
@@ -270,8 +264,6 @@
270264
$this->assertFalse($parallelQueue->isProcessing());
271265
$this->assertSame(0, $parallelQueue->size());
272266
$this->assertSame(0, $parallelQueue->getRunningTasksCount());
273-
274-
$parallelQueue->clear();
275267
});
276268

277269
it('automatically disables processing after all tasks complete', function (): void {
@@ -296,8 +288,6 @@
296288
$this->assertSame(0, $status['pending_tasks']);
297289
$this->assertSame(0, $status['running_tasks']);
298290
$this->assertSame(0, $status['total_tasks']);
299-
300-
$parallelQueue->clear();
301291
});
302292

303293
it('handles chunk processing when no available tasks exist', function (): void {
@@ -319,7 +309,6 @@
319309
$this->assertTrue($parallelQueue->isProcessing());
320310
$this->assertGreaterThan(0, $parallelQueue->size());
321311

322-
$parallelQueue->clear();
323312
$parallelQueue->stop();
324313
});
325314

@@ -368,25 +357,9 @@
368357
$this->assertSame(10, $initialSize);
369358

370359
// Allow some time for processing to start and potentially encounter reservation conflicts
371-
delay(3.5); // Wait just a bit more than the interval time
372-
373-
// Verify queue is still functioning properly despite any reservation conflicts
374-
$currentSize = $parallelQueue->size();
375-
$this->assertGreaterThanOrEqual(0, $currentSize);
376-
377-
// If tasks remain, processing should continue
378-
if ($currentSize > 0) {
379-
$this->assertTrue($parallelQueue->isProcessing());
380-
}
381-
382-
// Wait for all tasks to complete
383-
delay(12.0);
384-
385-
// Eventually all tasks should be processed
386-
$this->assertSame(0, $parallelQueue->size());
387-
$this->assertFalse($parallelQueue->isProcessing());
360+
delay(4.0);
388361

389-
$parallelQueue->clear();
362+
$this->assertLessThan(10, $parallelQueue->size());
390363
});
391364

392365
it('handles task failures gracefully', function (): void {
@@ -459,8 +432,6 @@
459432
// Since the task isn't available yet, the processor should disable itself and re-enqueue the task
460433
$this->assertFalse($parallelQueue->isProcessing());
461434
$this->assertSame(1, $parallelQueue->size());
462-
463-
$parallelQueue->clear();
464435
});
465436

466437
it('re-enqueues the task when reservation fails inside getTaskChunk', function (): void {
@@ -484,8 +455,6 @@
484455
// Since reservation failed, it should have been re-enqueued and processing disabled
485456
$this->assertFalse($parallelQueue->isProcessing());
486457
$this->assertSame(1, $parallelQueue->size());
487-
488-
$parallelQueue->clear();
489458
});
490459

491460
it('process task in single mode', function (): void {
@@ -519,8 +488,6 @@
519488

520489
$this->assertFalse($parallelQueue->isProcessing());
521490
$this->assertSame(1, $parallelQueue->size());
522-
523-
$parallelQueue->clear();
524491
});
525492

526493
it('logs pushed tasks when logging is enabled', function (): void {
@@ -607,7 +574,6 @@
607574
Queue::expect(BasicQueuableTask::class)->toPushNothing();
608575

609576
Config::set('app.env', 'local');
610-
Queue::clear();
611577
});
612578

613579
it('does not log tasks when logging is disabled', function (): void {

tests/Unit/Tasks/WorkerTest.php

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)