Skip to content

Commit 7f8ca54

Browse files
committed
Add DelegatingWorkerPool
1 parent b4f3978 commit 7f8ca54

8 files changed

+200
-67
lines changed

src/Worker/DelegatingWorkerPool.php

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Amp\Parallel\Worker;
4+
5+
use Amp\Cancellation;
6+
use Amp\DeferredFuture;
7+
use Amp\Parallel\Worker\Internal\PooledWorker;
8+
9+
final class DelegatingWorkerPool implements WorkerPool
10+
{
11+
/** @var array<int, Worker> */
12+
private array $workerStorage = [];
13+
14+
private int $pendingWorkerCount = 0;
15+
16+
/** @var \SplQueue<DeferredFuture<Worker|null>> */
17+
private readonly \SplQueue $waiting;
18+
19+
/**
20+
* @param int $limit Maximum number of workers to use from the delegate pool.
21+
*/
22+
public function __construct(private readonly WorkerPool $pool, private readonly int $limit)
23+
{
24+
$this->waiting = new \SplQueue();
25+
}
26+
27+
public function isRunning(): bool
28+
{
29+
return $this->pool->isRunning();
30+
}
31+
32+
public function isIdle(): bool
33+
{
34+
return $this->pool->isIdle();
35+
}
36+
37+
public function submit(Task $task, ?Cancellation $cancellation = null): Execution
38+
{
39+
$worker = $this->selectWorker();
40+
41+
$execution = $worker->submit($task, $cancellation);
42+
43+
$execution->getFuture()->finally(fn () => $this->push($worker))->ignore();
44+
45+
return $execution;
46+
}
47+
48+
private function selectWorker(): Worker
49+
{
50+
do {
51+
if (\count($this->workerStorage) + $this->pendingWorkerCount < $this->limit) {
52+
$this->pendingWorkerCount++;
53+
54+
try {
55+
$worker = $this->pool->getWorker();
56+
} finally {
57+
$this->pendingWorkerCount--;
58+
}
59+
} else {
60+
/** @var DeferredFuture<Worker|null> $waiting */
61+
$waiting = new DeferredFuture();
62+
$this->waiting->push($waiting);
63+
64+
$worker = $waiting->getFuture()->await();
65+
if (!$worker?->isRunning()) {
66+
continue;
67+
}
68+
}
69+
70+
$this->workerStorage[\spl_object_id($worker)] = $worker;
71+
72+
return $worker;
73+
} while (true);
74+
}
75+
76+
private function push(Worker $worker): void
77+
{
78+
unset($this->workerStorage[\spl_object_id($worker)]);
79+
80+
if (!$this->waiting->isEmpty()) {
81+
$deferredFuture = $this->waiting->dequeue();
82+
$deferredFuture->complete($worker->isRunning() ? $worker : null);
83+
}
84+
}
85+
86+
public function shutdown(): void
87+
{
88+
if (!$this->waiting->isEmpty()) {
89+
$exception = new WorkerException('The pool was shutdown before a worker could be obtained');
90+
$this->clearWaiting($exception);
91+
}
92+
93+
$this->pool->shutdown();
94+
}
95+
96+
public function kill(): void
97+
{
98+
if (!$this->waiting->isEmpty()) {
99+
$exception = new WorkerException('The pool was killed before a worker could be obtained');
100+
$this->clearWaiting($exception);
101+
}
102+
103+
$this->pool->kill();
104+
}
105+
106+
private function clearWaiting(\Throwable $exception): void
107+
{
108+
while (!$this->waiting->isEmpty()) {
109+
$deferredFuture = $this->waiting->dequeue();
110+
$deferredFuture->error($exception);
111+
}
112+
}
113+
114+
public function getWorker(): Worker
115+
{
116+
return new PooledWorker($this->selectWorker(), $this->push(...));
117+
}
118+
119+
public function getWorkerCount(): int
120+
{
121+
return \min($this->limit, $this->pool->getWorkerCount());
122+
}
123+
124+
public function getIdleWorkerCount(): int
125+
{
126+
return \min($this->limit, $this->pool->getIdleWorkerCount());
127+
}
128+
}

test/Worker/AbstractPoolTest.php

