Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlock on channel mutex when stopping pool #148

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pebble/pool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ def message_manager_loop(pool_manager: 'PoolManager'):
context = pool_manager.context

try:
while context.alive and not GLOBAL_SHUTDOWN:
# Keep pumping the message pipe as long as the pool manager lives. In
# particular, during the pool stopping procedure we want to avoid any
# worker from being blocked on writing to the pipe, as this would result
# in deadlocking on the channel mutex.
while pool_manager.alive and not GLOBAL_SHUTDOWN:
pool_manager.process_next_message(CONSTS.sleep_unit)
except BrokenProcessPool:
context.status = PoolStatus.ERROR
Expand All @@ -200,6 +204,7 @@ class PoolManager:
def __init__(self, context: PoolContext,
mp_context: multiprocessing.context.BaseContext):
self.context = context
self.alive = True
self.task_manager = TaskManager(context.task_queue.task_done)
self.worker_manager = WorkerManager(context.workers,
context.worker_parameters,
Expand All @@ -211,6 +216,7 @@ def start(self):
    def stop(self):
        """Shut the pool down: close the worker channels, stop the workers,
        and only then clear ``alive`` so that ``message_manager_loop`` keeps
        draining the message pipe for the whole stopping procedure.
        """
        self.worker_manager.close_channels()
        self.worker_manager.stop_workers()
        # Cleared last on purpose: the message manager loop spins on this
        # flag, and must keep pumping the pipe while workers shut down.
        self.alive = False

def schedule(self, task: Task):
"""Schedules a new Task in the PoolManager."""
Expand Down
65 changes: 65 additions & 0 deletions test/test_process_pool_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from concurrent.futures import FIRST_COMPLETED, wait
from dataclasses import dataclass
import time
import unittest

from pebble import ProcessPool
from pebble.common.types import CONSTS, FutureStatus

def function(argument, sleep_interval):
    """Sleep for *sleep_interval* seconds, then echo *argument* back."""
    time.sleep(sleep_interval)
    return argument

g_pool = None

@dataclass
class SpamMessage:
    """Field-less payload repeatedly pushed onto the workers channel."""

def flooding_function():
    """Endlessly write messages to the pool's worker channel.

    Runs inside a worker process and never returns; it relies on the
    module-level ``g_pool`` being set to the live pool by the parent before
    this task is scheduled. Reaches into pebble internals
    (``_pool_manager.worker_manager.workers_channel``) to flood the pipe.
    """
    workers_channel = g_pool._pool_manager.worker_manager.workers_channel
    while True:
        workers_channel.send(SpamMessage())

class TestProcessPoolGeneric(unittest.TestCase):
def test_big_values_and_cancellation(self):
"""Test that the pool handles workers with big tasks and can cancel them."""
# Ideally this should be bigger than the multiprocessing pipe's internal
# buffer.
BIG_VALUE = [0] * 10 * 1000 * 1000
# The bigger number of workers is, the higher is the chance of catching
# bugs.
CNT = 50
# Let the worker events cluster around the sleep unit granularity to
# increase the chance of catching bugs.
INITIAL_SLEEP = CONSTS.sleep_unit * 10
EPS = CONSTS.sleep_unit / 10

futures = []
with ProcessPool(max_workers=CNT) as pool:
for i in range(CNT):
futures.append(pool.schedule(function, args=[BIG_VALUE, INITIAL_SLEEP + i * EPS]))
wait(futures, return_when=FIRST_COMPLETED)
for f in futures:
f.cancel()
time.sleep(EPS * CNT / 2)
pool.stop()
pool.join()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found a good way to assert the state here: neither that there was no BrokenProcessPool in pool_manager_loop() nor that we didn't sleep for LOCK_TIMEOUT:

  • For the former, I tried changing the PoolContext.status setter to allow transitioning from STOPPED to ERROR, but apparently there are other places that try to set ERROR even in "good" shutdown scenarios.
  • For the latter, it seems brittle to rely on clocks in the tests. Unless maybe we override LOCK_TIMEOUT to some really big number in this test?..


def test_message_flood_from_worker(self):
"""Test that the pool stops despite the worker spamming the message pipe."""
with ProcessPool() as pool:
# Use a global variable to pass channels to the worker.
global g_pool
g_pool = pool

future = pool.schedule(flooding_function)

# Wait until the worker starts running the (spammy) task.
while future._state == FutureStatus.PENDING:
time.sleep(0.1)
self.assertEqual(future._state, FutureStatus.RUNNING)

pool.stop()
pool.join()
g_pool = None