From 13ac3d4d5e73b07aff62e32b566bb5e15d5034dc Mon Sep 17 00:00:00 2001 From: YuviPanda Date: Sat, 25 Apr 2020 17:14:23 +0530 Subject: [PATCH] Split kernel operations into its own class Each NotebookClient can now produce multiple kernels, thus matching the notebook REST API. These can be used with async context managers, so we always clean up the kernel when done. We move OperationError out to break cyclic imports --- binderbot/__init__.py | 4 + binderbot/binderbot.py | 169 ++++------------------------------------ binderbot/cli.py | 50 ++++++------ binderbot/kernel.py | 156 +++++++++++++++++++++++++++++++++++++ tests/test_binderbot.py | 23 +++--- 5 files changed, 211 insertions(+), 191 deletions(-) create mode 100644 binderbot/kernel.py diff --git a/binderbot/__init__.py b/binderbot/__init__.py index 411d200..84c00e1 100644 --- a/binderbot/__init__.py +++ b/binderbot/__init__.py @@ -7,3 +7,7 @@ from ._version import get_versions __version__ = get_versions()['version'] del get_versions + + +class OperationError(Exception): + pass \ No newline at end of file diff --git a/binderbot/binderbot.py b/binderbot/binderbot.py index af125a3..926158f 100644 --- a/binderbot/binderbot.py +++ b/binderbot/binderbot.py @@ -17,10 +17,14 @@ import json import textwrap import re +from contextlib import asynccontextmanager import nbformat from nbconvert.preprocessors import ClearOutputPreprocessor +from binderbot import OperationError +from .kernel import Kernel + logger = structlog.get_logger() # https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python @@ -28,10 +32,6 @@ def _ansi_escape(text): return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', text) -class OperationError(Exception): - pass - - class BinderUser: class States(Enum): CLEAR = 1 @@ -102,13 +102,13 @@ async def shutdown_binder(self): # So we can't make requests to the hub API. # FIXME: Provide hub auth tokens from binderhub API nbclient = NotebookClient(self.session, self.notebook_url, self.token, self.log) - await nbclient.start_kernel() - await nbclient.run_code(""" - import os - import signal - # FIXME: Wait a bit, and send SIGKILL otherwise - os.kill(1, signal.SIGTERM) - """) + async with nbclient.start_kernel() as kernel: + await kernel.run_code(""" + import os + import signal + # FIXME: Wait a bit, and send SIGKILL otherwise + os.kill(1, signal.SIGTERM) + """) class NotebookClient: @@ -126,6 +126,7 @@ def __init__(self, session: aiohttp.ClientSession, url: URL, token: str, log: st 'Authorization': f'token {self.token}' } + @asynccontextmanager async def start_kernel(self): self.log.msg('Kernel: Starting', action='kernel-start', phase='start') start_time = time.monotonic() @@ -139,25 +140,13 @@ async def start_kernel(self): if resp.status != 201: self.log.msg('Kernel: Start failed', action='kernel-start', phase='failed') raise OperationError() - self.kernel_id = (await resp.json())['id'] + kernel_id = (await resp.json())['id'] self.log.msg('Kernel: Started', action='kernel-start', phase='complete') - self.state = BinderUser.States.KERNEL_STARTED - - - async def stop_kernel(self): - self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start') - start_time = time.monotonic() + k = Kernel(self, kernel_id) try: - resp = await self.session.delete(self.url / 'api/kernels' / self.kernel_id, headers=self.auth_headers) - except Exception as e: - self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed') - raise OperationError() - - if resp.status != 204: - self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed') - raise OperationError() - - self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete') + yield k + finally: + await k.stop_kernel() # https://github.com/jupyter/jupyter/wiki/Jupyter-Notebook-Server-API#notebook-and-file-contents-api async def get_contents(self, path): @@ -171,108 +160,6 @@ async def put_contents(self, path, nb_data): json=data, headers=self.auth_headers) resp.raise_for_status() - def request_execute_code(self, msg_id, code): - return { - "header": { - "msg_id": msg_id, - "username": "jovyan", - "msg_type": "execute_request", - "version": "5.2" - }, - "metadata": {}, - "content": { - "code": textwrap.dedent(code), - "silent": False, - "store_history": True, - "user_expressions": {}, - "allow_stdin": True, - "stop_on_error": True - }, - "buffers": [], - "parent_header": {}, - "channel": "shell" - } - - - async def run_code(self, code): - """Run code and return stdout, stderr.""" - channel_url = self.url / 'api/kernels' / self.kernel_id / 'channels' - self.log.msg('WS: Connecting', action='kernel-connect', phase='start') - is_connected = False - try: - async with self.session.ws_connect(channel_url) as ws: - is_connected = True - self.log.msg('WS: Connected', action='kernel-connect', phase='complete') - start_time = time.monotonic() - self.log.msg('Code Execute: Started', action='code-execute', phase='start') - exec_start_time = time.monotonic() - msg_id = str(uuid.uuid4()) - await ws.send_json(self.request_execute_code(msg_id, code)) - - stdout = '' - stderr = '' - - async for msg_text in ws: - if msg_text.type != aiohttp.WSMsgType.TEXT: - self.log.msg( - 'WS: Unexpected message type', - action='code-execute', phase='failure', - message_type=msg_text.type, message=str(msg_text), - duration=time.monotonic() - exec_start_time - ) - raise OperationError() - - msg = msg_text.json() - - if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id: - # These are responses to our request - self.log.msg(f'Code Execute: Receive response', action='code-execute', phase='receive-stream', - channel=msg['channel'], msg_type=msg['msg_type']) - if msg['channel'] == 'shell': - if msg['msg_type'] == 'execute_reply': - status = msg['content']['status'] - if status == 'ok': - self.log.msg('Code Execute: Status OK', action='code-execute', phase='success') - break - else: - self.log.msg('Code Execute: Status {status}', action='code-execute', phase='error') - raise OperationError() - if msg['channel'] == 'iopub': - response = None - msg_type = msg.get('msg_type') - # don't really know what this is doing - #if msg_type == 'execute_result': - # response = msg['content']['data']['text/plain'] - if msg_type == 'error': - traceback = _ansi_escape('\n'.join(msg['content']['traceback'])) - self.log.msg('Code Execute: Error', action='code-execute', - phase='error', - traceback=traceback) - raise OperationError() - elif msg_type == 'stream': - response = msg['content']['text'] - name = msg['content']['name'] - if name == 'stdout': - stdout += response - elif name == 'stderr': - stderr += response - #print(response) - self.log.msg( - 'Code Execute: complete', - action='code-execute', phase='complete', - duration=time.monotonic() - exec_start_time) - - return stdout, stderr - - except Exception as e: - if type(e) is OperationError: - raise - if is_connected: - self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure') - else: - self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure') - raise OperationError() - async def list_notebooks(self): code = """ @@ -283,28 +170,6 @@ async def list_notebooks(self): stdout, stderr = await self.run_code(code) return json.loads(stdout) - async def execute_notebook(self, notebook_filename, timeout=600, - env_vars={}): - env_var_str = str(env_vars) - # https://nbconvert.readthedocs.io/en/latest/execute_api.html - code = f""" - import os - import nbformat - os.environ.update({env_var_str}) - from nbconvert.preprocessors import ExecutePreprocessor - ep = ExecutePreprocessor(timeout={timeout}) - print("Processing {notebook_filename}") - with open("{notebook_filename}") as f: - nb = nbformat.read(f, as_version=4) - ep.preprocess(nb, dict()) - print("OK") - print("Saving {notebook_filename}") - with open("{notebook_filename}", 'w', encoding='utf-8') as f: - nbformat.write(nb, f) - print("OK") - """ - return await self.run_code(code) - async def upload_local_notebook(self, notebook_filename): nb = open_nb_and_strip_output(notebook_filename) # probably want to use basename instead diff --git a/binderbot/cli.py b/binderbot/cli.py index cabed1e..9ce0fa7 100644 --- a/binderbot/cli.py +++ b/binderbot/cli.py @@ -57,32 +57,30 @@ async def main(binder_url, repo, ref, output_dir, nb_timeout, async with BinderUser(binder_url, repo, ref) as jovyan: await jovyan.start_binder(timeout=binder_start_timeout) nb = NotebookClient(jovyan.session, jovyan.notebook_url, jovyan.token, jovyan.log) - await nb.start_kernel() - click.echo(f"✅ Binder and kernel started successfully.") - # could think about asyncifying this whole loop - # for now, we run one notebook at a time to avoid overloading the binder - errors = {} - for fname in filenames: - try: - click.echo(f"⌛️ Uploading {fname}...", nl=False) - await nb.upload_local_notebook(fname) - click.echo("✅") - click.echo(f"⌛️ Executing {fname}...", nl=False) - await nb.execute_notebook(fname, timeout=nb_timeout, - env_vars=extra_env_vars) - click.echo("✅") - click.echo(f"⌛️ Downloading and saving {fname}...", nl=False) - nb_data = await nb.get_contents(fname) - nb = nbformat.from_dict(nb_data) - output_fname = os.path.join(output_dir, fname) if output_dir else fname - with open(output_fname, 'w', encoding='utf-8') as f: - nbformat.write(nb, f) - click.echo("✅") - except Exception as e: - errors[fname] = e - click.echo(f'❌ error running {fname}: {e}') - - await nb.stop_kernel() + async with nb.start_kernel() as kernel: + click.echo(f"✅ Binder and kernel started successfully.") + # could think about asyncifying this whole loop + # for now, we run one notebook at a time to avoid overloading the binder + errors = {} + for fname in filenames: + try: + click.echo(f"⌛️ Uploading {fname}...", nl=False) + await nb.upload_local_notebook(fname) + click.echo("✅") + click.echo(f"⌛️ Executing {fname}...", nl=False) + await kernel.execute_notebook(fname, timeout=nb_timeout, + env_vars=extra_env_vars) + click.echo("✅") + click.echo(f"⌛️ Downloading and saving {fname}...", nl=False) + nb_data = await nb.get_contents(fname) + nb = nbformat.from_dict(nb_data) + output_fname = os.path.join(output_dir, fname) if output_dir else fname + with open(output_fname, 'w', encoding='utf-8') as f: + nbformat.write(nb, f) + click.echo("✅") + except Exception as e: + errors[fname] = e + click.echo(f'❌ error running {fname}: {e}') if len(errors) > 0: raise RuntimeError(str(errors)) diff --git a/binderbot/kernel.py b/binderbot/kernel.py new file mode 100644 index 0000000..f2fd2ea --- /dev/null +++ b/binderbot/kernel.py @@ -0,0 +1,156 @@ +from . import OperationError +import uuid +import time +import aiohttp +import textwrap + +class Kernel: + """ + Represents a running jupyter kernel + """ + + def __init__(self, nbclient, kernel_id: str): + self.nbclient = nbclient + self.kernel_id = kernel_id + self.log = nbclient.log + + def request_execute_code(self, msg_id, code): + return { + "header": { + "msg_id": msg_id, + "username": "jovyan", + "msg_type": "execute_request", + "version": "5.2" + }, + "metadata": {}, + "content": { + "code": textwrap.dedent(code), + "silent": False, + "store_history": True, + "user_expressions": {}, + "allow_stdin": True, + "stop_on_error": True + }, + "buffers": [], + "parent_header": {}, + "channel": "shell" + } + + async def run_code(self, code): + """Run code and return stdout, stderr.""" + channel_url = self.nbclient.url / 'api/kernels' / self.kernel_id / 'channels' + self.log.msg('WS: Connecting', action='kernel-connect', phase='start') + is_connected = False + try: + async with self.nbclient.session.ws_connect(channel_url) as ws: + is_connected = True + self.log.msg('WS: Connected', action='kernel-connect', phase='complete') + start_time = time.monotonic() + self.log.msg('Code Execute: Started', action='code-execute', phase='start') + exec_start_time = time.monotonic() + msg_id = str(uuid.uuid4()) + await ws.send_json(self.request_execute_code(msg_id, code)) + + stdout = '' + stderr = '' + + async for msg_text in ws: + if msg_text.type != aiohttp.WSMsgType.TEXT: + self.log.msg( + 'WS: Unexpected message type', + action='code-execute', phase='failure', + message_type=msg_text.type, message=str(msg_text), + duration=time.monotonic() - exec_start_time + ) + raise OperationError() + + msg = msg_text.json() + + if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id: + # These are responses to our request + self.log.msg(f'Code Execute: Receive response', action='code-execute', phase='receive-stream', + channel=msg['channel'], msg_type=msg['msg_type']) + if msg['channel'] == 'shell': + if msg['msg_type'] == 'execute_reply': + status = msg['content']['status'] + if status == 'ok': + self.log.msg('Code Execute: Status OK', action='code-execute', phase='success') + break + else: + self.log.msg('Code Execute: Status {status}', action='code-execute', phase='error') + raise OperationError() + if msg['channel'] == 'iopub': + response = None + msg_type = msg.get('msg_type') + # don't really know what this is doing + #if msg_type == 'execute_result': + # response = msg['content']['data']['text/plain'] + if msg_type == 'error': + traceback = _ansi_escape('\n'.join(msg['content']['traceback'])) + self.log.msg('Code Execute: Error', action='code-execute', + phase='error', + traceback=traceback) + raise OperationError() + elif msg_type == 'stream': + response = msg['content']['text'] + name = msg['content']['name'] + if name == 'stdout': + stdout += response + elif name == 'stderr': + stderr += response + #print(response) + self.log.msg( + 'Code Execute: complete', + action='code-execute', phase='complete', + duration=time.monotonic() - exec_start_time) + + return stdout, stderr + + except Exception as e: + if type(e) is OperationError: + raise + if is_connected: + self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure') + else: + self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure') + raise OperationError() + + + + async def execute_notebook(self, notebook_filename, timeout=600, + env_vars={}): + env_var_str = str(env_vars) + # https://nbconvert.readthedocs.io/en/latest/execute_api.html + code = f""" + import os + import nbformat + os.environ.update({env_var_str}) + from nbconvert.preprocessors import ExecutePreprocessor + ep = ExecutePreprocessor(timeout={timeout}) + print("Processing {notebook_filename}") + with open("{notebook_filename}") as f: + nb = nbformat.read(f, as_version=4) + ep.preprocess(nb, dict()) + print("OK") + print("Saving {notebook_filename}") + with open("{notebook_filename}", 'w', encoding='utf-8') as f: + nbformat.write(nb, f) + print("OK") + """ + return await self.run_code(code) + + + async def stop_kernel(self): + self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start') + start_time = time.monotonic() + try: + resp = await self.nbclient.session.delete(self.nbclient.url / 'api/kernels' / self.kernel_id, headers=self.nbclient.auth_headers) + except Exception as e: + self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed') + raise OperationError() + + if resp.status != 204: + self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed') + raise OperationError() + + self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete') diff --git a/tests/test_binderbot.py b/tests/test_binderbot.py index d53cf67..0e62207 100644 --- a/tests/test_binderbot.py +++ b/tests/test_binderbot.py @@ -130,15 +130,14 @@ async def test_nbclient_run_code(local_notebook: LocalNotebookServer): async with aiohttp.ClientSession() as session: nbclient = binderbot.NotebookClient(session, local_notebook.url, local_notebook.token, log) - await nbclient.start_kernel() - stdout, stderr = await nbclient.run_code(f""" - print('hi') - """) + async with nbclient.start_kernel() as kernel: + stdout, stderr = await kernel.run_code(f""" + print('hi') + """) assert stderr.strip() == "" assert stdout.strip() == 'hi' - await nbclient.stop_kernel() @pytest.mark.asyncio async def test_upload(local_notebook: LocalNotebookServer): @@ -146,21 +145,19 @@ async def test_upload(local_notebook: LocalNotebookServer): async with aiohttp.ClientSession() as session: nbclient = binderbot.NotebookClient(session, local_notebook.url, local_notebook.token, log) - await nbclient.start_kernel() fname = "example-notebook.ipynb" filepath = local_notebook.cwd / fname input_notebook = make_code_notebook(["print('hello')"]) with open(filepath, 'w', encoding='utf-8') as f: nbformat.write(input_notebook, f) - await nbclient.execute_notebook( - fname, - timeout=60, - ) + async with nbclient.start_kernel() as kernel: + await kernel.execute_notebook( + fname, + timeout=60, + ) nb_data = await nbclient.get_contents(fname) nb = nbformat.from_dict(nb_data) cell1 = nb['cells'][0]['outputs'][0]['text'] - assert cell1 == "hello\n" - - await nbclient.stop_kernel() \ No newline at end of file + assert cell1 == "hello\n" \ No newline at end of file