+3-12
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
use Amp\Future;
66
use Amp\Parallel\Context\StatusError;
77
use Amp\Parallel\Test\Worker\Fixtures\TestTask;
8-
use Amp\Parallel\Worker\ContextWorkerFactory;
9-
use Amp\Parallel\Worker\ContextWorkerPool;
108
use Amp\Parallel\Worker\Execution;
119
use Amp\Parallel\Worker\Task;
1210
use Amp\Parallel\Worker\Worker;
@@ -180,15 +178,8 @@ protected function createWorker(?string $autoloadPath = null): Worker
180178
return $this->createPool(autoloadPath: $autoloadPath);
181179
}
182180

183-
protected function createPool(
181+
abstract protected function createPool(
184182
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
185-
?string $autoloadPath = null
186-
): WorkerPool {
187-
$factory = new ContextWorkerFactory(
188-
bootstrapPath: $autoloadPath,
189-
contextFactory: $this->createContextFactory(),
190-
);
191-
192-
return new ContextWorkerPool($max, $factory);
193-
}
183+
?string $autoloadPath = null,
184+
): WorkerPool;
194185
}

test/Worker/AbstractWorkerTest.php

+1-13
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
use Amp\CancelledException;
77
use Amp\DeferredCancellation;
88
use Amp\Future;
9-
use Amp\Parallel\Context\ContextFactory;
109
use Amp\Parallel\Context\StatusError;
1110
use Amp\Parallel\Test\Worker\Fixtures\CommunicatingTask;
12-
use Amp\Parallel\Worker\ContextWorkerFactory;
1311
use Amp\Parallel\Worker\Task;
1412
use Amp\Parallel\Worker\TaskCancelledException;
1513
use Amp\Parallel\Worker\TaskFailureError;
@@ -382,15 +380,5 @@ public function testCommunicatingJob(): void
382380
self::assertSame('out', $execution->await($cancellation));
383381
}
384382

385-
protected function createWorker(?string $autoloadPath = null): Worker
386-
{
387-
$factory = new ContextWorkerFactory(
388-
bootstrapPath: $autoloadPath,
389-
contextFactory: $this->createContextFactory(),
390-
);
391-
392-
return $factory->create();
393-
}
394-
395-
abstract protected function createContextFactory(): ContextFactory;
383+
abstract protected function createWorker(?string $autoloadPath = null): Worker;
396384
}
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Amp\Parallel\Test\Worker;
4+
5+
use Amp\Parallel\Context\ProcessContextFactory;
6+
use Amp\Parallel\Worker\ContextWorkerFactory;
7+
use Amp\Parallel\Worker\ContextWorkerPool;
8+
use Amp\Parallel\Worker\DelegatingWorkerPool;
9+
use Amp\Parallel\Worker\WorkerPool;
10+
11+
class DelegatingWorkerPoolTest extends AbstractPoolTest
12+
{
13+
protected function createPool(
14+
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
15+
?string $autoloadPath = null,
16+
): WorkerPool {
17+
$pool = new ContextWorkerPool(
18+
limit: $max * 2,
19+
factory: new ContextWorkerFactory($autoloadPath, contextFactory: new ProcessContextFactory()),
20+
);
21+
22+
return new DelegatingWorkerPool($pool, $max);
23+
}
24+
}

test/Worker/ProcessPoolTest.php

+13-11
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,22 @@
22

33
namespace Amp\Parallel\Test\Worker;
44

5-
use Amp\Cancellation;
6-
use Amp\Parallel\Context\Context;
7-
use Amp\Parallel\Context\ContextFactory;
85
use Amp\Parallel\Context\ProcessContextFactory;
6+
use Amp\Parallel\Worker\ContextWorkerFactory;
7+
use Amp\Parallel\Worker\ContextWorkerPool;
8+
use Amp\Parallel\Worker\WorkerPool;
99

