Skip to content

Commit 3c21642

Browse files
authored
Forward port from 6.x: Correct use of asyncio.Lock to process a single control message at a time (#1418)
1 parent dc4d958 commit 3c21642

File tree

2 files changed

+122
-7
lines changed

2 files changed

+122
-7
lines changed

ipykernel/kernelbase.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ def _parent_header(self):
251251
# execution count we store in the shell.
252252
execution_count = 0
253253

254+
# Asyncio lock to ensure only one control queue message is processed at a time.
255+
_control_lock = Instance(asyncio.Lock)
256+
254257
msg_types = [
255258
"execute_request",
256259
"complete_request",
@@ -301,7 +304,8 @@ def __init__(self, **kwargs):
301304

302305
async def dispatch_control(self, msg):
303306
"""Dispatch a control request, ensuring only one message is processed at a time."""
304-
async with asyncio.Lock():
307+
# Ensure only one control message is processed at a time
308+
async with self._control_lock:
305309
await self.process_control(msg)
306310

307311
async def process_control(self, msg):
@@ -578,6 +582,10 @@ def schedule_dispatch(self, dispatch, *args):
578582
# ensure the eventloop wakes up
579583
self.io_loop.add_callback(lambda: None)
580584

585+
async def _create_control_lock(self):
586+
# This can be removed when minimum python increases to 3.10
587+
self._control_lock = asyncio.Lock()
588+
581589
def start(self):
582590
"""register dispatchers for streams"""
583591
self.io_loop = ioloop.IOLoop.current()
@@ -588,6 +596,14 @@ def start(self):
588596
if self.control_stream:
589597
self.control_stream.on_recv(self.dispatch_control, copy=False)
590598

599+
if self.control_thread and sys.version_info < (3, 10):
600+
# Before Python 3.10 we need to ensure the _control_lock is created in the
601+
# thread that uses it. When our minimum python is 3.10 we can remove this
602+
# and always use the else below, or just assign it where it is declared.
603+
self.control_thread.io_loop.add_callback(self._create_control_lock)
604+
else:
605+
self._control_lock = asyncio.Lock()
606+
591607
if self.shell_stream:
592608
if self.shell_channel_thread:
593609
self.shell_channel_thread.manager.set_on_recv_callback(self.shell_main)

tests/test_debugger.py

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from .utils import TIMEOUT, get_reply, new_kernel
5+
from .utils import TIMEOUT, get_replies, get_reply, new_kernel
66

77
seq = 0
88

@@ -15,11 +15,8 @@
1515
debugpy = None
1616

1717

18-
def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
19-
"""Carry out a debug request and return the reply content.
20-
21-
It does not check if the request was successful.
22-
"""
18+
def prepare_debug_request(kernel, command, arguments=None):
19+
"""Prepare a debug request but do not send it."""
2320
global seq
2421
seq += 1
2522

@@ -32,6 +29,15 @@ def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
3229
"arguments": arguments or {},
3330
},
3431
)
32+
return msg
33+
34+
35+
def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
36+
"""Carry out a debug request and return the reply content.
37+
38+
It does not check if the request was successful.
39+
"""
40+
msg = prepare_debug_request(kernel, command, arguments)
3541
kernel.control_channel.send(msg)
3642
reply = get_reply(kernel, msg["header"]["msg_id"], channel="control")
3743
return reply if full_reply else reply["content"]
@@ -459,3 +465,96 @@ def my_test():
459465

460466
# Compare local and global variable
461467
assert global_var["value"] == local_var["value"] and global_var["type"] == local_var["type"] # noqa: PT018
468+
469+
470+
def test_debug_requests_sequential(kernel_with_debug):
471+
# Issue https://github.com/ipython/ipykernel/issues/1412
472+
# Control channel requests should be executed sequentially not concurrently.
473+
code = """def f(a, b):
474+
c = a + b
475+
return c
476+
477+
f(2, 3)"""
478+
479+
r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code})
480+
if debugpy:
481+
source = r["body"]["sourcePath"]
482+
else:
483+
assert r == {}
484+
source = "some path"
485+
486+
wait_for_debug_request(
487+
kernel_with_debug,
488+
"setBreakpoints",
489+
{
490+
"breakpoints": [{"line": 2}],
491+
"source": {"path": source},
492+
"sourceModified": False,
493+
},
494+
)
495+
496+
wait_for_debug_request(kernel_with_debug, "debugInfo")
497+
wait_for_debug_request(kernel_with_debug, "configurationDone")
498+
kernel_with_debug.execute(code)
499+
500+
if not debugpy:
501+
# Cannot stop on breakpoint if debugpy not installed
502+
return
503+
504+
# Wait for stop on breakpoint
505+
msg: dict = {"msg_type": "", "content": {}}
506+
while msg.get("msg_type") != "debug_event" or msg["content"].get("event") != "stopped":
507+
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)
508+
509+
stacks = wait_for_debug_request(kernel_with_debug, "stackTrace", {"threadId": 1})["body"][
510+
"stackFrames"
511+
]
512+
513+
scopes = wait_for_debug_request(kernel_with_debug, "scopes", {"frameId": stacks[0]["id"]})[
514+
"body"
515+
]["scopes"]
516+
517+
# Get variablesReference for both Locals and Globals.
518+
locals_ref = next(filter(lambda s: s["name"] == "Locals", scopes))["variablesReference"]
519+
globals_ref = next(filter(lambda s: s["name"] == "Globals", scopes))["variablesReference"]
520+
521+
msgs = []
522+
for ref in [locals_ref, globals_ref]:
523+
msgs.append(
524+
prepare_debug_request(kernel_with_debug, "variables", {"variablesReference": ref})
525+
)
526+
527+
# Send messages in quick succession.
528+
for msg in msgs:
529+
kernel_with_debug.control_channel.send(msg)
530+
531+
replies = get_replies(kernel_with_debug, [msg["msg_id"] for msg in msgs], channel="control")
532+
533+
# Check debug variable returns are correct.
534+
locals = replies[0]["content"]
535+
assert locals["success"]
536+
variables = locals["body"]["variables"]
537+
var = next(filter(lambda v: v["name"] == "a", variables))
538+
assert var["type"] == "int"
539+
assert var["value"] == "2"
540+
var = next(filter(lambda v: v["name"] == "b", variables))
541+
assert var["type"] == "int"
542+
assert var["value"] == "3"
543+
544+
globals = replies[1]["content"]
545+
assert globals["success"]
546+
variables = globals["body"]["variables"]
547+
548+
names = [v["name"] for v in variables]
549+
assert "function variables" in names
550+
assert "special variables" in names
551+
552+
# Check status iopub messages alternate between busy and idle.
553+
execution_states = []
554+
while len(execution_states) < 8:
555+
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)
556+
if msg["msg_type"] == "status":
557+
execution_states.append(msg["content"]["execution_state"])
558+
assert execution_states.count("busy") == 4
559+
assert execution_states.count("idle") == 4
560+
assert execution_states == ["busy", "idle"] * 4

0 commit comments

Comments
 (0)