Skip to content

Commit

Permalink
Split kernel operations into their own class
Browse files Browse the repository at this point in the history
Each NotebookClient can now produce multiple kernels, thus
matching the notebook REST API. These can be used with
async context managers, so we always clean up the kernel
when done.

We move OperationError out to break cyclic imports
  • Loading branch information
yuvipanda committed Apr 25, 2020
1 parent 73140db commit 13ac3d4
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 191 deletions.
4 changes: 4 additions & 0 deletions binderbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions


class OperationError(Exception):
    """Raised when an operation against the Binder/notebook server fails."""
    pass
169 changes: 17 additions & 152 deletions binderbot/binderbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@
import json
import textwrap
import re
from contextlib import asynccontextmanager

import nbformat
from nbconvert.preprocessors import ClearOutputPreprocessor

from binderbot import OperationError
from .kernel import Kernel

logger = structlog.get_logger()

# https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
def _ansi_escape(text):
return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', text)


class OperationError(Exception):
    """Raised when a kernel or notebook-server operation fails."""
    pass


class BinderUser:
class States(Enum):
CLEAR = 1
Expand Down Expand Up @@ -102,13 +102,13 @@ async def shutdown_binder(self):
# So we can't make requests to the hub API.
# FIXME: Provide hub auth tokens from binderhub API
nbclient = NotebookClient(self.session, self.notebook_url, self.token, self.log)
await nbclient.start_kernel()
await nbclient.run_code("""
import os
import signal
# FIXME: Wait a bit, and send SIGKILL otherwise
os.kill(1, signal.SIGTERM)
""")
async with nbclient.start_kernel() as kernel:
await kernel.run_code("""
import os
import signal
# FIXME: Wait a bit, and send SIGKILL otherwise
os.kill(1, signal.SIGTERM)
""")


class NotebookClient:
Expand All @@ -126,6 +126,7 @@ def __init__(self, session: aiohttp.ClientSession, url: URL, token: str, log: st
'Authorization': f'token {self.token}'
}

@asynccontextmanager
async def start_kernel(self):
self.log.msg('Kernel: Starting', action='kernel-start', phase='start')
start_time = time.monotonic()
Expand All @@ -139,25 +140,13 @@ async def start_kernel(self):
if resp.status != 201:
self.log.msg('Kernel: Start failed', action='kernel-start', phase='failed')
raise OperationError()
self.kernel_id = (await resp.json())['id']
kernel_id = (await resp.json())['id']
self.log.msg('Kernel: Started', action='kernel-start', phase='complete')
self.state = BinderUser.States.KERNEL_STARTED


async def stop_kernel(self):
self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start')
start_time = time.monotonic()
k = Kernel(self, kernel_id)
try:
resp = await self.session.delete(self.url / 'api/kernels' / self.kernel_id, headers=self.auth_headers)
except Exception as e:
self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed')
raise OperationError()

if resp.status != 204:
self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed')
raise OperationError()

self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete')
yield k
finally:
await k.stop_kernel()

# https://github.com/jupyter/jupyter/wiki/Jupyter-Notebook-Server-API#notebook-and-file-contents-api
async def get_contents(self, path):
Expand All @@ -171,108 +160,6 @@ async def put_contents(self, path, nb_data):
json=data, headers=self.auth_headers)
resp.raise_for_status()

def request_execute_code(self, msg_id, code):
    """Build a Jupyter ``execute_request`` message for the shell channel.

    ``code`` is dedented with :func:`textwrap.dedent` so indented
    triple-quoted snippets can be passed directly.
    """
    header = {
        "msg_id": msg_id,
        "username": "jovyan",
        "msg_type": "execute_request",
        "version": "5.2",
    }
    content = {
        "code": textwrap.dedent(code),
        "silent": False,
        "store_history": True,
        "user_expressions": {},
        "allow_stdin": True,
        "stop_on_error": True,
    }
    return {
        "header": header,
        "metadata": {},
        "content": content,
        "buffers": [],
        "parent_header": {},
        "channel": "shell",
    }


async def run_code(self, code):
    """Run *code* in this client's kernel and return ``(stdout, stderr)``.

    Opens a websocket to the kernel's channels endpoint, sends an
    ``execute_request``, and accumulates ``stream`` output until the
    matching ``execute_reply`` arrives on the shell channel.

    Raises:
        OperationError: on websocket failure, an unexpected message
            type, a kernel-side error, or a non-``ok`` reply status.
    """
    channel_url = self.url / 'api/kernels' / self.kernel_id / 'channels'
    self.log.msg('WS: Connecting', action='kernel-connect', phase='start')
    is_connected = False
    try:
        async with self.session.ws_connect(channel_url) as ws:
            is_connected = True
            self.log.msg('WS: Connected', action='kernel-connect', phase='complete')
            self.log.msg('Code Execute: Started', action='code-execute', phase='start')
            exec_start_time = time.monotonic()
            msg_id = str(uuid.uuid4())
            await ws.send_json(self.request_execute_code(msg_id, code))

            stdout = ''
            stderr = ''

            async for msg_text in ws:
                if msg_text.type != aiohttp.WSMsgType.TEXT:
                    self.log.msg(
                        'WS: Unexpected message type',
                        action='code-execute', phase='failure',
                        message_type=msg_text.type, message=str(msg_text),
                        duration=time.monotonic() - exec_start_time
                    )
                    raise OperationError()

                msg = msg_text.json()

                # Only consider messages that are responses to our request;
                # anything else on the socket is ignored.
                if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id:
                    self.log.msg('Code Execute: Receive response', action='code-execute', phase='receive-stream',
                                 channel=msg['channel'], msg_type=msg['msg_type'])
                    if msg['channel'] == 'shell':
                        if msg['msg_type'] == 'execute_reply':
                            status = msg['content']['status']
                            if status == 'ok':
                                self.log.msg('Code Execute: Status OK', action='code-execute', phase='success')
                                break
                            else:
                                # BUG FIX: this log message was missing the f-prefix,
                                # so the literal text '{status}' was logged instead of
                                # the actual reply status.
                                self.log.msg(f'Code Execute: Status {status}', action='code-execute', phase='error')
                                raise OperationError()
                    if msg['channel'] == 'iopub':
                        msg_type = msg.get('msg_type')
                        if msg_type == 'error':
                            traceback = _ansi_escape('\n'.join(msg['content']['traceback']))
                            self.log.msg('Code Execute: Error', action='code-execute',
                                         phase='error',
                                         traceback=traceback)
                            raise OperationError()
                        elif msg_type == 'stream':
                            response = msg['content']['text']
                            name = msg['content']['name']
                            if name == 'stdout':
                                stdout += response
                            elif name == 'stderr':
                                stderr += response
            self.log.msg(
                'Code Execute: complete',
                action='code-execute', phase='complete',
                duration=time.monotonic() - exec_start_time)

            return stdout, stderr

    except Exception as e:
        # Re-raise our own errors untouched; wrap everything else so
        # callers only ever have to handle OperationError.
        if type(e) is OperationError:
            raise
        if is_connected:
            self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure')
        else:
            self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure')
        raise OperationError()