1010
class ProcessPoolTest extends AbstractPoolTest
1111
{
12-
public function createContextFactory(): ContextFactory
13-
{
14-
return new class implements ContextFactory {
15-
public function start(array|string $script, ?Cancellation $cancellation = null): Context
16-
{
17-
return (new ProcessContextFactory())->start($script, cancellation: $cancellation);
18-
}
19-
};
12+
protected function createPool(
13+
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
14+
?string $autoloadPath = null,
15+
): WorkerPool {
16+
$factory = new ContextWorkerFactory(
17+
bootstrapPath: $autoloadPath,
18+
contextFactory: new ProcessContextFactory(),
19+
);
20+
21+
return new ContextWorkerPool($max, $factory);
2022
}
2123
}

test/Worker/ProcessWorkerTest.php

+9-10
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,19 @@
22

33
namespace Amp\Parallel\Test\Worker;
44

5-
use Amp\Cancellation;
6-
use Amp\Parallel\Context\Context;
7-
use Amp\Parallel\Context\ContextFactory;
85
use Amp\Parallel\Context\ProcessContextFactory;
6+
use Amp\Parallel\Worker\ContextWorkerFactory;
7+
use Amp\Parallel\Worker\Worker;
98

109
class ProcessWorkerTest extends AbstractWorkerTest
1110
{
12-
public function createContextFactory(): ContextFactory
11+
protected function createWorker(?string $autoloadPath = null): Worker
1312
{
14-
return new class implements ContextFactory {
15-
public function start(array|string $script, ?Cancellation $cancellation = null): Context
16-
{
17-
return (new ProcessContextFactory())->start($script, cancellation: $cancellation);
18-
}
19-
};
13+
$factory = new ContextWorkerFactory(
14+
bootstrapPath: $autoloadPath,
15+
contextFactory: new ProcessContextFactory(),
16+
);
17+
18+
return $factory->create();
2019
}
2120
}

test/Worker/ThreadPoolTest.php

+13-11
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,27 @@
22

33
namespace Amp\Parallel\Test\Worker;
44

5-
use Amp\Cancellation;
6-
use Amp\Parallel\Context\Context;
7-
use Amp\Parallel\Context\ContextFactory;
85
use Amp\Parallel\Context\ThreadContext;
96
use Amp\Parallel\Context\ThreadContextFactory;
7+
use Amp\Parallel\Worker\ContextWorkerFactory;
8+
use Amp\Parallel\Worker\ContextWorkerPool;
9+
use Amp\Parallel\Worker\WorkerPool;
1010

1111
class ThreadPoolTest extends AbstractPoolTest
1212
{
13-
public function createContextFactory(): ContextFactory
14-
{
13+
protected function createPool(
14+
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
15+
?string $autoloadPath = null,
16+
): WorkerPool {
1517
if (!ThreadContext::isSupported()) {
1618
$this->markTestSkipped('ext-parallel required');
1719
}
1820

19-
return new class implements ContextFactory {
20-
public function start(array|string $script, ?Cancellation $cancellation = null): Context
21-
{
22-
return (new ThreadContextFactory())->start($script, cancellation: $cancellation);
23-
}
24-
};
21+
$factory = new ContextWorkerFactory(
22+
bootstrapPath: $autoloadPath,
23+
contextFactory: new ThreadContextFactory(),
24+
);
25+
26+
return new ContextWorkerPool($max, $factory);
2527
}
2628
}

test/Worker/ThreadWorkerTest.php

+9-10
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,24 @@
22

33
namespace Amp\Parallel\Test\Worker;
44

5-
use Amp\Cancellation;
6-
use Amp\Parallel\Context\Context;
7-
use Amp\Parallel\Context\ContextFactory;
85
use Amp\Parallel\Context\ThreadContext;
96
use Amp\Parallel\Context\ThreadContextFactory;
7+
use Amp\Parallel\Worker\ContextWorkerFactory;
8+
use Amp\Parallel\Worker\Worker;
109

1110
class ThreadWorkerTest extends AbstractWorkerTest
1211
{
13-
public function createContextFactory(): ContextFactory
12+
protected function createWorker(?string $autoloadPath = null): Worker
1413
{
1514
if (!ThreadContext::isSupported()) {
1615
$this->markTestSkipped('ext-parallel required');
1716
}
1817

19-
return new class implements ContextFactory {
20-
public function start(array|string $script, ?Cancellation $cancellation = null): Context
21-
{
22-
return (new ThreadContextFactory())->start($script, cancellation: $cancellation);
23-
}
24-
};
18+
$factory = new ContextWorkerFactory(
19+
bootstrapPath: $autoloadPath,
20+
contextFactory: new ThreadContextFactory(),
21+
);
22+
23+
return $factory->create();
2524
}
2625
}

0 commit comments

Comments
 (0)