From 4a6fe7b54184d509c18ad3fc9682248c02971285 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 11:41:13 +0100 Subject: [PATCH 01/10] add skeleton v2 handlers --- binderhub/v2/__init__.py | 0 binderhub/v2/handlers.py | 50 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 binderhub/v2/__init__.py create mode 100644 binderhub/v2/handlers.py diff --git a/binderhub/v2/__init__.py b/binderhub/v2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/binderhub/v2/handlers.py b/binderhub/v2/handlers.py new file mode 100644 index 000000000..a7b2008de --- /dev/null +++ b/binderhub/v2/handlers.py @@ -0,0 +1,50 @@ +from ..base import BaseHandler + + +class V2BuildHandler(BaseHandler): + """POST /v2/build/:provider/:spec triggers a build, + + responding with a JWT token containing info + + JWT contents: + + { + username: Hub user name + servername: Hub server name (usually '') + image: The image to be launched + build-id: The build id for logging + } + + Response (JSON): + + { + launch_token: opaque token, + events_url: opaque url, + + } + """ + + def post(self, provider, spec): + # generate username + # check for image + # trigger build (if needed) + # reply with: + pass + + +class V2EventsHandler(BaseHandler): + """GET /v2/events/:token logs""" + + def get(self, token): + # decode_jwt to get build-id, image + # get/relay build logs + pass + + +class V2LaunchHandler(BaseHandler): + def post(self, token): + # decode_jwt to get username, servername, image + # validate??? + # request launch + # redirect to launch URL + pass From 73ed68113ee7f8c384cdc19bdd9ea588741fb180 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 11:48:31 +0100 Subject: [PATCH 02/10] skeleton of v2 handlers --- binderhub/v2/__init__.py | 1 + binderhub/v2/handlers.py | 28 +++++++++++++++++++++------- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/binderhub/v2/__init__.py b/binderhub/v2/__init__.py index e69de29bb..2b37d5cd3 100644 --- a/binderhub/v2/__init__.py +++ b/binderhub/v2/__init__.py @@ -0,0 +1 @@ +from .handlers import default_handlers # noqa diff --git a/binderhub/v2/handlers.py b/binderhub/v2/handlers.py index a7b2008de..fc93faded 100644 --- a/binderhub/v2/handlers.py +++ b/binderhub/v2/handlers.py @@ -1,7 +1,7 @@ from ..base import BaseHandler -class V2BuildHandler(BaseHandler): +class BuildHandler(BaseHandler): """POST /v2/build/:provider/:spec triggers a build, responding with a JWT token containing info @@ -19,32 +19,46 @@ class V2BuildHandler(BaseHandler): { launch_token: opaque token, - events_url: opaque url, - + events_url: opaque url with token, } """ + path = r"/v2/build/([^/]+)/(.+)" + def post(self, provider, spec): # generate username # check for image # trigger build (if needed) - # reply with: + # reply with event url, containing jwt token pass -class V2EventsHandler(BaseHandler): - """GET /v2/events/:token logs""" +class EventsHandler(BaseHandler): + """GET /v2/events/:token streams logs + + No transactions should occur as a result of making this request. + Reconnects should always be stateless and harmless. 
+ """ + + path = r"/v2/events/([^/]+)" def get(self, token): # decode_jwt to get build-id, image # get/relay build logs + # final event sends to the launch url pass -class V2LaunchHandler(BaseHandler): +class LaunchHandler(BaseHandler): + """POST /v2/launch/:token triggers the actual launch""" + path = r"/v2/launch/([^/]+)" + def post(self, token): # decode_jwt to get username, servername, image # validate??? # request launch # redirect to launch URL pass + + +default_handlers = [BuildHandler, EventsHandler, LaunchHandler] From 5859f84f1249eb1b33ac4f6194aeee945c92dbe3 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 11:58:06 +0100 Subject: [PATCH 03/10] create base EventStreamHandler class will make things easier when we have two event stream emitters --- binderhub/base.py | 64 ++++++++++++++++++++++++++++++++++++++++++ binderhub/builder.py | 66 ++++---------------------------------------- 2 files changed, 69 insertions(+), 61 deletions(-) diff --git a/binderhub/base.py b/binderhub/base.py index 87f35df10..2288f35dc 100644 --- a/binderhub/base.py +++ b/binderhub/base.py @@ -1,9 +1,12 @@ """Base classes for request handlers""" +import asyncio import json from http.client import responses from tornado import web +from tornado.log import app_log +from tornado.iostream import StreamClosedError from jupyterhub.services.auth import HubOAuthenticated, HubOAuth from . import __version__ as binder_version @@ -103,6 +106,7 @@ def prepare(self): class AboutHandler(BaseHandler): """Serve the about page""" + async def get(self): self.render_template( "about.html", @@ -126,3 +130,63 @@ async def get(self): "binderhub": binder_version, } )) + + +class EventStreamHandler(BaseHandler): + """Base class for event-stream handlers""" + + # emit keepalives every 25 seconds to avoid idle connections being closed + KEEPALIVE_INTERVAL = 25 + build = None + + async def emit(self, data): + """Emit an eventstream event""" + if type(data) is not str: + serialized_data = json.dumps(data) + else: + serialized_data = data + try: + self.write('data: {}\n\n'.format(serialized_data)) + await self.flush() + except StreamClosedError: + app_log.warning("Stream closed while handling %s", self.request.uri) + # raise Finish to halt the handler + raise web.Finish() + + def on_finish(self): + """Stop keepalive when finish has been called""" + self._keepalive = False + + async def keep_alive(self): + """Constantly emit keepalive events + + So that intermediate proxies don't terminate an idle connection + """ + self._keepalive = True + while True: + await asyncio.sleep(self.KEEPALIVE_INTERVAL) + if not self._keepalive: + return + try: + # lines that start with : are comments + # and should be ignored by event consumers + self.write(':keepalive\n\n') + await self.flush() + except StreamClosedError: + return + + def send_error(self, status_code, **kwargs): + """event stream cannot set an error code, so send an error event""" + exc_info = kwargs.get('exc_info') + message = '' + if exc_info: + message = self.extract_message(exc_info) + if not message: + message = responses.get(status_code, 'Unknown HTTP Error') + + # this cannot be async + evt = json.dumps( + {'phase': 'failed', 'status_code': status_code, 'message': message + '\n'} + ) + self.write('data: {}\n\n'.format(evt)) + self.finish() diff --git a/binderhub/builder.py b/binderhub/builder.py index 0ad3e26ef..32698e873 100644 --- a/binderhub/builder.py +++ b/binderhub/builder.py @@ -3,23 +3,21 @@ """ import hashlib -from http.client import responses import json import 
string import time -import escapism import docker +import escapism from tornado.concurrent import chain_future, Future from tornado import gen -from tornado.web import Finish, authenticated +from tornado.web import authenticated from tornado.queues import Queue -from tornado.iostream import StreamClosedError from tornado.ioloop import IOLoop from tornado.log import app_log from prometheus_client import Counter, Histogram, Gauge -from .base import BaseHandler +from .base import EventStreamHandler from .build import Build, FakeBuild # Separate buckets for builds and launches. @@ -52,71 +50,17 @@ BUILDS_INPROGRESS = Gauge('binderhub_inprogress_builds', 'Builds currently in progress') LAUNCHES_INPROGRESS = Gauge('binderhub_inprogress_launches', 'Launches currently in progress') - -class BuildHandler(BaseHandler): +class BuildHandler(EventStreamHandler): """A handler for working with GitHub.""" - # emit keepalives every 25 seconds to avoid idle connections being closed - KEEPALIVE_INTERVAL = 25 build = None - async def emit(self, data): - """Emit an eventstream event""" - if type(data) is not str: - serialized_data = json.dumps(data) - else: - serialized_data = data - try: - self.write('data: {}\n\n'.format(serialized_data)) - await self.flush() - except StreamClosedError: - app_log.warning("Stream closed while handling %s", self.request.uri) - # raise Finish to halt the handler - raise Finish() - def on_finish(self): - """Stop keepalive when finish has been called""" - self._keepalive = False + super().on_finish() if self.build: # if we have a build, tell it to stop watching self.build.stop() - async def keep_alive(self): - """Constantly emit keepalive events - - So that intermediate proxies don't terminate an idle connection - """ - self._keepalive = True - while True: - await gen.sleep(self.KEEPALIVE_INTERVAL) - if not self._keepalive: - return - try: - # lines that start with : are comments - # and should be ignored by event consumers - self.write(':keepalive\n\n') - await self.flush() - except StreamClosedError: - return - - def send_error(self, status_code, **kwargs): - """event stream cannot set an error code, so send an error event""" - exc_info = kwargs.get('exc_info') - message = '' - if exc_info: - message = self.extract_message(exc_info) - if not message: - message = responses.get(status_code, 'Unknown HTTP Error') - - # this cannot be async - evt = json.dumps({ - 'phase': 'failed', - 'status_code': status_code, - 'message': message + '\n', - }) - self.write('data: {}\n\n'.format(evt)) - self.finish() - def initialize(self): super().initialize() if self.settings['use_registry']: From 601e08853fdb6b9706b008175b82a09eb2349e8c Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 13:18:57 +0100 Subject: [PATCH 04/10] let install-hub work with helm 3 --- testing/minikube/install-hub | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/testing/minikube/install-hub b/testing/minikube/install-hub index 246973133..f533a378f 100755 --- a/testing/minikube/install-hub +++ b/testing/minikube/install-hub @@ -43,7 +43,7 @@ jupyterhub = get_hub_chart_dependency() # update the helm repo check_call(['helm', 'repo', 'add', 'jupyterhub', jupyterhub['repository']]) -check_call(['helm', 'repo', 'update', 'jupyterhub']) +check_call(['helm', 'repo', 'update']) # Deploying BinderHub normally automatically deploys JupyterHub from the same # configuration file. 
@@ -70,11 +70,17 @@ if auth_enabled: auth_conf_file = os.path.join(here, 'jupyterhub-helm-auth-config.yaml') args.extend(['-f', auth_conf_file]) +# ensure namespace exists +namespace_exists = namespace in check_output(['kubectl', 'get', 'namespace']).decode('utf8', 'replace').split() +if not namespace_exists: + check_call(['kubecl', 'create', 'namespace', namespace]) + is_running = name in check_output(['helm', 'list', '-q']).decode('utf8', 'replace').split() if is_running: cmd = ['helm', 'upgrade', name] else: - cmd = ['helm', 'install', f'--name={name}'] + + cmd = ['helm', 'install', f'{name}'] cmd.extend(args) print("\n %s\n" % ' '.join(map(pipes.quote, cmd))) From e4098d7dc467c9bd4e7b7e41c9d4026a55175d5a Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 13:44:23 +0100 Subject: [PATCH 05/10] set eventstream headers on event stream base handler --- binderhub/base.py | 6 ++++++ binderhub/builder.py | 3 --- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/binderhub/base.py b/binderhub/base.py index 2288f35dc..05aae3345 100644 --- a/binderhub/base.py +++ b/binderhub/base.py @@ -190,3 +190,9 @@ def send_error(self, status_code, **kwargs): ) self.write('data: {}\n\n'.format(evt)) self.finish() + + def set_default_headers(self): + super().set_default_headers() + self.set_header('content-type', 'text/event-stream') + self.set_header('cache-control', 'no-cache') + diff --git a/binderhub/builder.py b/binderhub/builder.py index 32698e873..090b9b506 100644 --- a/binderhub/builder.py +++ b/binderhub/builder.py @@ -150,9 +150,6 @@ async def get(self, provider_prefix, _unescaped_spec): prefix = '/build/' + provider_prefix spec = self.get_spec_from_request(prefix) - # set up for sending event streams - self.set_header('content-type', 'text/event-stream') - self.set_header('cache-control', 'no-cache') # Verify if the provider is valid for EventSource. 
# EventSource cannot handle HTTP errors, so we must validate and send From b856d5d34c35c460a12aac905835fa37bf55127b Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 14:24:19 +0100 Subject: [PATCH 06/10] split Build into Build and lighter base BuildWatcher --- binderhub/build.py | 200 +++++++++++++++++++++++-------------------- binderhub/builder.py | 9 +- 2 files changed, 115 insertions(+), 94 deletions(-) diff --git a/binderhub/build.py b/binderhub/build.py index f26cc4ba2..d53b74d23 100644 --- a/binderhub/build.py +++ b/binderhub/build.py @@ -15,7 +15,97 @@ from .utils import rendezvous_rank -class Build: +class BuildWatcher: + """Represents what is needed to watch the progress of a build""" + def __init__(self, q, api, name, namespace='default', log_tail_lines=100): + self.q = q + self.api = api + self.name = name + self.namespace = namespace + self.log_tail_lines = log_tail_lines + + self.main_loop = IOLoop.current() + self.stop_event = threading.Event() + + def progress(self, kind, obj): + """Put the current action item into the queue for execution.""" + self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj}) + + def watch(self): + app_log.info("Watching build pod %s", self.name) + while not self.stop_event.is_set(): + w = watch.Watch() + try: + for f in w.stream( + self.api.list_namespaced_pod, + self.namespace, + label_selector="name={}".format(self.name), + timeout_seconds=30, + ): + if f['type'] == 'DELETED': + self.progress('pod.phasechange', 'Deleted') + return + self.pod = f['object'] + if not self.stop_event.is_set(): + self.progress('pod.phasechange', self.pod.status.phase) + if self.pod.status.phase == 'Succeeded': + self.cleanup() + elif self.pod.status.phase == 'Failed': + self.cleanup() + except Exception: + app_log.exception("Error in watch stream for %s", self.name) + raise + finally: + w.stop() + if self.stop_event.is_set(): + app_log.info("Stopping watch of %s", self.name) + return + + def stream_logs(self): + """Stream a pod's logs""" + app_log.info("Watching logs of %s", self.name) + for line in self.api.read_namespaced_pod_log( + self.name, + self.namespace, + follow=True, + tail_lines=self.log_tail_lines, + _preload_content=False): + if self.stop_event.is_set(): + app_log.info("Stopping logs of %s", self.name) + return + # verify that the line is JSON + line = line.decode('utf-8') + try: + json.loads(line) + except ValueError: + # log event wasn't JSON. + # use the line itself as the message with unknown phase. + # We don't know what the right phase is, use 'unknown'. + # If it was a fatal error, presumably a 'failure' + # message will arrive shortly. + app_log.error("log event not json: %r", line) + line = json.dumps({ + 'phase': 'unknown', + 'message': line, + }) + + self.progress('log', line) + else: + app_log.info("Finished streaming logs of %s", self.name) + + def stop(self): + """Stop watching a build""" + self.stop_event.set() + + def cleanup(self): + """Cleanup is called when watch notices that a pod stops + + No-op for BuildWatcher, Build actually removes the pod + """ + pass + + +class Build(BuildWatcher): """Represents a build of a git repository into a docker image. This ultimately maps to a single pod on a kubernetes cluster. 
Many @@ -39,25 +129,19 @@ class Build: def __init__(self, q, api, name, namespace, repo_url, ref, git_credentials, build_image, image_name, push_secret, memory_limit, docker_host, node_selector, appendix='', log_tail_lines=100, sticky_builds=False): - self.q = q - self.api = api + + super().__init__(q=q, api=api, name=name, namespace=namespace) self.repo_url = repo_url self.ref = ref - self.name = name - self.namespace = namespace self.image_name = image_name self.push_secret = push_secret self.build_image = build_image - self.main_loop = IOLoop.current() self.memory_limit = memory_limit self.docker_host = docker_host self.node_selector = node_selector self.appendix = appendix self.log_tail_lines = log_tail_lines - - self.stop_event = threading.Event() self.git_credentials = git_credentials - self.sticky_builds = sticky_builds self._component_label = "binderhub-build" @@ -89,6 +173,20 @@ def get_cmd(self): return cmd + def cleanup(self): + """Delete a kubernetes pod.""" + try: + self.api.delete_namespaced_pod( + name=self.name, + namespace=self.namespace, + body=client.V1DeleteOptions(grace_period_seconds=0)) + except client.rest.ApiException as e: + if e.status == 404: + # Is ok, someone else has already deleted it + pass + else: + raise + @classmethod def cleanup_builds(cls, kube, namespace, max_age): """Delete stopped build pods and build pods that have aged out""" @@ -146,10 +244,6 @@ def cleanup_builds(cls, kube, namespace, max_age): app_log.info("Deleted %i/%i build pods", deleted, len(builds)) app_log.debug("Build phase summary: %s", json.dumps(phases, sort_keys=True, indent=1)) - def progress(self, kind, obj): - """Put the current action item into the queue for execution.""" - self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj}) - def get_affinity(self): """Determine the affinity term for the build pod. @@ -284,7 +378,7 @@ def submit(self): ) try: - ret = self.api.create_namespaced_pod(self.namespace, self.pod) + self.api.create_namespaced_pod(self.namespace, self.pod) except client.rest.ApiException as e: if e.status == 409: # Someone else created it! @@ -295,84 +389,6 @@ def submit(self): else: app_log.info("Started build %s", self.name) - app_log.info("Watching build pod %s", self.name) - while not self.stop_event.is_set(): - w = watch.Watch() - try: - for f in w.stream( - self.api.list_namespaced_pod, - self.namespace, - label_selector="name={}".format(self.name), - timeout_seconds=30, - ): - if f['type'] == 'DELETED': - self.progress('pod.phasechange', 'Deleted') - return - self.pod = f['object'] - if not self.stop_event.is_set(): - self.progress('pod.phasechange', self.pod.status.phase) - if self.pod.status.phase == 'Succeeded': - self.cleanup() - elif self.pod.status.phase == 'Failed': - self.cleanup() - except Exception as e: - app_log.exception("Error in watch stream for %s", self.name) - raise - finally: - w.stop() - if self.stop_event.is_set(): - app_log.info("Stopping watch of %s", self.name) - return - - def stream_logs(self): - """Stream a pod's logs""" - app_log.info("Watching logs of %s", self.name) - for line in self.api.read_namespaced_pod_log( - self.name, - self.namespace, - follow=True, - tail_lines=self.log_tail_lines, - _preload_content=False): - if self.stop_event.is_set(): - app_log.info("Stopping logs of %s", self.name) - return - # verify that the line is JSON - line = line.decode('utf-8') - try: - json.loads(line) - except ValueError: - # log event wasn't JSON. - # use the line itself as the message with unknown phase. 
- # We don't know what the right phase is, use 'unknown'. - # If it was a fatal error, presumably a 'failure' - # message will arrive shortly. - app_log.error("log event not json: %r", line) - line = json.dumps({ - 'phase': 'unknown', - 'message': line, - }) - - self.progress('log', line) - else: - app_log.info("Finished streaming logs of %s", self.name) - - def cleanup(self): - """Delete a kubernetes pod.""" - try: - self.api.delete_namespaced_pod( - name=self.name, - namespace=self.namespace, - body=client.V1DeleteOptions(grace_period_seconds=0)) - except client.rest.ApiException as e: - if e.status == 404: - # Is ok, someone else has already deleted it - pass - else: - raise - - def stop(self): - """Stop watching a build""" - self.stop_event.set() class FakeBuild(Build): """ diff --git a/binderhub/builder.py b/binderhub/builder.py index 090b9b506..6f7ce7513 100644 --- a/binderhub/builder.py +++ b/binderhub/builder.py @@ -303,10 +303,15 @@ async def get(self, provider_prefix, _unescaped_spec): with BUILDS_INPROGRESS.track_inprogress(): build_starttime = time.perf_counter() pool = self.settings['build_pool'] + # Start building - submit_future = pool.submit(build.submit) + def submit_and_watch(): + build.submit() + build.watch() + + submit_future = pool.submit(submit_and_watch) # TODO: hook up actual error handling when this fails - IOLoop.current().add_callback(lambda : submit_future) + IOLoop.current().add_callback(lambda: submit_future) log_future = None From 092be68aa1a458877382b5b2e519c33de8d0281d Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 14:41:41 +0100 Subject: [PATCH 07/10] fixup: install-hub helm 3 --- testing/minikube/install-hub | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/minikube/install-hub b/testing/minikube/install-hub index f533a378f..6d7ebc360 100755 --- a/testing/minikube/install-hub +++ b/testing/minikube/install-hub @@ -73,7 +73,7 @@ if auth_enabled: # ensure namespace exists namespace_exists = namespace in check_output(['kubectl', 'get', 'namespace']).decode('utf8', 'replace').split() if not namespace_exists: - check_call(['kubecl', 'create', 'namespace', namespace]) + check_call(['kubectl', 'create', 'namespace', namespace]) is_running = name in check_output(['helm', 'list', '-q']).decode('utf8', 'replace').split() if is_running: From f0dd9e2f6dbf036eef23d717b18b8c56811068c6 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 4 Feb 2020 15:56:34 +0100 Subject: [PATCH 08/10] make Builder a re-usable class move v1 BuildHandler to binderhub.v1 --- binderhub/app.py | 2 +- binderhub/build.py | 190 +++++++++------ binderhub/builder.py | 487 ++++++++++++++++++++------------------- binderhub/v1/__init__.py | 1 + binderhub/v1/handlers.py | 153 ++++++++++++ 5 files changed, 516 insertions(+), 317 deletions(-) create mode 100644 binderhub/v1/__init__.py create mode 100644 binderhub/v1/handlers.py diff --git a/binderhub/app.py b/binderhub/app.py index 0aba78196..cf887802e 100755 --- a/binderhub/app.py +++ b/binderhub/app.py @@ -27,7 +27,7 @@ from .base import AboutHandler, Custom404, VersionHandler from .build import Build -from .builder import BuildHandler +from .v1.handlers import BuildHandler from .health import HealthHandler from .launcher import Launcher from .registry import DockerRegistry diff --git a/binderhub/build.py b/binderhub/build.py index d53b74d23..a7749175a 100644 --- a/binderhub/build.py +++ b/binderhub/build.py @@ -2,10 +2,13 @@ Contains build of a docker image from a git repository. 
""" +from asyncio import Future from collections import defaultdict import datetime +from functools import partial import json import threading +import time from urllib.parse import urlparse from kubernetes import client, watch @@ -17,6 +20,7 @@ class BuildWatcher: """Represents what is needed to watch the progress of a build""" + def __init__(self, q, api, name, namespace='default', log_tail_lines=100): self.q = q self.api = api @@ -26,21 +30,32 @@ def __init__(self, q, api, name, namespace='default', log_tail_lines=100): self.main_loop = IOLoop.current() self.stop_event = threading.Event() + self.running_future = Future() + self.stopped_future = Future() def progress(self, kind, obj): """Put the current action item into the queue for execution.""" self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj}) + def _signal_stopped(self, status): + if not self.stopped_future.done(): + self.stopped_future.set_result(self.pod.status.phase) + + def _signal_running(self): + if not self.running_future.done(): + self.running_future.set_result(None) + def watch(self): app_log.info("Watching build pod %s", self.name) + while not self.stop_event.is_set(): w = watch.Watch() try: for f in w.stream( - self.api.list_namespaced_pod, - self.namespace, - label_selector="name={}".format(self.name), - timeout_seconds=30, + self.api.list_namespaced_pod, + self.namespace, + label_selector="name={}".format(self.name), + timeout_seconds=30, ): if f['type'] == 'DELETED': self.progress('pod.phasechange', 'Deleted') @@ -48,9 +63,12 @@ def watch(self): self.pod = f['object'] if not self.stop_event.is_set(): self.progress('pod.phasechange', self.pod.status.phase) - if self.pod.status.phase == 'Succeeded': - self.cleanup() - elif self.pod.status.phase == 'Failed': + if self.pod.status.phase in {'Running'}: + self.main_loop.add_callback(self._signal_running) + if self.pod.status.phase in {'Succeeded', 'Failed'}: + self.main_loop.add_callback( + partial(self._signal_stopped, self.pod.status.phase) + ) self.cleanup() except Exception: app_log.exception("Error in watch stream for %s", self.name) @@ -65,11 +83,12 @@ def stream_logs(self): """Stream a pod's logs""" app_log.info("Watching logs of %s", self.name) for line in self.api.read_namespaced_pod_log( - self.name, - self.namespace, - follow=True, - tail_lines=self.log_tail_lines, - _preload_content=False): + self.name, + self.namespace, + follow=True, + tail_lines=self.log_tail_lines, + _preload_content=False, + ): if self.stop_event.is_set(): app_log.info("Stopping logs of %s", self.name) return @@ -84,10 +103,7 @@ def stream_logs(self): # If it was a fatal error, presumably a 'failure' # message will arrive shortly. app_log.error("log event not json: %r", line) - line = json.dumps({ - 'phase': 'unknown', - 'message': line, - }) + line = json.dumps({'phase': 'unknown', 'message': line}) self.progress('log', line) else: @@ -102,6 +118,7 @@ def cleanup(self): No-op for BuildWatcher, Build actually removes the pod """ + pass @@ -126,9 +143,26 @@ class Build(BuildWatcher): API instead of having to invent our own locking code. 
""" - def __init__(self, q, api, name, namespace, repo_url, ref, git_credentials, build_image, - image_name, push_secret, memory_limit, docker_host, node_selector, - appendix='', log_tail_lines=100, sticky_builds=False): + + def __init__( + self, + q, + api, + name, + namespace, + repo_url, + ref, + git_credentials, + build_image, + image_name, + push_secret, + memory_limit, + docker_host, + node_selector, + appendix='', + log_tail_lines=100, + sticky_builds=False, + ): super().__init__(q=q, api=api, name=name, namespace=namespace) self.repo_url = repo_url @@ -150,11 +184,17 @@ def get_cmd(self): """Get the cmd to run to build the image""" cmd = [ 'jupyter-repo2docker', - '--ref', self.ref, - '--image', self.image_name, - '--no-clean', '--no-run', '--json-logs', - '--user-name', 'jovyan', - '--user-id', '1000', + '--ref', + self.ref, + '--image', + self.image_name, + '--no-clean', + '--no-run', + '--json-logs', + '--user-name', + 'jovyan', + '--user-id', + '1000', ] if self.appendix: cmd.extend(['--appendix', self.appendix]) @@ -179,7 +219,8 @@ def cleanup(self): self.api.delete_namespaced_pod( name=self.name, namespace=self.namespace, - body=client.V1DeleteOptions(grace_period_seconds=0)) + body=client.V1DeleteOptions(grace_period_seconds=0), + ) except client.rest.ApiException as e: if e.status == 404: # Is ok, someone else has already deleted it @@ -191,8 +232,7 @@ def cleanup(self): def cleanup_builds(cls, kube, namespace, max_age): """Delete stopped build pods and build pods that have aged out""" builds = kube.list_namespaced_pod( - namespace=namespace, - label_selector='component=binderhub-build', + namespace=namespace, label_selector='component=binderhub-build' ).items phases = defaultdict(int) app_log.debug("%i build pods", len(builds)) @@ -232,7 +272,8 @@ def cleanup_builds(cls, kube, namespace, max_age): kube.delete_namespaced_pod( name=build.metadata.name, namespace=namespace, - body=client.V1DeleteOptions(grace_period_seconds=0)) + body=client.V1DeleteOptions(grace_period_seconds=0), + ) except client.rest.ApiException as e: if e.status == 404: # Is ok, someone else has already deleted it @@ -242,7 +283,9 @@ def cleanup_builds(cls, kube, namespace, max_age): if deleted: app_log.info("Deleted %i/%i build pods", deleted, len(builds)) - app_log.debug("Build phase summary: %s", json.dumps(phases, sort_keys=True, indent=1)) + app_log.debug( + "Build phase summary: %s", json.dumps(phases, sort_keys=True, indent=1) + ) def get_affinity(self): """Determine the affinity term for the build pod. @@ -258,8 +301,7 @@ def get_affinity(self): docker layer cache of previous builds. 
""" dind_pods = self.api.list_namespaced_pod( - self.namespace, - label_selector="component=dind,app=binder", + self.namespace, label_selector="component=dind,app=binder" ) if self.sticky_builds and dind_pods: @@ -295,11 +337,9 @@ def get_affinity(self): pod_affinity_term=client.V1PodAffinityTerm( topology_key="kubernetes.io/hostname", label_selector=client.V1LabelSelector( - match_labels=dict( - component=self._component_label - ) - ) - ) + match_labels=dict(component=self._component_label) + ), + ), ) ] ) @@ -310,35 +350,44 @@ def get_affinity(self): def submit(self): """Submit a build pod to create the image for the repository.""" volume_mounts = [ - client.V1VolumeMount(mount_path="/var/run/docker.sock", name="docker-socket") + client.V1VolumeMount( + mount_path="/var/run/docker.sock", name="docker-socket" + ) ] docker_socket_path = urlparse(self.docker_host).path - volumes = [client.V1Volume( - name="docker-socket", - host_path=client.V1HostPathVolumeSource(path=docker_socket_path, type='Socket') - )] + volumes = [ + client.V1Volume( + name="docker-socket", + host_path=client.V1HostPathVolumeSource( + path=docker_socket_path, type='Socket' + ), + ) + ] if self.push_secret: - volume_mounts.append(client.V1VolumeMount(mount_path="/root/.docker", name='docker-push-secret')) - volumes.append(client.V1Volume( - name='docker-push-secret', - secret=client.V1SecretVolumeSource(secret_name=self.push_secret) - )) + volume_mounts.append( + client.V1VolumeMount( + mount_path="/root/.docker", name='docker-push-secret' + ) + ) + volumes.append( + client.V1Volume( + name='docker-push-secret', + secret=client.V1SecretVolumeSource(secret_name=self.push_secret), + ) + ) env = [] if self.git_credentials: - env.append(client.V1EnvVar(name='GIT_CREDENTIAL_ENV', value=self.git_credentials)) + env.append( + client.V1EnvVar(name='GIT_CREDENTIAL_ENV', value=self.git_credentials) + ) self.pod = client.V1Pod( metadata=client.V1ObjectMeta( name=self.name, - labels={ - "name": self.name, - "component": self._component_label, - }, - annotations={ - "binder-repo": self.repo_url, - }, + labels={"name": self.name, "component": self._component_label}, + annotations={"binder-repo": self.repo_url}, ), spec=client.V1PodSpec( containers=[ @@ -349,9 +398,9 @@ def submit(self): volume_mounts=volume_mounts, resources=client.V1ResourceRequirements( limits={'memory': self.memory_limit}, - requests={'memory': self.memory_limit} + requests={'memory': self.memory_limit}, ), - env=env + env=env, ) ], tolerations=[ @@ -373,8 +422,8 @@ def submit(self): node_selector=self.node_selector, volumes=volumes, restart_policy="Never", - affinity=self.get_affinity() - ) + affinity=self.get_affinity(), + ), ) try: @@ -389,42 +438,37 @@ def submit(self): else: app_log.info("Started build %s", self.name) + self.watch() + class FakeBuild(Build): """ Fake Building process to be able to work on the UI without a running Minikube. 
""" + def submit(self): self.progress('pod.phasechange', 'Running') return def stream_logs(self): - import time + time.sleep(3) for phase in ('Pending', 'Running', 'Succeed', 'Building'): if self.stop_event.is_set(): app_log.warning("Stopping logs of %s", self.name) return - self.progress('log', - json.dumps({ - 'phase': phase, - 'message': f"{phase}...\n", - }) + self.progress( + 'log', json.dumps({'phase': phase, 'message': f"{phase}...\n"}) ) for i in range(5): if self.stop_event.is_set(): app_log.warning("Stopping logs of %s", self.name) return time.sleep(1) - self.progress('log', - json.dumps({ - 'phase': 'unknown', - 'message': f"Step {i+1}/10\n", - }) + self.progress( + 'log', json.dumps({'phase': 'unknown', 'message': f"Step {i+1}/10\n"}) ) self.progress('pod.phasechange', 'Succeeded') - self.progress('log', json.dumps({ - 'phase': 'Deleted', - 'message': f"Deleted...\n", - }) + self.progress( + 'log', json.dumps({'phase': 'Deleted', 'message': f"Deleted...\n"}) ) diff --git a/binderhub/builder.py b/binderhub/builder.py index 6f7ce7513..67168da12 100644 --- a/binderhub/builder.py +++ b/binderhub/builder.py @@ -2,7 +2,9 @@ Handlers for working with version control services (i.e. GitHub) for builds. """ +import asyncio import hashlib +from functools import partial import json import string import time @@ -11,14 +13,16 @@ import escapism from tornado.concurrent import chain_future, Future from tornado import gen -from tornado.web import authenticated from tornado.queues import Queue from tornado.ioloop import IOLoop from tornado.log import app_log +from traitlets import Dict, HasTraits, Instance, Unicode from prometheus_client import Counter, Histogram, Gauge -from .base import EventStreamHandler -from .build import Build, FakeBuild +from .build import Build, BuildWatcher, FakeBuild +from .events import EventLog +from .registry import DockerRegistry +from .repoproviders import RepoProvider # Separate buckets for builds and launches. 
# Builds and launches have very different characteristic times, @@ -38,9 +42,7 @@ buckets=LAUNCH_BUCKETS, ) BUILD_COUNT = Counter( - 'binderhub_build_count', - 'Counter of builds by repo', - ['status', 'provider', 'repo'], + 'binderhub_build_count', 'Counter of builds by repo', ['status', 'provider', 'repo'] ) LAUNCH_COUNT = Counter( 'binderhub_launch_count', @@ -48,25 +50,34 @@ ['status', 'provider', 'repo'], ) BUILDS_INPROGRESS = Gauge('binderhub_inprogress_builds', 'Builds currently in progress') -LAUNCHES_INPROGRESS = Gauge('binderhub_inprogress_launches', 'Launches currently in progress') +LAUNCHES_INPROGRESS = Gauge( + 'binderhub_inprogress_launches', 'Launches currently in progress' +) -class BuildHandler(EventStreamHandler): - """A handler for working with GitHub.""" - build = None +class BuildFailed(Exception): + """Exception to raise when a build fails""" - def on_finish(self): - super().on_finish() - if self.build: - # if we have a build, tell it to stop watching - self.build.stop() - def initialize(self): - super().initialize() - if self.settings['use_registry']: - self.registry = self.settings['registry'] +class LaunchFailed(Exception): + """Exception to raise when a launch fails""" + + +class Builder(HasTraits): + """A handler for working with GitHub.""" + + # general application state + registry = Instance(DockerRegistry, allow_none=True) + event_log = Instance(EventLog) + settings = Dict() + binder_launch_host = Unicode() - self.event_log = self.settings['event_log'] + # per-build + provider_prefix = Unicode() + provider = Instance(RepoProvider) + spec = Unicode() + build = Instance(BuildWatcher) + origin = Unicode() def _generate_build_name(self, build_slug, ref, prefix='', limit=63, ref_length=6): """ @@ -94,16 +105,17 @@ def _generate_build_name(self, build_slug, ref, prefix='', limit=63, ref_length= # build names are case-insensitive because ascii_letters are allowed, # and `.lower()` is called at the end safe_chars = set(string.ascii_letters + string.digits) + def escape(s): return escapism.escape(s, safe=safe_chars, escape_char='-') - build_slug = self._safe_build_slug(build_slug, limit=limit - len(prefix) - ref_length - 1) + build_slug = self._safe_build_slug( + build_slug, limit=limit - len(prefix) - ref_length - 1 + ) ref = escape(ref) return '{prefix}{safe_slug}-{ref}'.format( - prefix=prefix, - safe_slug=build_slug, - ref=ref[:ref_length], + prefix=prefix, safe_slug=build_slug, ref=ref[:ref_length] ).lower() def _safe_build_slug(self, build_slug, limit, hash_length=6): @@ -116,122 +128,71 @@ def _safe_build_slug(self, build_slug, limit, hash_length=6): """ build_slug_hash = hashlib.sha256(build_slug.encode('utf-8')).hexdigest() safe_chars = set(string.ascii_letters + string.digits) + def escape(s): return escapism.escape(s, safe=safe_chars, escape_char='-') + build_slug = escape(build_slug) return '{name}-{hash}'.format( - name=build_slug[:limit - hash_length - 1], + name=build_slug[: limit - hash_length - 1], hash=build_slug_hash[:hash_length], ).lower() - async def fail(self, message): - await self.emit({ - 'phase': 'failed', - 'message': message + '\n', - }) - - @authenticated - async def get(self, provider_prefix, _unescaped_spec): - """Get a built image for a given spec and repo provider. - - Different repo providers will require different spec information. This - function relies on the functionality of the tornado `GET` request. - - Parameters - ---------- - provider_prefix : str - the nickname for a repo provider (i.e. 
'gh') - spec: - specifies information needed by the repo provider (i.e. user, - repo, ref, etc.) - - """ - prefix = '/build/' + provider_prefix - spec = self.get_spec_from_request(prefix) + await self.emit({'phase': 'failed', 'message': message + '\n'}) + async def resolve_provider(self): - # Verify if the provider is valid for EventSource. - # EventSource cannot handle HTTP errors, so we must validate and send - # error messages on the eventsource. - if provider_prefix not in self.settings['repo_providers']: - await self.fail("No provider found for prefix %s" % provider_prefix) - return - - # create a heartbeat - IOLoop.current().spawn_callback(self.keep_alive) - - spec = spec.rstrip("/") - key = '%s:%s' % (provider_prefix, spec) - - # get a provider object that encapsulates the provider and the spec - try: - provider = self.get_provider(provider_prefix, spec=spec) - except Exception as e: - app_log.exception("Failed to get provider for %s", key) - await self.fail(str(e)) - return - - if provider.is_banned(): - await self.emit({ - 'phase': 'failed', - 'message': 'Sorry, {} has been temporarily disabled from launching. Please contact admins for more info!'.format(spec) - }) - return - + provider_prefix = self.provider_prefix + provider = self.provider + spec = self.spec repo_url = self.repo_url = provider.get_repo_url() # labels to apply to build/launch metrics - self.repo_metric_labels = { - 'provider': provider.name, - 'repo': repo_url, - } + self.repo_metric_labels = {'provider': provider.name, 'repo': repo_url} - try: - ref = await provider.get_resolved_ref() - except Exception as e: - await self.fail("Error resolving ref for %s: %s" % (key, e)) - return - if ref is None: - await self.fail("Could not resolve ref for %s. Double check your URL." % key) - return + ref = self.resolved_ref = await provider.get_resolved_ref() self.ref_url = await provider.get_resolved_ref_url() resolved_spec = await provider.get_resolved_spec() - badge_base_url = self.get_badge_base_url() - self.binder_launch_host = badge_base_url or '{proto}://{host}{base_url}'.format( - proto=self.request.protocol, - host=self.request.host, - base_url=self.settings['base_url'], - ) # These are relative URLs so do not have a leading / self.binder_request = 'v2/{provider}/{spec}'.format( - provider=provider_prefix, - spec=spec, + provider=provider_prefix, spec=spec ) self.binder_persistent_request = 'v2/{provider}/{spec}'.format( - provider=provider_prefix, - spec=resolved_spec, + provider=provider_prefix, spec=resolved_spec ) + # resolve build name as well + self.build_name = self._generate_build_name( + provider.get_build_slug(), ref, prefix='build-' + ) + return ref + + async def image_needs_building(self): + # generate a complete build name (for GitHub: `build-{user}-{repo}-{ref}`) image_prefix = self.settings['image_prefix'] # Enforces max 255 characters before image - safe_build_slug = self._safe_build_slug(provider.get_build_slug(), limit=255 - len(image_prefix)) - - build_name = self._generate_build_name(provider.get_build_slug(), ref, prefix='build-') + safe_build_slug = self._safe_build_slug( + self.provider.get_build_slug(), limit=255 - len(image_prefix) + ) - image_name = self.image_name = '{prefix}{build_slug}:{ref}'.format( - prefix=image_prefix, - build_slug=safe_build_slug, - ref=ref - ).replace('_', '-').lower() + image_name = self.image_name = ( + '{prefix}{build_slug}:{ref}'.format( + prefix=image_prefix, build_slug=safe_build_slug, ref=self.resolved_ref + ) + .replace('_', '-') + .lower() + ) if 
self.settings['use_registry']: - image_manifest = await self.registry.get_image_manifest(*'/'.join(image_name.split('/')[-2:]).split(':', 1)) + image_manifest = await self.registry.get_image_manifest( + *'/'.join(image_name.split('/')[-2:]).split(':', 1) + ) image_found = bool(image_manifest) else: # Check if the image exists locally! @@ -245,27 +206,19 @@ async def get(self, provider_prefix, _unescaped_spec): else: image_found = True + return image_found + + async def request_build(self): + # Launch a notebook server if the image already is built kube = self.settings['kubernetes_client'] - - if image_found: - await self.emit({ - 'phase': 'built', - 'imageName': image_name, - 'message': 'Found built image, launching...\n' - }) - with LAUNCHES_INPROGRESS.track_inprogress(): - await self.launch(kube, provider) - self.event_log.emit('binderhub.jupyter.org/launch', 3, { - 'provider': provider.name, - 'spec': spec, - 'status': 'success', - 'origin': self.settings['normalized_origin'] if self.settings['normalized_origin'] else self.request.host - }) - return + repo_url = self.repo_url + ref = self.resolved_ref + image_name = self.image_name + provider = self.provider # Prepare to build - q = Queue() + q = self.event_queue = Queue() if self.settings['use_registry']: push_secret = self.settings['push_secret'] @@ -276,7 +229,8 @@ async def get(self, provider_prefix, _unescaped_spec): appendix = self.settings['appendix'].format( binder_url=self.binder_launch_host + self.binder_request, - persistent_binder_url=self.binder_launch_host + self.binder_persistent_request, + persistent_binder_url=self.binder_launch_host + + self.binder_persistent_request, repo_url=repo_url, ref_url=self.ref_url, ) @@ -284,7 +238,7 @@ async def get(self, provider_prefix, _unescaped_spec): self.build = build = BuildClass( q=q, api=kube, - name=build_name, + name=self.build_name, namespace=self.settings["build_namespace"], repo_url=repo_url, ref=ref, @@ -299,93 +253,133 @@ async def get(self, provider_prefix, _unescaped_spec): git_credentials=provider.git_credentials, sticky_builds=self.settings['sticky_builds'], ) + pool = self.settings['build_pool'] + + async def submit_with_timing(): + with BUILDS_INPROGRESS.track_inprogress(): + self.build_starttime = time.perf_counter() + await asyncio.wrap_future(pool.submit(build.submit)) + + IOLoop.current().add_callback(submit_with_timing) + + return build + + async def watch(self, stream_logs=True): + pool = self.settings['build_pool'] + if self.build is None: + BuildWatcherClass = ( + FakeBuild if self.settings.get('fake_build') else BuildWatcher + ) + q = Queue() + self.build = BuildWatcherClass( + q=q, + api=self.settings['kubernetes'], + name=self.build_name, + namespace=self.settings['build_namespace'], + ) + # call watch when we are instantiating a Watcher + # + IOLoop.current().add_callback(partial(pool.submit, self.build.watch)) + else: + # if this happens, we already invoked .request_build() + # which itself called build.watch() + pass + + build = self.build + + # initial waiting event + yield {'phase': 'waiting', 'message': 'Waiting for build to start...\n'} + + # wait for the first of: + # 1. build is ready to start streaming logs, or + # 2. build has stopped (success or failure) + + await asyncio.wait([self.build.stopped_future, self.build.running_future]) + if self.build.stopped_future.done(): + # ... 
+ return + + q = self.build.q + + log_future = None + + done = False + failed = False + while not done: + progress = await q.get() + + # FIXME: If pod goes into an unrecoverable stage, such as ImagePullBackoff or + # whatever, we should fail properly. + if progress['kind'] == 'pod.phasechange': + if progress['payload'] == 'Pending': + # nothing to do, just waiting + continue + elif progress['payload'] == 'Deleted': + event = { + 'phase': 'built', + 'message': 'Built image, launching...\n', + 'imageName': self.image_name, + } + done = True + elif progress['payload'] == 'Running': + # start capturing build logs once the pod is running + if log_future is None and stream_logs: + log_future = pool.submit(build.stream_logs) + continue + elif progress['payload'] == 'Succeeded': + # Do nothing, is ok! + continue + else: + # FIXME: message? debug? + event = {'phase': progress['payload']} + elif progress['kind'] == 'log': + # We expect logs to be already JSON structured anyway + event = progress['payload'] + payload = json.loads(event) + if payload.get('phase') in ('failure', 'failed'): + failed = True + BUILD_TIME.labels(status='failure').observe( + time.perf_counter() - self.build_starttime + ) + BUILD_COUNT.labels( + status='failure', **self.repo_metric_labels + ).inc() - with BUILDS_INPROGRESS.track_inprogress(): - build_starttime = time.perf_counter() - pool = self.settings['build_pool'] - - # Start building - def submit_and_watch(): - build.submit() - build.watch() - - submit_future = pool.submit(submit_and_watch) - # TODO: hook up actual error handling when this fails - IOLoop.current().add_callback(lambda: submit_future) - - log_future = None - - # initial waiting event - await self.emit({ - 'phase': 'waiting', - 'message': 'Waiting for build to start...\n', - }) - - done = False - failed = False - while not done: - progress = await q.get() - - # FIXME: If pod goes into an unrecoverable stage, such as ImagePullBackoff or - # whatever, we should fail properly. - if progress['kind'] == 'pod.phasechange': - if progress['payload'] == 'Pending': - # nothing to do, just waiting - continue - elif progress['payload'] == 'Deleted': - event = { - 'phase': 'built', - 'message': 'Built image, launching...\n', - 'imageName': image_name, - } - done = True - elif progress['payload'] == 'Running': - # start capturing build logs once the pod is running - if log_future is None: - log_future = pool.submit(build.stream_logs) - continue - elif progress['payload'] == 'Succeeded': - # Do nothing, is ok! - continue - else: - # FIXME: message? debug? 
- event = {'phase': progress['payload']} - elif progress['kind'] == 'log': - # We expect logs to be already JSON structured anyway - event = progress['payload'] - payload = json.loads(event) - if payload.get('phase') in ('failure', 'failed'): - failed = True - BUILD_TIME.labels(status='failure').observe(time.perf_counter() - build_starttime) - BUILD_COUNT.labels(status='failure', **self.repo_metric_labels).inc() - - await self.emit(event) - - # Launch after building an image - if not failed: - BUILD_TIME.labels(status='success').observe(time.perf_counter() - build_starttime) + yield event + + if failed: + raise BuildFailed() + else: + BUILD_TIME.labels(status='success').observe( + time.perf_counter() - self.build_starttime + ) BUILD_COUNT.labels(status='success', **self.repo_metric_labels).inc() - with LAUNCHES_INPROGRESS.track_inprogress(): - await self.launch(kube, provider) - self.event_log.emit('binderhub.jupyter.org/launch', 3, { - 'provider': provider.name, - 'spec': spec, + + async def launch(self): + """Ask JupyterHub to launch the image. + + Wraps _launch in timing metrics + """ + with LAUNCHES_INPROGRESS.track_inprogress(): + async for event in self._launch( + self.settings['kubernetes_client'], self.provider + ): + yield event + self.event_log.emit( + 'binderhub.jupyter.org/launch', + 3, + { + 'provider': self.provider.name, + 'spec': self.spec, 'status': 'success', - 'origin': self.settings['normalized_origin'] if self.settings['normalized_origin'] else self.request.host - }) - - # Don't close the eventstream immediately. - # (javascript) eventstream clients reconnect automatically on dropped connections, - # so if the server closes the connection first, - # the client will reconnect which starts a new build. - # If we sleep here, that makes it more likely that a well-behaved - # client will close its connection first. - # The duration of this shouldn't matter because - # well-behaved clients will close connections after they receive the launch event. - await gen.sleep(60) - - async def launch(self, kube, provider): - """Ask JupyterHub to launch the image.""" + 'origin': self.origin, + }, + ) + + async def _launch(self, kube, provider): + """Ask JupyterHub to launch the image. + + This private method""" # Load the spec-specific configuration if it has been overridden repo_config = provider.repo_config(self.settings) @@ -399,7 +393,8 @@ async def launch(self, kube, provider): # TODO: run a watch to keep this up to date in the background pool = self.settings['executor'] - f = pool.submit(kube.list_namespaced_pod, + f = pool.submit( + kube.list_namespaced_pod, self.settings["build_namespace"], label_selector='app=jupyterhub,component=singleuser-server', ) @@ -423,22 +418,29 @@ async def launch(self, kube, provider): # That would be hard to do without in-memory state. quota = repo_config.get('quota') if quota and matching_pods >= quota: - app_log.error("%s has exceeded quota: %s/%s (%s total)", - self.repo_url, matching_pods, quota, total_pods) - await self.fail("Too many users running %s! Try again soon." % self.repo_url) - return + app_log.error( + "%s has exceeded quota: %s/%s (%s total)", + self.repo_url, + matching_pods, + quota, + total_pods, + ) + raise LaunchFailed( + "Too many users running %s! Try again soon." 
% self.repo_url + ) if quota and matching_pods >= 0.5 * quota: log = app_log.warning else: log = app_log.info - log("Launching pod for %s: %s other pods running this repo (%s total)", - self.repo_url, matching_pods, total_pods) + log( + "Launching pod for %s: %s other pods running this repo (%s total)", + self.repo_url, + matching_pods, + total_pods, + ) - await self.emit({ - 'phase': 'launching', - 'message': 'Launching server...\n', - }) + yield {'phase': 'launching', 'message': 'Launching server...\n'} launcher = self.settings['launcher'] retry_delay = launcher.retry_delay @@ -464,17 +466,17 @@ async def launch(self, kube, provider): 'binder_request': self.binder_request, 'binder_persistent_request': self.binder_persistent_request, } - server_info = await launcher.launch(image=self.image_name, - username=username, - server_name=server_name, - repo_url=self.repo_url, - extra_args=extra_args) - LAUNCH_TIME.labels( - status='success', retries=i, - ).observe(time.perf_counter() - launch_starttime) - LAUNCH_COUNT.labels( - status='success', **self.repo_metric_labels, - ).inc() + server_info = await launcher.launch( + image=self.image_name, + username=username, + server_name=server_name, + repo_url=self.repo_url, + extra_args=extra_args, + ) + LAUNCH_TIME.labels(status='success', retries=i).observe( + time.perf_counter() - launch_starttime + ) + LAUNCH_COUNT.labels(status='success', **self.repo_metric_labels).inc() except Exception as e: if i + 1 == launcher.retries: @@ -483,14 +485,12 @@ async def launch(self, kube, provider): status = 'retry' # don't count retries in failure/retry # retry count is only interesting in success - LAUNCH_TIME.labels( - status=status, retries=-1, - ).observe(time.perf_counter() - launch_starttime) + LAUNCH_TIME.labels(status=status, retries=-1).observe( + time.perf_counter() - launch_starttime + ) if status == 'failure': # don't count retries per repo - LAUNCH_COUNT.labels( - status=status, **self.repo_metric_labels, - ).inc() + LAUNCH_COUNT.labels(status=status, **self.repo_metric_labels).inc() if i + 1 == launcher.retries: # last attempt failed, let it raise @@ -498,11 +498,12 @@ async def launch(self, kube, provider): # not the last attempt, try again app_log.error("Retrying launch after error: %s", e) - await self.emit({ + yield { 'phase': 'launching', 'message': 'Launch attempt {} failed, retrying...\n'.format(i + 1), - }) - await gen.sleep(retry_delay) + } + + await asyncio.sleep(retry_delay) # exponential backoff for consecutive failures retry_delay *= 2 continue @@ -514,4 +515,4 @@ async def launch(self, kube, provider): 'message': 'server running at %s\n' % server_info['url'], } event.update(server_info) - await self.emit(event) + yield event diff --git a/binderhub/v1/__init__.py b/binderhub/v1/__init__.py new file mode 100644 index 000000000..2b37d5cd3 --- /dev/null +++ b/binderhub/v1/__init__.py @@ -0,0 +1 @@ +from .handlers import default_handlers # noqa diff --git a/binderhub/v1/handlers.py b/binderhub/v1/handlers.py new file mode 100644 index 000000000..0b279cf66 --- /dev/null +++ b/binderhub/v1/handlers.py @@ -0,0 +1,153 @@ +import asyncio + +from tornado.web import authenticated +from tornado.ioloop import IOLoop +from tornado.log import app_log + +from ..build import Build, FakeBuild +from ..builder import Builder, BuildFailed, LaunchFailed + +from ..base import EventStreamHandler + + +class BuildHandler(EventStreamHandler): + """GET /build/:provider/:spec triggers a build and launch + + response is an event stream watching progress as the build 
and launch proceed + """ + + path = r"/build/([^/]+)/(.+)" + + build = None + + def on_finish(self): + super().on_finish() + if self.build: + # if we have a build, tell it to stop watching + self.build.stop() + + def initialize(self): + super().initialize() + if self.settings['use_registry']: + self.registry = self.settings['registry'] + + self.event_log = self.settings['event_log'] + + async def fail(self, message): + await self.emit({'phase': 'failed', 'message': message + '\n'}) + + @authenticated + async def get(self, provider_prefix, _unescaped_spec): + """Get a built image for a given spec and repo provider. + + Different repo providers will require different spec information. This + function relies on the functionality of the tornado `GET` request. + + Parameters + ---------- + provider_prefix : str + the nickname for a repo provider (i.e. 'gh') + spec: + specifies information needed by the repo provider (i.e. user, + repo, ref, etc.) + + """ + prefix = '/build/' + provider_prefix + spec = self.get_spec_from_request(prefix) + + # create a heartbeat + IOLoop.current().spawn_callback(self.keep_alive) + + spec = spec.rstrip("/") + key = '%s:%s' % (provider_prefix, spec) + + # get a provider object that encapsulates the provider and the spec + try: + provider = self.get_provider(provider_prefix, spec=spec) + except KeyError: + await self.fail("No provider found for prefix %r" % provider_prefix) + return + except Exception as e: + app_log.exception("Failed to get provider for %s", key) + await self.fail(str(e)) + return + + if provider.is_banned(): + await self.emit( + { + 'phase': 'failed', + 'message': 'Sorry, {} has been temporarily disabled from launching. Please contact admins for more info!'.format( + spec + ), + } + ) + return + + origin = ( + self.settings['normalized_origin'] + if self.settings['normalized_origin'] + else self.request.host + ) + + binder_launch_host = self.get_badge_base_url() or '{proto}://{host}{base_url}'.format( + proto=self.request.protocol, + host=self.request.host, + base_url=self.settings['base_url'], + ) + + self.builder = Builder( + settings=self.settings, + provider=provider, + provider_prefix=provider_prefix, + spec=spec, + event_log=self.settings['event_log'], + registry=self.registry if self.settings['use_registry'] else None, + origin=origin, + binder_launch_host=binder_launch_host, + ) + + ref = await self.builder.resolve_provider() + image_found = not await self.builder.image_needs_building() + image_name = self.builder.image_name + + # Launch a notebook server if the image already is built + kube = self.settings['kubernetes_client'] + + if image_found: + await self.emit( + { + 'phase': 'built', + 'imageName': image_name, + 'message': 'Found built image, launching...\n', + } + ) + await self.builder.launch() + return + + # Prepare to build + build = await self.builder.request_build() + failed = False + try: + async for event in self.builder.watch(stream_logs=True): + await self.emit(event) + except BuildFailed: + # failed event was already emitted! + print("build failed!") + failed = True + + # Launch after building an image + if not failed: + async for event in self.builder.launch(): + await self.emit(event) + + # Don't close the eventstream immediately. + # (javascript) eventstream clients reconnect automatically on dropped connections, + # so if the server closes the connection first, + # the client will reconnect which starts a new build. 
+        # If we sleep here, that makes it more likely that a well-behaved
+        # client will close its connection first.
+        # The duration of this shouldn't matter because
+        # well-behaved clients will close connections after they receive the launch event.
+        await asyncio.sleep(60)
+
+default_handlers = [BuildHandler]

From 1415adfcb05a72e375fe97353899f71f9040311e Mon Sep 17 00:00:00 2001
From: Min RK
Date: Wed, 5 Feb 2020 10:48:05 +0100
Subject: [PATCH 09/10] keep install-hub working on helm 2

---
 testing/minikube/install-hub | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/testing/minikube/install-hub b/testing/minikube/install-hub
index 6d7ebc360..58d2d037d 100755
--- a/testing/minikube/install-hub
+++ b/testing/minikube/install-hub
@@ -7,10 +7,11 @@ for testing binderhub
 Gets the jupyterhub chart version from the binderhub helm chart
 to ensure we are testing against a reasonable version.
 """
-import sys
 import os
 import pipes
+import re
 from subprocess import check_call, check_output
+import sys
 import time
 
 from kubernetes import client, config
@@ -41,6 +42,9 @@ def get_hub_chart_dependency():
 
 jupyterhub = get_hub_chart_dependency()
 
+helm_version = check_output(['helm', 'version', '--short']).decode('utf8')
+helm_major = int(re.match(r'v?(\d+)', helm_version).group(1))
+
 # update the helm repo
 check_call(['helm', 'repo', 'add', 'jupyterhub', jupyterhub['repository']])
 check_call(['helm', 'repo', 'update'])
@@ -79,8 +83,8 @@ is_running = name in check_output(['helm', 'list', '-q']).decode('utf8', 'replace').split()
 if is_running:
     cmd = ['helm', 'upgrade', name]
 else:
-
-    cmd = ['helm', 'install', f'{name}']
+    name_arg = name if helm_major >= 3 else f"--name={name}"
+    cmd = ['helm', 'install', name_arg]
 
 cmd.extend(args)
 print("\n %s\n" % ' '.join(map(pipes.quote, cmd)))

From 355e28c675c599aaa6e6ecfa16ca5334c11be785 Mon Sep 17 00:00:00 2001
From: Min RK
Date: Wed, 5 Feb 2020 10:59:17 +0100
Subject: [PATCH 10/10] re-merge BuildWatcher into Build

not helpful to have separate classes
---
 binderhub/build.py   | 228 +++++++++++++++++++++----------------------
 binderhub/builder.py |  11 +--
 2 files changed, 115 insertions(+), 124 deletions(-)

diff --git a/binderhub/build.py b/binderhub/build.py
index a7749175a..78b2868d0 100644
--- a/binderhub/build.py
+++ b/binderhub/build.py
@@ -14,115 +14,12 @@
 from kubernetes import client, watch
 from tornado.ioloop import IOLoop
 from tornado.log import app_log
+from tornado.queues import Queue
 
 from .utils import rendezvous_rank
 
 
-class BuildWatcher:
-    """Represents what is needed to watch the progress of a build"""
-
-    def __init__(self, q, api, name, namespace='default', log_tail_lines=100):
-        self.q = q
-        self.api = api
-        self.name = name
-        self.namespace = namespace
-        self.log_tail_lines = log_tail_lines
-
-        self.main_loop = IOLoop.current()
-        self.stop_event = threading.Event()
-        self.running_future = Future()
-        self.stopped_future = Future()
-
-    def progress(self, kind, obj):
-        """Put the current action item into the queue for execution."""
-        self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj})
-
-    def _signal_stopped(self, status):
-        if not self.stopped_future.done():
-            self.stopped_future.set_result(self.pod.status.phase)
-
-    def _signal_running(self):
-        if not self.running_future.done():
-            self.running_future.set_result(None)
-
-    def watch(self):
-        app_log.info("Watching build pod %s", self.name)
-
-        while not self.stop_event.is_set():
-            w = watch.Watch()
-            try:
-                for f in w.stream(
-                    self.api.list_namespaced_pod,
From 355e28c675c599aaa6e6ecfa16ca5334c11be785 Mon Sep 17 00:00:00 2001
From: Min RK
Date: Wed, 5 Feb 2020 10:59:17 +0100
Subject: [PATCH 10/10] re-merge BuildWatcher into Build

not helpful to have separate classes
---
 binderhub/build.py   | 228 +++++++++++++++++++++----------------------
 binderhub/builder.py |  11 +--
 2 files changed, 115 insertions(+), 124 deletions(-)

diff --git a/binderhub/build.py b/binderhub/build.py
index a7749175a..78b2868d0 100644
--- a/binderhub/build.py
+++ b/binderhub/build.py
@@ -14,115 +14,12 @@
 from kubernetes import client, watch
 from tornado.ioloop import IOLoop
 from tornado.log import app_log
+from tornado.queues import Queue
 
 from .utils import rendezvous_rank
 
 
-class BuildWatcher:
-    """Represents what is needed to watch the progress of a build"""
-
-    def __init__(self, q, api, name, namespace='default', log_tail_lines=100):
-        self.q = q
-        self.api = api
-        self.name = name
-        self.namespace = namespace
-        self.log_tail_lines = log_tail_lines
-
-        self.main_loop = IOLoop.current()
-        self.stop_event = threading.Event()
-        self.running_future = Future()
-        self.stopped_future = Future()
-
-    def progress(self, kind, obj):
-        """Put the current action item into the queue for execution."""
-        self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj})
-
-    def _signal_stopped(self, status):
-        if not self.stopped_future.done():
-            self.stopped_future.set_result(self.pod.status.phase)
-
-    def _signal_running(self):
-        if not self.running_future.done():
-            self.running_future.set_result(None)
-
-    def watch(self):
-        app_log.info("Watching build pod %s", self.name)
-
-        while not self.stop_event.is_set():
-            w = watch.Watch()
-            try:
-                for f in w.stream(
-                    self.api.list_namespaced_pod,
-                    self.namespace,
-                    label_selector="name={}".format(self.name),
-                    timeout_seconds=30,
-                ):
-                    if f['type'] == 'DELETED':
-                        self.progress('pod.phasechange', 'Deleted')
-                        return
-                    self.pod = f['object']
-                    if not self.stop_event.is_set():
-                        self.progress('pod.phasechange', self.pod.status.phase)
-                    if self.pod.status.phase in {'Running'}:
-                        self.main_loop.add_callback(self._signal_running)
-                    if self.pod.status.phase in {'Succeeded', 'Failed'}:
-                        self.main_loop.add_callback(
-                            partial(self._signal_stopped, self.pod.status.phase)
-                        )
-                        self.cleanup()
-            except Exception:
-                app_log.exception("Error in watch stream for %s", self.name)
-                raise
-            finally:
-                w.stop()
-            if self.stop_event.is_set():
-                app_log.info("Stopping watch of %s", self.name)
-                return
-
-    def stream_logs(self):
-        """Stream a pod's logs"""
-        app_log.info("Watching logs of %s", self.name)
-        for line in self.api.read_namespaced_pod_log(
-            self.name,
-            self.namespace,
-            follow=True,
-            tail_lines=self.log_tail_lines,
-            _preload_content=False,
-        ):
-            if self.stop_event.is_set():
-                app_log.info("Stopping logs of %s", self.name)
-                return
-            # verify that the line is JSON
-            line = line.decode('utf-8')
-            try:
-                json.loads(line)
-            except ValueError:
-                # log event wasn't JSON.
-                # use the line itself as the message with unknown phase.
-                # We don't know what the right phase is, use 'unknown'.
-                # If it was a fatal error, presumably a 'failure'
-                # message will arrive shortly.
-                app_log.error("log event not json: %r", line)
-                line = json.dumps({'phase': 'unknown', 'message': line})
-
-            self.progress('log', line)
-        else:
-            app_log.info("Finished streaming logs of %s", self.name)
-
-    def stop(self):
-        """Stop watching a build"""
-        self.stop_event.set()
-
-    def cleanup(self):
-        """Cleanup is called when watch notices that a pod stops
-
-        No-op for BuildWatcher, Build actually removes the pod
-        """
-
-        pass
-
-
-class Build(BuildWatcher):
+class Build:
     """Represents a build of a git repository into a docker image.
 
     This ultimately maps to a single pod on a kubernetes cluster. Many
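The next hunk makes every Build constructor argument keyword-only with a bare `*`, so call sites must name what they pass. A toy illustration of that Python feature (not code from this patch):

    def make_build(*, name, namespace='default', repo_url=None):
        """Arguments after the bare * can only be passed by keyword."""
        return name, namespace, repo_url


    make_build(name='build-abc')    # OK
    # make_build('build-abc')      # TypeError: takes 0 positional arguments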
@@ -146,25 +43,35 @@ class Build(BuildWatcher):
 
     def __init__(
         self,
-        q,
-        api,
+        # all args are keyword-only
+        *,
+        # args used for watching
+        q=None,
         name,
-        namespace,
-        repo_url,
-        ref,
-        git_credentials,
-        build_image,
-        image_name,
-        push_secret,
-        memory_limit,
-        docker_host,
-        node_selector,
+        api=None,
+        namespace='default',
+        # required args for building
+        repo_url=None,
+        ref=None,
+        build_image=None,
+        image_name=None,
+        # fully optional args
+        git_credentials=None,
+        push_secret=None,
+        memory_limit=None,
+        docker_host=None,
+        node_selector=None,
         appendix='',
         log_tail_lines=100,
         sticky_builds=False,
     ):
-        super().__init__(q=q, api=api, name=name, namespace=namespace)
+        self.q = q or Queue()
+        self.api = api
+        self.name = name
+        self.namespace = namespace
+        self.log_tail_lines = log_tail_lines
+
         self.repo_url = repo_url
         self.ref = ref
         self.image_name = image_name
@@ -178,8 +85,29 @@ def __init__(
         self.git_credentials = git_credentials
         self.sticky_builds = sticky_builds
 
+        self.main_loop = IOLoop.current()
+        self.stop_event = threading.Event()
+        self.running_future = Future()
+        self.stopped_future = Future()
+
         self._component_label = "binderhub-build"
 
+    def stop(self):
+        """Stop watching a build"""
+        self.stop_event.set()
+
+    def progress(self, kind, obj):
+        """Put the current action item into the queue for execution."""
+        self.main_loop.add_callback(self.q.put, {'kind': kind, 'payload': obj})
+
+    def _signal_stopped(self, status):
+        if not self.stopped_future.done():
+            self.stopped_future.set_result(self.pod.status.phase)
+
+    def _signal_running(self):
+        if not self.running_future.done():
+            self.running_future.set_result(None)
+
     def get_cmd(self):
         """Get the cmd to run to build the image"""
         cmd = [
@@ -440,6 +368,70 @@ def submit(self):
 
         self.watch()
 
+    def watch(self):
+        app_log.info("Watching build pod %s", self.name)
+
+        while not self.stop_event.is_set():
+            w = watch.Watch()
+            try:
+                for f in w.stream(
+                    self.api.list_namespaced_pod,
+                    self.namespace,
+                    label_selector="name={}".format(self.name),
+                    timeout_seconds=30,
+                ):
+                    if f['type'] == 'DELETED':
+                        self.progress('pod.phasechange', 'Deleted')
+                        return
+                    self.pod = f['object']
+                    if not self.stop_event.is_set():
+                        self.progress('pod.phasechange', self.pod.status.phase)
+                    if self.pod.status.phase in {'Running'}:
+                        self.main_loop.add_callback(self._signal_running)
+                    if self.pod.status.phase in {'Succeeded', 'Failed'}:
+                        self.main_loop.add_callback(
+                            partial(self._signal_stopped, self.pod.status.phase)
+                        )
+                        self.cleanup()
+            except Exception:
+                app_log.exception("Error in watch stream for %s", self.name)
+                raise
+            finally:
+                w.stop()
+            if self.stop_event.is_set():
+                app_log.info("Stopping watch of %s", self.name)
+                return
+
+    def stream_logs(self):
+        """Stream a pod's logs"""
+        app_log.info("Watching logs of %s", self.name)
+        for line in self.api.read_namespaced_pod_log(
+            self.name,
+            self.namespace,
+            follow=True,
+            tail_lines=self.log_tail_lines,
+            _preload_content=False,
+        ):
+            if self.stop_event.is_set():
+                app_log.info("Stopping logs of %s", self.name)
+                return
+            # verify that the line is JSON
+            line = line.decode('utf-8')
+            try:
+                json.loads(line)
+            except ValueError:
+                # log event wasn't JSON.
+                # use the line itself as the message with unknown phase.
+                # We don't know what the right phase is, use 'unknown'.
+                # If it was a fatal error, presumably a 'failure'
+                # message will arrive shortly.
+                app_log.error("log event not json: %r", line)
+                line = json.dumps({'phase': 'unknown', 'message': line})
+
+            self.progress('log', line)
+        else:
+            app_log.info("Finished streaming logs of %s", self.name)
+
 
 class FakeBuild(Build):
     """
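The JSON check in stream_logs amounts to a small normalization step: structured log lines pass through untouched, while anything else is wrapped as an 'unknown'-phase event. The same logic isolated for clarity (the helper name is invented):

    import json


    def normalize_log_line(line):
        """Coerce a raw pod log line into a JSON-encoded event string."""
        try:
            json.loads(line)
        except ValueError:
            # Not JSON: wrap the raw text. If it signaled a fatal error,
            # a proper 'failure' event should follow from the builder.
            line = json.dumps({'phase': 'unknown', 'message': line})
        return line


    assert json.loads(normalize_log_line('{"phase": "building"}'))['phase'] == 'building'
    assert json.loads(normalize_log_line('plain text'))['phase'] == 'unknown'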
+ app_log.error("log event not json: %r", line) + line = json.dumps({'phase': 'unknown', 'message': line}) + + self.progress('log', line) + else: + app_log.info("Finished streaming logs of %s", self.name) + class FakeBuild(Build): """ diff --git a/binderhub/builder.py b/binderhub/builder.py index 67168da12..3b7f58e79 100644 --- a/binderhub/builder.py +++ b/binderhub/builder.py @@ -12,14 +12,13 @@ import docker import escapism from tornado.concurrent import chain_future, Future -from tornado import gen from tornado.queues import Queue from tornado.ioloop import IOLoop from tornado.log import app_log from traitlets import Dict, HasTraits, Instance, Unicode from prometheus_client import Counter, Histogram, Gauge -from .build import Build, BuildWatcher, FakeBuild +from .build import Build, FakeBuild from .events import EventLog from .registry import DockerRegistry from .repoproviders import RepoProvider @@ -76,7 +75,7 @@ class Builder(HasTraits): provider_prefix = Unicode() provider = Instance(RepoProvider) spec = Unicode() - build = Instance(BuildWatcher) + build = Instance(Build) origin = Unicode() def _generate_build_name(self, build_slug, ref, prefix='', limit=63, ref_length=6): @@ -267,11 +266,11 @@ async def submit_with_timing(): async def watch(self, stream_logs=True): pool = self.settings['build_pool'] if self.build is None: - BuildWatcherClass = ( - FakeBuild if self.settings.get('fake_build') else BuildWatcher + BuildClass = ( + FakeBuild if self.settings.get('fake_build') else Build ) q = Queue() - self.build = BuildWatcherClass( + self.build = Build( q=q, api=self.settings['kubernetes'], name=self.build_name,