From 8ac48fac0d20e4458060179b60bf6572d0f71863 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 09:36:05 -0600 Subject: [PATCH 1/5] Add ForkContext --- composer-require-check.json | 3 +- src/Context/ForkContext.php | 122 +++++++++++++++++++++++++++++ src/Context/ForkContextFactory.php | 49 ++++++++++++ src/Context/Internal/functions.php | 17 ++-- 4 files changed, 185 insertions(+), 6 deletions(-) create mode 100644 src/Context/ForkContext.php create mode 100644 src/Context/ForkContextFactory.php diff --git a/composer-require-check.json b/composer-require-check.json index 0d0c4fc..621bf16 100644 --- a/composer-require-check.json +++ b/composer-require-check.json @@ -39,6 +39,7 @@ "SPL", "standard", "hash", - "pcntl" + "pcntl", + "posix" ] } diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php new file mode 100644 index 0000000..76e9c76 --- /dev/null +++ b/src/Context/ForkContext.php @@ -0,0 +1,122 @@ + + */ +final class ForkContext extends AbstractContext +{ + private const DEFAULT_START_TIMEOUT = 5; + + /** + * @param string|non-empty-list $script Path to PHP script or array with first element as path and + * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). + * @param positive-int $childConnectTimeout Number of seconds the child will attempt to connect to the parent + * before failing. + * + * @throws ContextException If starting the process fails. + */ + public static function start( + IpcHub $ipcHub, + string|array $script, + ?Cancellation $cancellation = null, + int $childConnectTimeout = self::DEFAULT_START_TIMEOUT, + Serializer $serializer = new NativeSerializer(), + ): self { + $key = $ipcHub->generateKey(); + + // Fork + if (($pid = \pcntl_fork()) < 0) { + throw new ContextException("Forking failed: " . \posix_strerror(\posix_get_last_error())); + } + + // Parent + if ($pid > 0) { + try { + $socket = $ipcHub->accept($key, $cancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); + + $socket = $ipcHub->accept($key, $cancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + $cancellation?->throwIfRequested(); + + throw new ContextException("Connecting failed after forking", previous: $exception); + } + + return new self($pid, $ipcChannel, $resultChannel); + } + + // Child + \define("AMP_CONTEXT", "parallel"); + \define("AMP_CONTEXT_ID", \getmypid()); + + if (\is_string($script)) { + $script = [$script]; + } + + $connectCancellation = new TimeoutCancellation((float) $childConnectTimeout); + Internal\runContext($ipcHub->getUri(), $key, $connectCancellation, $script, $serializer); + + exit(0); + } + + private bool $exited = false; + + /** + * @param StreamChannel $ipcChannel + */ + private function __construct( + private readonly int $pid, + StreamChannel $ipcChannel, + StreamChannel $resultChannel, + ) { + parent::__construct($ipcChannel, $resultChannel); + } + + public function __destruct() + { + $this->close(); + } + + public function close(): void + { + if (!$this->exited) { + \posix_kill($this->pid, \SIGKILL); + $this->exited = true; + } + + parent::close(); + } + + public function join(?Cancellation $cancellation = null): mixed + { + $data = $this->receiveExitResult($cancellation); + + $this->close(); + + return $data->getResult(); + } +} diff --git a/src/Context/ForkContextFactory.php b/src/Context/ForkContextFactory.php new file mode 100644 index 0000000..d6665d6 --- /dev/null +++ b/src/Context/ForkContextFactory.php @@ -0,0 +1,49 @@ + $script + * + * @throws ContextException + */ + public function start(string|array $script, ?Cancellation $cancellation = null): ForkContext + { + return ForkContext::start( + ipcHub: $this->ipcHub, + script: $script, + cancellation: $cancellation, + childConnectTimeout: $this->childConnectTimeout, + ); + } +} diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index e0f130d..c31bc28 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -6,22 +6,29 @@ use Amp\Cancellation; use Amp\Future; use Amp\Parallel\Ipc; +use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; +use Amp\Serialization\Serializer; use Revolt\EventLoop; /** @internal */ -function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void -{ - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void { +function runContext( + string $uri, + string $key, + Cancellation $connectCancellation, + array $argv, + Serializer $serializer = new NativeSerializer(), +): void { + EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation, $serializer): void { /** @noinspection PhpUnusedLocalVariableInspection */ $argc = \count($argv); try { $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket); + $resultChannel = new StreamChannel($socket, $socket, $serializer); } catch (\Throwable $exception) { \trigger_error($exception->getMessage(), E_USER_ERROR); } From 31d8b70286542a1fe1750218f4ba588404614704 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 14:48:49 -0600 Subject: [PATCH 2/5] Updates --- src/Context/ForkContext.php | 54 +++++++++-- src/Context/Internal/functions.php | 115 ++++++++++++------------ src/Context/Internal/process-runner.php | 4 +- src/Context/ThreadContext.php | 4 +- 4 files changed, 109 insertions(+), 68 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 76e9c76..e73ac7d 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -12,7 +12,7 @@ /** * USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its - * use must be purposeful and situational. + * creation must be purposeful and situational. * * Forking is not recommended at arbitrary points in an application since the entire state of the parent process is * inherited into the child process, including the event-loop! @@ -83,7 +83,9 @@ public static function start( exit(0); } - private bool $exited = false; + private ?int $exited = null; + + private bool $weKilled = false; /** * @param StreamChannel $ipcChannel @@ -101,11 +103,49 @@ public function __destruct() $this->close(); } + public function receive(?Cancellation $cancellation = null): mixed + { + $this->checkExit(false); + + return parent::receive($cancellation); + } + + public function send(mixed $data): void + { + $this->checkExit(false); + + parent::send($data); + } + + private function checkExit(bool $wait): ?int + { + if ($this->exited === null) { + if (\pcntl_waitpid($this->pid, $status, $wait ? 0 : \WNOHANG) === 0) { + return null; + } + + $this->exited = match (true) { + \pcntl_wifsignaled($status) => \pcntl_wtermsig($status), + \pcntl_wifexited($status) => \pcntl_wexitstatus($status) - 128, + \pcntl_wifstopped($status) => \pcntl_wstopsig($status), + default => -1, + }; + } + + if (!$this->weKilled && $this->exited > 0) { + throw new ContextException("Worker exited due to signal {$this->exited}", $this->exited); + } + + return $this->exited; + } + public function close(): void { if (!$this->exited) { + $this->weKilled = true; \posix_kill($this->pid, \SIGKILL); - $this->exited = true; + + $this->checkExit(true); } parent::close(); @@ -113,9 +153,11 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); - - $this->close(); + try { + $data = $this->receiveExitResult($cancellation); + } finally { + $this->close(); + } return $data->getResult(); } diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index c31bc28..7dcaf4d 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -9,7 +9,6 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; use Amp\Serialization\Serializer; -use Revolt\EventLoop; /** @internal */ function runContext( @@ -19,73 +18,69 @@ function runContext( array $argv, Serializer $serializer = new NativeSerializer(), ): void { - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation, $serializer): void { - /** @noinspection PhpUnusedLocalVariableInspection */ - $argc = \count($argv); + /** @noinspection PhpUnusedLocalVariableInspection */ + $argc = \count($argv); - try { - $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket, $serializer); - - $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket, $serializer); - } catch (\Throwable $exception) { - \trigger_error($exception->getMessage(), E_USER_ERROR); - } - - try { - if (!isset($argv[0])) { - throw new \Error("No script path given"); - } + try { + $socket = Ipc\connect($uri, $key, $connectCancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); - if (!\is_file($argv[0])) { - throw new \Error(\sprintf( - "No script found at '%s' (be sure to provide the full path to the script)", - $argv[0], - )); - } + $socket = Ipc\connect($uri, $key, $connectCancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + \trigger_error($exception->getMessage(), E_USER_ERROR); + } - try { - // Protect current scope by requiring script within another function. - // Using $argc, so it is available to the required script. - $callable = (function () use ($argc, $argv): callable { - /** @psalm-suppress UnresolvableInclude */ - return require $argv[0]; - })(); - } catch (\TypeError $exception) { - throw new \Error(\sprintf( - "Script '%s' did not return a callable function: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } catch (\ParseError $exception) { - throw new \Error(\sprintf( - "Script '%s' contains a parse error: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } + try { + if (!isset($argv[0])) { + throw new \Error("No script path given"); + } - $returnValue = $callable(new ContextChannel($ipcChannel)); - $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); - } catch (\Throwable $exception) { - $result = new ExitFailure($exception); + if (!\is_file($argv[0])) { + throw new \Error(\sprintf( + "No script found at '%s' (be sure to provide the full path to the script)", + $argv[0], + )); } try { - try { - $resultChannel->send($result); - } catch (SerializationException $exception) { - // Serializing the result failed. Send the reason why. - $resultChannel->send(new ExitFailure($exception)); - } - } catch (\Throwable $exception) { - \trigger_error(\sprintf( - "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + // Protect current scope by requiring script within another function. + // Using $argc, so it is available to the required script. + $callable = (function () use ($argc, $argv): callable { + /** @psalm-suppress UnresolvableInclude */ + return require $argv[0]; + })(); + } catch (\TypeError $exception) { + throw new \Error(\sprintf( + "Script '%s' did not return a callable function: %s", + $argv[0], + $exception->getMessage(), + ), 0, $exception); + } catch (\ParseError $exception) { + throw new \Error(\sprintf( + "Script '%s' contains a parse error: %s", + $argv[0], $exception->getMessage(), - ), E_USER_ERROR); + ), 0, $exception); } - }); - EventLoop::run(); + $returnValue = $callable(new ContextChannel($ipcChannel)); + $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); + } catch (\Throwable $exception) { + $result = new ExitFailure($exception); + } + + try { + try { + $resultChannel->send($result); + } catch (SerializationException $exception) { + // Serializing the result failed. Send the reason why. + $resultChannel->send(new ExitFailure($exception)); + } + } catch (\Throwable $exception) { + \trigger_error(\sprintf( + "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + $exception->getMessage(), + ), E_USER_ERROR); + } } diff --git a/src/Context/Internal/process-runner.php b/src/Context/Internal/process-runner.php index d10a60f..ae6f94d 100644 --- a/src/Context/Internal/process-runner.php +++ b/src/Context/Internal/process-runner.php @@ -85,5 +85,7 @@ \trigger_error($exception->getMessage(), E_USER_ERROR); } - runContext($uri, $key, $cancellation, $argv); + EventLoop::queue(runContext(...), $uri, $key, $cancellation, $argv); + + EventLoop::run(); })(); diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index ef3544d..6f942a4 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -104,7 +104,9 @@ public static function start( // such as select() will not be interrupted. })); - Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv); + EventLoop::queue(Internal\runContext(...), $uri, $key, new TimeoutCancellation($connectTimeout), $argv); + + EventLoop::run(); return 0; // @codeCoverageIgnoreEnd From 03a5cddb84164a57c609fb22b3f8350ee247cff3 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 14:50:55 -0600 Subject: [PATCH 3/5] Fix --- src/Context/ForkContext.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index e73ac7d..465c8ce 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -141,7 +141,7 @@ private function checkExit(bool $wait): ?int public function close(): void { - if (!$this->exited) { + if ($this->checkExit(false) === null) { $this->weKilled = true; \posix_kill($this->pid, \SIGKILL); From ffedc390043c40facd101e69acd59474acf9e653 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 7 Mar 2025 17:39:50 +0100 Subject: [PATCH 4/5] Fixup tests --- src/Context/ForkContext.php | 13 ++++++++----- test/Context/ForkContextTest.php | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) create mode 100644 test/Context/ForkContextTest.php diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 465c8ce..8859113 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -30,6 +30,11 @@ final class ForkContext extends AbstractContext { private const DEFAULT_START_TIMEOUT = 5; + public static function isSupported(): bool + { + return \function_exists('pcntl_fork'); + } + /** * @param string|non-empty-list $script Path to PHP script or array with first element as path and * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). @@ -153,11 +158,9 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - try { - $data = $this->receiveExitResult($cancellation); - } finally { - $this->close(); - } + $data = $this->receiveExitResult($cancellation); + + $this->close(); return $data->getResult(); } diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php new file mode 100644 index 0000000..da80077 --- /dev/null +++ b/test/Context/ForkContextTest.php @@ -0,0 +1,30 @@ +markTestSkipped('pcntl_fork required'); + } + + return (new ForkContextFactory())->start($script); + } + + public function testThrowingProcessOnReceive(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } + + public function testThrowingProcessOnSend(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } +} From 981895dd9e2122e73725f5792ce155698ce650a5 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 10 Mar 2025 13:05:23 +0100 Subject: [PATCH 5/5] Fixup --- src/Context/ForkContext.php | 5 ++++- test/Context/ForkContextTest.php | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 8859113..fdf7f73 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -9,6 +9,8 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\Serializer; use Amp\TimeoutCancellation; +use Revolt\EventLoop; +use Revolt\EventLoop\Driver\UvDriver; /** * USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its @@ -32,7 +34,8 @@ final class ForkContext extends AbstractContext public static function isSupported(): bool { - return \function_exists('pcntl_fork'); + return \function_exists('pcntl_fork') + && !EventLoop::getDriver() instanceof UvDriver; } /** diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php index da80077..c662b88 100644 --- a/test/Context/ForkContextTest.php +++ b/test/Context/ForkContextTest.php @@ -3,14 +3,15 @@ namespace Amp\Parallel\Test\Context; use Amp\Parallel\Context\Context; +use Amp\Parallel\Context\ForkContext; use Amp\Parallel\Context\ForkContextFactory; class ForkContextTest extends AbstractContextTest { public function createContext(string|array $script): Context { - if (!\function_exists('pcntl_fork')) { - $this->markTestSkipped('pcntl_fork required'); + if (!ForkContext::isSupported()) { + $this->markTestSkipped('Not supported on the current platform/driver'); } return (new ForkContextFactory())->start($script);