Skip to content

Commit 798dc89

Browse files
committed
Instrument Multiprocessing forkserver
This depends on gevent/gevent#2126 being fixed. Update documentation
1 parent 4238f18 commit 798dc89

File tree

10 files changed

+106
-24
lines changed

10 files changed

+106
-24
lines changed

.idea/geventmp.iml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.rst

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,6 @@ Supported Platforms
121121
* CPython 3.9, 3.10, 3.11, 3.12, 3.13
122122
* PyPy 3.9, 3.10
123123

124-
Known Issues
125-
============
126-
127-
* Multiprocessing `forkserver` works in GeventMP_, but the spawned child isn't green.
128-
129124
TODO
130125
====
131126
1. Monkey patch Windows to the extent possible.

src/integrationtest/python/monkey_manager_tests.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ def test_manager_fork(self):
7373
def test_manager_spawn(self):
7474
self.run_manager_test("spawn")
7575

76+
def test_manager_forkserver(self):
77+
self.run_manager_test("forkserver")
78+
7679
def run_manager_test(self, context, do_trace=False, remote_trace=False):
7780
ctx = mp.get_context(context)
7881
if do_trace:

src/integrationtest/python/monkey_mp_tests.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,18 @@ def test_mp_queues_fork(self):
5959
def test_mp_queues_spawn(self):
6060
self.run_test_mp_queues("spawn", _mp_test_gevent.test_queues)
6161

62+
def test_mp_queues_forkserver(self):
63+
self.run_test_mp_queues("forkserver", _mp_test_gevent.test_queues)
64+
6265
def test_mp_jqueues_fork(self):
6366
self.run_test_mp_jqueues("fork", _mp_test_gevent.test_queues)
6467

6568
def test_mp_jqueues_spawn(self):
6669
self.run_test_mp_jqueues("spawn", _mp_test_gevent.test_queues)
6770

71+
def test_mp_jqueues_forkserver(self):
72+
self.run_test_mp_jqueues("forkserver", _mp_test_gevent.test_queues)
73+
6874
def test_mp_no_args_fork(self):
6975
self.run_test_mp_no_args("fork", _mp_test_gevent.test_no_args)
7076

src/main/python/geventmp/_mp/3/_mp_forkserver.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import os
17+
import signal
1618
import sys
1719
from multiprocessing.forkserver import ForkServer as _ForkServer
1820

1921
from gevent.os import make_nonblocking, nb_read, nb_write
2022

