diff --git a/src/plumpy/events.py b/src/plumpy/events.py index 27a594d4..5312895f 100644 --- a/src/plumpy/events.py +++ b/src/plumpy/events.py @@ -48,16 +48,24 @@ def set_event_loop_policy() -> None: def reset_event_loop_policy() -> None: """Reset the event loop policy to the asyncio default.""" - # purge weakref to prevent memory leak - loop = asyncio.get_event_loop() + # 1. Close the existing event loop (if it isn't already closed): + old_loop = asyncio.get_event_loop() + if not old_loop.is_closed(): + # purge weakref to prevent memory leak + cls = old_loop.__class__ - cls = loop.__class__ + del cls._check_running # type: ignore + del cls._nest_patched # type: ignore - del cls._check_running # type: ignore - del cls._nest_patched # type: ignore + old_loop.close() + # 2. Reset the policy to the default (i.e. None): asyncio.set_event_loop_policy(None) + # 3. Create a brand-new event loop under this default policy: + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + def run_until_complete(future: asyncio.Future, loop: asyncio.AbstractEventLoop | None = None) -> Any: loop = loop or get_event_loop() diff --git a/tests/test_processes.py b/tests/test_processes.py index 767442e4..9dc75a08 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -41,6 +41,7 @@ def on_kill(self, msg): super().on_kill(msg) +@pytest.mark.usefixtures('custom_event_loop_policy') def test_process_is_savable(): proc = utils.DummyProcess() assert isinstance(proc, Savable) @@ -584,6 +585,7 @@ def run(self): proc = StackTest() proc.execute() + @pytest.mark.usefixtures('custom_event_loop_policy') def test_process_stack_multiple(self): """ Run multiple and nested processes to make sure the process stack is always correct @@ -619,6 +621,7 @@ def run(self): assert len(expect_true) == n_run * 3 + @pytest.mark.usefixtures('custom_event_loop_policy') def test_process_nested(self): """ Run multiple and nested processes to make sure the process stack is always correct