Skip to content

Commit c09b678

Browse files
committed
Do not submit cancelled tasks
Fixes #198.
1 parent 5aeaad2 commit c09b678

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

src/Worker/Internal/ContextWorker.php

+20
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace Amp\Parallel\Worker\Internal;
44

5+
use Amp\ByteStream\ReadableBuffer;
6+
use Amp\ByteStream\StreamChannel;
7+
use Amp\ByteStream\WritableBuffer;
58
use Amp\Cancellation;
69
use Amp\CancelledException;
710
use Amp\DeferredFuture;
@@ -139,6 +142,10 @@ public function submit(Task $task, ?Cancellation $cancellation = null): Executio
139142
throw new StatusError("The worker has been shut down");
140143
}
141144

145+
if ($cancellation?->isRequested()) {
146+
return self::createCancelledExecution($task, $cancellation);
147+
}
148+
142149
$receive = empty($this->jobQueue);
143150
$submission = new Internal\TaskSubmission($task);
144151
$jobId = $submission->getId();
@@ -220,4 +227,17 @@ public function kill(): void
220227
$this->exitStatus ??= Future::error(new WorkerException("The worker was killed"));
221228
$this->exitStatus->ignore();
222229
}
230+
231+
private static function createCancelledExecution(Task $task, Cancellation $cancellation): Execution
232+
{
233+
$channel = new StreamChannel(new ReadableBuffer(), new WritableBuffer());
234+
$channel->close();
235+
236+
try {
237+
$cancellation->throwIfRequested();
238+
throw new \Error('Expected cancellation to have been requested');
239+
} catch (CancelledException $exception) {
240+
return new Execution($task, $channel, Future::error($exception));
241+
}
242+
}
223243
}

test/Worker/AbstractWorkerTest.php

+18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace Amp\Parallel\Test\Worker;
44

55
use Amp\Cancellation;
6+
use Amp\CancelledException;
7+
use Amp\DeferredCancellation;
68
use Amp\Future;
79
use Amp\Parallel\Context\ContextFactory;
810
use Amp\Parallel\Context\StatusError;
@@ -337,6 +339,22 @@ public function testSubmitAfterCancelledTask(): void
337339
$worker->shutdown();
338340
}
339341

342+
public function testCancelBeforeSubmit(): void
343+
{
344+
$this->expectException(CancelledException::class);
345+
346+
$worker = $this->createWorker();
347+
348+
$deferredCancellation = new DeferredCancellation();
349+
$deferredCancellation->cancel();
350+
351+
try {
352+
$worker->submit(new Fixtures\CancellingTask, $deferredCancellation->getCancellation())->await();
353+
} finally {
354+
$worker->shutdown();
355+
}
356+
}
357+
340358
public function testCancellingCompletedTask(): void
341359
{
342360
$worker = $this->createWorker();

0 commit comments

Comments
 (0)