21-
__implements__ = ["ForkServer", "_forkserver", "ensure_running",
23+
__implements__ = ["ForkServer", "_forkserver", "ensure_running", "_serve_one",
2224
"get_inherited_fds", "connect_to_new_process",
2325
"set_forkserver_preload"]
2426
__target__ = "multiprocessing.forkserver"
@@ -85,6 +87,32 @@ def write_unsigned(fd, n):
8587
raise RuntimeError('should not get here')
8688
msg = msg[nbytes:]
8789

90+
91+
def _serve_one(child_r, fds, unused_fds, handlers):
92+
from multiprocessing import resource_tracker, spawn
93+
# close unnecessary stuff and reset signal handlers
94+
signal.set_wakeup_fd(-1)
95+
for sig, val in handlers.items():
96+
signal.signal(sig, val)
97+
for fd in unused_fds:
98+
os.close(fd)
99+
100+
(_forkserver._forkserver_alive_fd,
101+
resource_tracker._resource_tracker._fd,
102+
*_forkserver._inherited_fds) = fds
103+
make_nonblocking(_forkserver._forkserver_alive_fd)
104+
make_nonblocking(resource_tracker._resource_tracker._fd)
105+
for ifd in _forkserver._inherited_fds:
106+
make_nonblocking(ifd)
107+
108+
# Run process object received over pipe
109+
parent_sentinel = os.dup(child_r)
110+
make_nonblocking(child_r)
111+
make_nonblocking(parent_sentinel)
112+
code = spawn._main(child_r, parent_sentinel)
113+
114+
return code
115+
88116
_forkserver = ForkServer()
89117
ensure_running = _forkserver.ensure_running
90118
get_inherited_fds = _forkserver.get_inherited_fds
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2022 Karellen, Inc. and contributors
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from multiprocessing.popen_forkserver import Popen as _Popen
17+
18+
from gevent.os import make_nonblocking
19+
20+
__implements__ = ["Popen"]
21+
__target__ = "multiprocessing.popen_forkserver"
22+
23+
24+
class Popen(_Popen):
25+
def _launch(self, process_obj):
26+
self.sentinel = None
27+
try:
28+
super()._launch(process_obj)
29+
finally:
30+
if self.sentinel is not None:
31+
make_nonblocking(self.sentinel)

src/main/python/geventmp/_mp/3/_mp_spawn.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,14 @@ def get_command_line(**kwds):
2828
return ([sys.executable, '--multiprocessing-fork'] +
2929
['%s=%r' % item for item in kwds.items()])
3030
else:
31-
from gevent.monkey import saved
32-
from gevent import config
33-
from gevent._config import ImportableSetting
34-
from geventmp.monkey import GEVENT_SAVED_MODULE_SETTINGS
35-
36-
prog = 'from gevent import monkey; monkey.patch_all(**%r); ' + \
37-
'from gevent import config; [setattr(config, k, v) for k, v in %r.items()]; ' + \
38-
'from multiprocessing.spawn import spawn_main; ' + \
39-
'spawn_main(%s);'
40-
41-
prog %= (saved[GEVENT_SAVED_MODULE_SETTINGS],
42-
{k: getattr(config, k) for k in dir(config)
43-
if not isinstance(config.settings[k], ImportableSetting)},
44-
', '.join('%s=%r' % item for item in kwds.items()))
31+
prog, args = util.get_command_line_gevent_preamble()
32+
33+
prog += 'from multiprocessing.spawn import spawn_main; ' + \
34+
'spawn_main(%s);'
35+
36+
args += (', '.join('%s=%r' % item for item in kwds.items()),)
37+
38+
prog %= args
4539

4640
opts = util._args_from_interpreter_flags()
4741
return [spawn._python_exe] + opts + ['-c', prog, '--multiprocessing-fork']

src/main/python/geventmp/_mp/3/_mp_util.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,39 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from multiprocessing.util import spawnv_passfds as _spawnv_passfd, register_after_fork
17+
1618
from gevent.os import _watch_child
1719
from gevent.threading import local
18-
from multiprocessing.util import spawnv_passfds as _spawnv_passfd, register_after_fork
1920

20-
__implements__ = ["spawnv_passfds", "ForkAwareLocal"]
21+
__implements__ = ["spawnv_passfds", "ForkAwareLocal", "get_command_line_gevent_preamble"]
2122
__target__ = "multiprocessing.util"
2223

2324

25+
def get_command_line_gevent_preamble():
26+
from gevent.monkey import saved
27+
from gevent import config
28+
from gevent._config import ImportableSetting
29+
from geventmp.monkey import GEVENT_SAVED_MODULE_SETTINGS
30+
31+
prog = 'from gevent import monkey; monkey.patch_all(**%r); ' + \
32+
'from gevent import config; [setattr(config, k, v) for k, v in %r.items()]; '
33+
34+
args = (saved[GEVENT_SAVED_MODULE_SETTINGS],
35+
{k: getattr(config, k) for k in dir(config)
36+
if not isinstance(config.settings[k], ImportableSetting)})
37+
38+
return prog, args
39+
40+
2441
def spawnv_passfds(path, args, passfds):
42+
launch_args = args[2] if len(args) >= 3 else None
43+
if launch_args and launch_args.startswith("from multiprocessing.forkserver import main;"):
44+
prog, prog_args = get_command_line_gevent_preamble()
45+
prog += "%s"
46+
launch_args = prog % (prog_args + (launch_args,))
47+
args[2] = launch_args
48+
2549
cpid = _spawnv_passfd(path, args, passfds)
2650
_watch_child(cpid)
2751
return cpid

src/main/python/geventmp/monkey.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def _patch_mp(will_patch_all):
7272
_patch_module("_mp.3._mp_connection", _package_prefix='geventmp.')
7373
_patch_module("_mp.3._mp_synchronize", _package_prefix='geventmp.')
7474
_patch_module("_mp.3._mp_popen_fork", _package_prefix='geventmp.')
75+
_patch_module("_mp.3._mp_popen_forkserver", _package_prefix='geventmp.')
7576
_patch_module("_mp.3._mp_popen_spawn_posix", _package_prefix='geventmp.')
7677
_patch_module("_mp.3._mp_forkserver", _package_prefix='geventmp.')
7778
_patch_module("_mp.3._mp_resource_tracker", _package_prefix='geventmp.')

0 commit comments

Comments
 (0)