diff --git a/binderbot/__init__.py b/binderbot/__init__.py index 411d200..84c00e1 100644 --- a/binderbot/__init__.py +++ b/binderbot/__init__.py @@ -7,3 +7,7 @@ from ._version import get_versions __version__ = get_versions()['version'] del get_versions + + +class OperationError(Exception): + pass \ No newline at end of file diff --git a/binderbot/binderbot.py b/binderbot/binderbot.py index af125a3..926158f 100644 --- a/binderbot/binderbot.py +++ b/binderbot/binderbot.py @@ -17,10 +17,14 @@ import json import textwrap import re +from contextlib import asynccontextmanager import nbformat from nbconvert.preprocessors import ClearOutputPreprocessor +from binderbot import OperationError +from .kernel import Kernel + logger = structlog.get_logger() # https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python @@ -28,10 +32,6 @@ def _ansi_escape(text): return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', text) -class OperationError(Exception): - pass - - class BinderUser: class States(Enum): CLEAR = 1 @@ -102,13 +102,13 @@ async def shutdown_binder(self): # So we can't make requests to the hub API. # FIXME: Provide hub auth tokens from binderhub API nbclient = NotebookClient(self.session, self.notebook_url, self.token, self.log) - await nbclient.start_kernel() - await nbclient.run_code(""" - import os - import signal - # FIXME: Wait a bit, and send SIGKILL otherwise - os.kill(1, signal.SIGTERM) - """) + async with nbclient.start_kernel() as kernel: + await kernel.run_code(""" + import os + import signal + # FIXME: Wait a bit, and send SIGKILL otherwise + os.kill(1, signal.SIGTERM) + """) class NotebookClient: @@ -126,6 +126,7 @@ def __init__(self, session: aiohttp.ClientSession, url: URL, token: str, log: st 'Authorization': f'token {self.token}' } + @asynccontextmanager async def start_kernel(self): self.log.msg('Kernel: Starting', action='kernel-start', phase='start') start_time = time.monotonic() @@ -139,25 +140,13 @@ async def start_kernel(self): if resp.status != 201: self.log.msg('Kernel: Start failed', action='kernel-start', phase='failed') raise OperationError() - self.kernel_id = (await resp.json())['id'] + kernel_id = (await resp.json())['id'] self.log.msg('Kernel: Started', action='kernel-start', phase='complete') - self.state = BinderUser.States.KERNEL_STARTED - - - async def stop_kernel(self): - self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start') - start_time = time.monotonic() + k = Kernel(self, kernel_id) try: - resp = await self.session.delete(self.url / 'api/kernels' / self.kernel_id, headers=self.auth_headers) - except Exception as e: - self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed') - raise OperationError() - - if resp.status != 204: - self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed') - raise OperationError() - - self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete') + yield k + finally: + await k.stop_kernel() # https://github.com/jupyter/jupyter/wiki/Jupyter-Notebook-Server-API#notebook-and-file-contents-api async def get_contents(self, path): @@ -171,108 +160,6 @@ async def put_contents(self, path, nb_data): json=data, headers=self.auth_headers) resp.raise_for_status() - def request_execute_code(self, msg_id, code): - return { - "header": { - "msg_id": msg_id, - "username": "jovyan", - "msg_type": "execute_request", - "version": "5.2" - }, - "metadata": {}, - "content": { - "code": textwrap.dedent(code), - "silent": False, - "store_history": True, - "user_expressions": {}, - "allow_stdin": True, - "stop_on_error": True - }, - "buffers": [], - "parent_header": {}, - "channel": "shell" - } - - - async def run_code(self, code): - """Run code and return stdout, stderr.""" - channel_url = self.url / 'api/kernels' / self.kernel_id / 'channels' - self.log.msg('WS: Connecting', action='kernel-connect', phase='start') - is_connected = False - try: - async with self.session.ws_connect(channel_url) as ws: - is_connected = True - self.log.msg('WS: Connected', action='kernel-connect', phase='complete') - start_time = time.monotonic() - self.log.msg('Code Execute: Started', action='code-execute', phase='start') - exec_start_time = time.monotonic() - msg_id = str(uuid.uuid4()) - await ws.send_json(self.request_execute_code(msg_id, code)) - - stdout = '' - stderr = '' - - async for msg_text in ws: - if msg_text.type != aiohttp.WSMsgType.TEXT: - self.log.msg( - 'WS: Unexpected message type', - action='code-execute', phase='failure', - message_type=msg_text.type, message=str(msg_text), - duration=time.monotonic() - exec_start_time - ) - raise OperationError() - - msg = msg_text.json() - - if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id: - # These are responses to our request - self.log.msg(f'Code Execute: Receive response', action='code-execute', phase='receive-stream', - channel=msg['channel'], msg_type=msg['msg_type']) - if msg['channel'] == 'shell': - if msg['msg_type'] == 'execute_reply': - status = msg['content']['status'] - if status == 'ok': - self.log.msg('Code Execute: Status OK', action='code-execute', phase='success') - break - else: - self.log.msg('Code Execute: Status {status}', action='code-execute', phase='error') - raise OperationError() - if msg['channel'] == 'iopub': - response = None - msg_type = msg.get('msg_type') - # don't really know what this is doing - #if msg_type == 'execute_result': - # response = msg['content']['data']['text/plain'] - if msg_type == 'error': - traceback = _ansi_escape('\n'.join(msg['content']['traceback'])) - self.log.msg('Code Execute: Error', action='code-execute', - phase='error', - traceback=traceback) - raise OperationError() - elif msg_type == 'stream': - response = msg['content']['text'] - name = msg['content']['name'] - if name == 'stdout': - stdout += response - elif name == 'stderr': - stderr += response - #print(response) - self.log.msg( - 'Code Execute: complete', - action='code-execute', phase='complete', - duration=time.monotonic() - exec_start_time) - - return stdout, stderr - - except Exception as e: - if type(e) is OperationError: - raise - if is_connected: - self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure') - else: - self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure') - raise OperationError() - async def list_notebooks(self): code = """ @@ -283,28 +170,6 @@ async def list_notebooks(self): stdout, stderr = await self.run_code(code) return json.loads(stdout) - async def execute_notebook(self, notebook_filename, timeout=600, - env_vars={}): - env_var_str = str(env_vars) - # https://nbconvert.readthedocs.io/en/latest/execute_api.html - code = f""" - import os - import nbformat - os.environ.update({env_var_str}) - from nbconvert.preprocessors import ExecutePreprocessor - ep = ExecutePreprocessor(timeout={timeout}) - print("Processing {notebook_filename}") - with open("{notebook_filename}") as f: - nb = nbformat.read(f, as_version=4) - ep.preprocess(nb, dict()) - print("OK") - print("Saving {notebook_filename}") - with open("{notebook_filename}", 'w', encoding='utf-8') as f: - nbformat.write(nb, f) - print("OK") - """ - return await self.run_code(code) - async def upload_local_notebook(self, notebook_filename): nb = open_nb_and_strip_output(notebook_filename) # probably want to use basename instead diff --git a/binderbot/cli.py b/binderbot/cli.py index cabed1e..9ce0fa7 100644 --- a/binderbot/cli.py +++ b/binderbot/cli.py @@ -57,32 +57,30 @@ async def main(binder_url, repo, ref, output_dir, nb_timeout, async with BinderUser(binder_url, repo, ref) as jovyan: await jovyan.start_binder(timeout=binder_start_timeout) nb = NotebookClient(jovyan.session, jovyan.notebook_url, jovyan.token, jovyan.log) - await nb.start_kernel() - click.echo(f"✅ Binder and kernel started successfully.") - # could think about asyncifying this whole loop - # for now, we run one notebook at a time to avoid overloading the binder - errors = {} - for fname in filenames: - try: - click.echo(f"⌛️ Uploading {fname}...", nl=False) - await nb.upload_local_notebook(fname) - click.echo("✅") - click.echo(f"⌛️ Executing {fname}...", nl=False) - await nb.execute_notebook(fname, timeout=nb_timeout, - env_vars=extra_env_vars) - click.echo("✅") - click.echo(f"⌛️ Downloading and saving {fname}...", nl=False) - nb_data = await nb.get_contents(fname) - nb = nbformat.from_dict(nb_data) - output_fname = os.path.join(output_dir, fname) if output_dir else fname - with open(output_fname, 'w', encoding='utf-8') as f: - nbformat.write(nb, f) - click.echo("✅") - except Exception as e: - errors[fname] = e - click.echo(f'❌ error running {fname}: {e}') - - await nb.stop_kernel() + async with nb.start_kernel() as kernel: + click.echo(f"✅ Binder and kernel started successfully.") + # could think about asyncifying this whole loop + # for now, we run one notebook at a time to avoid overloading the binder + errors = {} + for fname in filenames: + try: + click.echo(f"⌛️ Uploading {fname}...", nl=False) + await nb.upload_local_notebook(fname) + click.echo("✅") + click.echo(f"⌛️ Executing {fname}...", nl=False) + await kernel.execute_notebook(fname, timeout=nb_timeout, + env_vars=extra_env_vars) + click.echo("✅") + click.echo(f"⌛️ Downloading and saving {fname}...", nl=False) + nb_data = await nb.get_contents(fname) + nb = nbformat.from_dict(nb_data) + output_fname = os.path.join(output_dir, fname) if output_dir else fname + with open(output_fname, 'w', encoding='utf-8') as f: + nbformat.write(nb, f) + click.echo("✅") + except Exception as e: + errors[fname] = e + click.echo(f'❌ error running {fname}: {e}') if len(errors) > 0: raise RuntimeError(str(errors)) diff --git a/binderbot/kernel.py b/binderbot/kernel.py new file mode 100644 index 0000000..f2fd2ea --- /dev/null +++ b/binderbot/kernel.py @@ -0,0 +1,156 @@ +from . import OperationError +import uuid +import time +import aiohttp +import textwrap + +class Kernel: + """ + Represents a running jupyter kernel + """ + + def __init__(self, nbclient, kernel_id: str): + self.nbclient = nbclient + self.kernel_id = kernel_id + self.log = nbclient.log + + def request_execute_code(self, msg_id, code): + return { + "header": { + "msg_id": msg_id, + "username": "jovyan", + "msg_type": "execute_request", + "version": "5.2" + }, + "metadata": {}, + "content": { + "code": textwrap.dedent(code), + "silent": False, + "store_history": True, + "user_expressions": {}, + "allow_stdin": True, + "stop_on_error": True + }, + "buffers": [], + "parent_header": {}, + "channel": "shell" + } + + async def run_code(self, code): + """Run code and return stdout, stderr.""" + channel_url = self.nbclient.url / 'api/kernels' / self.kernel_id / 'channels' + self.log.msg('WS: Connecting', action='kernel-connect', phase='start') + is_connected = False + try: + async with self.nbclient.session.ws_connect(channel_url) as ws: + is_connected = True + self.log.msg('WS: Connected', action='kernel-connect', phase='complete') + start_time = time.monotonic() + self.log.msg('Code Execute: Started', action='code-execute', phase='start') + exec_start_time = time.monotonic() + msg_id = str(uuid.uuid4()) + await ws.send_json(self.request_execute_code(msg_id, code)) + + stdout = '' + stderr = '' + + async for msg_text in ws: + if msg_text.type != aiohttp.WSMsgType.TEXT: + self.log.msg( + 'WS: Unexpected message type', + action='code-execute', phase='failure', + message_type=msg_text.type, message=str(msg_text), + duration=time.monotonic() - exec_start_time + ) + raise OperationError() + + msg = msg_text.json() + + if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id: + # These are responses to our request + self.log.msg(f'Code Execute: Receive response', action='code-execute', phase='receive-stream', + channel=msg['channel'], msg_type=msg['msg_type']) + if msg['channel'] == 'shell': + if msg['msg_type'] == 'execute_reply': + status = msg['content']['status'] + if status == 'ok': + self.log.msg('Code Execute: Status OK', action='code-execute', phase='success') + break + else: + self.log.msg('Code Execute: Status {status}', action='code-execute', phase='error') + raise OperationError() + if msg['channel'] == 'iopub': + response = None + msg_type = msg.get('msg_type') + # don't really know what this is doing + #if msg_type == 'execute_result': + # response = msg['content']['data']['text/plain'] + if msg_type == 'error': + traceback = _ansi_escape('\n'.join(msg['content']['traceback'])) + self.log.msg('Code Execute: Error', action='code-execute', + phase='error', + traceback=traceback) + raise OperationError() + elif msg_type == 'stream': + response = msg['content']['text'] + name = msg['content']['name'] + if name == 'stdout': + stdout += response + elif name == 'stderr': + stderr += response + #print(response) + self.log.msg( + 'Code Execute: complete', + action='code-execute', phase='complete', + duration=time.monotonic() - exec_start_time) + + return stdout, stderr + + except Exception as e: + if type(e) is OperationError: + raise + if is_connected: + self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure') + else: + self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure') + raise OperationError() + + + + async def execute_notebook(self, notebook_filename, timeout=600, + env_vars={}): + env_var_str = str(env_vars) + # https://nbconvert.readthedocs.io/en/latest/execute_api.html + code = f""" + import os + import nbformat + os.environ.update({env_var_str}) + from nbconvert.preprocessors import ExecutePreprocessor + ep = ExecutePreprocessor(timeout={timeout}) + print("Processing {notebook_filename}") + with open("{notebook_filename}") as f: + nb = nbformat.read(f, as_version=4) + ep.preprocess(nb, dict()) + print("OK") + print("Saving {notebook_filename}") + with open("{notebook_filename}", 'w', encoding='utf-8') as f: + nbformat.write(nb, f) + print("OK") + """ + return await self.run_code(code) + + + async def stop_kernel(self): + self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start') + start_time = time.monotonic() + try: + resp = await self.nbclient.session.delete(self.nbclient.url / 'api/kernels' / self.kernel_id, headers=self.nbclient.auth_headers) + except Exception as e: + self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed') + raise OperationError() + + if resp.status != 204: + self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed') + raise OperationError() + + self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete') diff --git a/tests/test_binderbot.py b/tests/test_binderbot.py index d53cf67..0e62207 100644 --- a/tests/test_binderbot.py +++ b/tests/test_binderbot.py @@ -130,15 +130,14 @@ async def test_nbclient_run_code(local_notebook: LocalNotebookServer): async with aiohttp.ClientSession() as session: nbclient = binderbot.NotebookClient(session, local_notebook.url, local_notebook.token, log) - await nbclient.start_kernel() - stdout, stderr = await nbclient.run_code(f""" - print('hi') - """) + async with nbclient.start_kernel() as kernel: + stdout, stderr = await kernel.run_code(f""" + print('hi') + """) assert stderr.strip() == "" assert stdout.strip() == 'hi' - await nbclient.stop_kernel() @pytest.mark.asyncio async def test_upload(local_notebook: LocalNotebookServer): @@ -146,21 +145,19 @@ async def test_upload(local_notebook: LocalNotebookServer): async with aiohttp.ClientSession() as session: nbclient = binderbot.NotebookClient(session, local_notebook.url, local_notebook.token, log) - await nbclient.start_kernel() fname = "example-notebook.ipynb" filepath = local_notebook.cwd / fname input_notebook = make_code_notebook(["print('hello')"]) with open(filepath, 'w', encoding='utf-8') as f: nbformat.write(input_notebook, f) - await nbclient.execute_notebook( - fname, - timeout=60, - ) + async with nbclient.start_kernel() as kernel: + await kernel.execute_notebook( + fname, + timeout=60, + ) nb_data = await nbclient.get_contents(fname) nb = nbformat.from_dict(nb_data) cell1 = nb['cells'][0]['outputs'][0]['text'] - assert cell1 == "hello\n" - - await nbclient.stop_kernel() \ No newline at end of file + assert cell1 == "hello\n" \ No newline at end of file