async def list_notebooks(self):
code = """
Expand All @@ -283,28 +170,6 @@ async def list_notebooks(self):
stdout, stderr = await self.run_code(code)
return json.loads(stdout)

async def execute_notebook(self, notebook_filename, timeout=600,
                           env_vars=None):
    """Execute *notebook_filename* in place on the remote server.

    Sends driver code to the kernel that reads the notebook, executes
    it with nbconvert's ``ExecutePreprocessor``, and writes the
    executed notebook back to the same path.

    Args:
        notebook_filename: path of the notebook on the remote server.
            NOTE(review): interpolated unescaped into generated Python
            source — callers must pass trusted filenames.
        timeout: per-cell execution timeout in seconds.
        env_vars: optional mapping of environment variables to set in
            the kernel before execution.  BUG FIX: the previous
            ``env_vars={}`` default was a shared mutable default
            argument; ``None`` now means "no extra variables".

    Returns:
        ``(stdout, stderr)`` from running the driver code.
    """
    env_var_str = str(env_vars or {})
    # https://nbconvert.readthedocs.io/en/latest/execute_api.html
    code = f"""
    import os
    import nbformat
    os.environ.update({env_var_str})
    from nbconvert.preprocessors import ExecutePreprocessor
    ep = ExecutePreprocessor(timeout={timeout})
    print("Processing {notebook_filename}")
    with open("{notebook_filename}") as f:
        nb = nbformat.read(f, as_version=4)
    ep.preprocess(nb, dict())
    print("OK")
    print("Saving {notebook_filename}")
    with open("{notebook_filename}", 'w', encoding='utf-8') as f:
        nbformat.write(nb, f)
    print("OK")
    """
    return await self.run_code(code)

async def upload_local_notebook(self, notebook_filename):
nb = open_nb_and_strip_output(notebook_filename)
# probably want to use basename instead
Expand Down
50 changes: 24 additions & 26 deletions binderbot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,30 @@ async def main(binder_url, repo, ref, output_dir, nb_timeout,
async with BinderUser(binder_url, repo, ref) as jovyan:
await jovyan.start_binder(timeout=binder_start_timeout)
nb = NotebookClient(jovyan.session, jovyan.notebook_url, jovyan.token, jovyan.log)
await nb.start_kernel()
click.echo(f"✅ Binder and kernel started successfully.")
# could think about asyncifying this whole loop
# for now, we run one notebook at a time to avoid overloading the binder
errors = {}
for fname in filenames:
try:
click.echo(f"⌛️ Uploading {fname}...", nl=False)
await nb.upload_local_notebook(fname)
click.echo("✅")
click.echo(f"⌛️ Executing {fname}...", nl=False)
await nb.execute_notebook(fname, timeout=nb_timeout,
env_vars=extra_env_vars)
click.echo("✅")
click.echo(f"⌛️ Downloading and saving {fname}...", nl=False)
nb_data = await nb.get_contents(fname)
nb = nbformat.from_dict(nb_data)
output_fname = os.path.join(output_dir, fname) if output_dir else fname
with open(output_fname, 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
click.echo("✅")
except Exception as e:
errors[fname] = e
click.echo(f'❌ error running {fname}: {e}')

await nb.stop_kernel()
async with nb.start_kernel() as kernel:
click.echo(f"✅ Binder and kernel started successfully.")
# could think about asyncifying this whole loop
# for now, we run one notebook at a time to avoid overloading the binder
errors = {}
for fname in filenames:
try:
click.echo(f"⌛️ Uploading {fname}...", nl=False)
await nb.upload_local_notebook(fname)
click.echo("✅")
click.echo(f"⌛️ Executing {fname}...", nl=False)
await kernel.execute_notebook(fname, timeout=nb_timeout,
env_vars=extra_env_vars)
click.echo("✅")
click.echo(f"⌛️ Downloading and saving {fname}...", nl=False)
nb_data = await nb.get_contents(fname)
nb = nbformat.from_dict(nb_data)
output_fname = os.path.join(output_dir, fname) if output_dir else fname
with open(output_fname, 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
click.echo("✅")
except Exception as e:
errors[fname] = e
click.echo(f'❌ error running {fname}: {e}')

if len(errors) > 0:
raise RuntimeError(str(errors))
Expand Down
Loading

0 comments on commit 13ac3d4

Please sign in to comment.