Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,20 @@ async def _handle_event(self):
*all* waiting events are processed in order.
"""
# create async wrapper within coroutine
pipe_in = zmq.asyncio.Socket(self._pipe_in0)
try:
while True:
await pipe_in.recv()
# freeze event count so new writes don't extend the queue
# while we are processing
n_events = len(self._events)
for _ in range(n_events):
event_f = self._events.popleft()
event_f()
except Exception:
if self.thread.__stop.is_set():
return
raise
with zmq.asyncio.Socket(self._pipe_in0) as pipe_in:
try:
while True:
await pipe_in.recv()
# freeze event count so new writes don't extend the queue
# while we are processing
n_events = len(self._events)
for _ in range(n_events):
event_f = self._events.popleft()
event_f()
except Exception:
if self.thread.__stop.is_set():
return
raise

def _setup_pipe_in(self):
"""setup listening pipe for IOPub from forked subprocesses"""
Expand Down Expand Up @@ -218,6 +218,8 @@ async def _handle_pipe_msgs(self):
if self.thread.__stop.is_set():
return
raise
finally:
self._async_pipe_in1.close()

async def _handle_pipe_msg(self, msg=None):
"""handle a pipe message from a subprocess"""
Expand Down
Loading