From 823480c9cf89078146fb8eead99b5e433f51b2ea Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 24 Feb 2023 13:58:08 +0100 Subject: [PATCH 1/4] Limit concurrency of API requests and check for specific endpoints instead of getting everything, which gets expensive --- jupyterhub_traefik_proxy/fileprovider.py | 3 +- jupyterhub_traefik_proxy/kv_proxy.py | 55 ++++++++++++++++-------- jupyterhub_traefik_proxy/proxy.py | 40 +++++++++++++---- 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/jupyterhub_traefik_proxy/fileprovider.py b/jupyterhub_traefik_proxy/fileprovider.py index f7e00f1f..57ca2087 100644 --- a/jupyterhub_traefik_proxy/fileprovider.py +++ b/jupyterhub_traefik_proxy/fileprovider.py @@ -218,7 +218,8 @@ async def add_route(self, routespec, target, data): ) raise try: - await self._wait_for_route(traefik_routespec) + async with self.semaphore: + await self._wait_for_route(traefik_routespec) except TimeoutError: self.log.error( f"Is Traefik configured to watch {self.dynamic_config_file}?" diff --git a/jupyterhub_traefik_proxy/kv_proxy.py b/jupyterhub_traefik_proxy/kv_proxy.py index 9ec787b7..d240e612 100644 --- a/jupyterhub_traefik_proxy/kv_proxy.py +++ b/jupyterhub_traefik_proxy/kv_proxy.py @@ -18,9 +18,12 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import json import os from collections.abc import MutableMapping +from functools import wraps +from weakref import WeakKeyDictionary import escapism from traitlets import Unicode @@ -29,6 +32,29 @@ from .proxy import TraefikProxy +def _one_at_a_time(method): + """decorator to limit an async method to be called only once + + If multiple concurrent calls to this method are made, + queue them instead of allowing them to be concurrently outstanding. + """ + # use weak dict for locks + # so that the lock is always acquired within the current asyncio loop + # should only be relevant in testing, where eventloops are created and destroyed often + method._locks = WeakKeyDictionary() + + @wraps(method) + async def locked_method(*args, **kwargs): + loop = asyncio.get_event_loop() + lock = method._locks.get(loop, None) + if lock is None: + lock = method._locks[loop] = asyncio.Lock() + async with lock: + return await method(*args, **kwargs) + + return locked_method + + class TKvProxy(TraefikProxy): """ JupyterHub Proxy implementation using traefik and a key-value store. @@ -241,19 +267,11 @@ async def add_route(self, routespec, target, data): [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] ) - status, response = await self._kv_atomic_add_route_parts( - jupyterhub_routespec, target, data, route_keys, rule - ) + async with self.semaphore: + status, response = await self._kv_atomic_add_route_parts( + jupyterhub_routespec, target, data, route_keys, rule + ) - if self.should_start: - try: - # Check if traefik was launched - self.traefik_process.pid - except AttributeError: - self.log.error( - "You cannot add routes if the proxy isn't running! Please start the proxy: proxy.start()" - ) - raise if status: self.log.info( "Added service %s with the alias %s.", target, route_keys.service_alias @@ -268,8 +286,9 @@ async def add_route(self, routespec, target, data): self.log.error( "Couldn't add route for %s. Response: %s", routespec, response ) - - await self._wait_for_route(routespec) + raise RuntimeError(f"Couldn't add route for {routespec}") + async with self.semaphore: + await self._wait_for_route(routespec) async def delete_route(self, routespec): """Delete a route and all the traefik related info associated given a routespec, @@ -283,9 +302,10 @@ async def delete_route(self, routespec): self, routespec, separator=self.kv_separator ) - status, response = await self._kv_atomic_delete_route_parts( - jupyterhub_routespec, route_keys - ) + async with self.semaphore: + status, response = await self._kv_atomic_delete_route_parts( + jupyterhub_routespec, route_keys + ) if status: self.log.info("Routespec %s was deleted.", routespec) else: @@ -293,6 +313,7 @@ async def delete_route(self, routespec): "Couldn't delete route %s. Response: %s", routespec, response ) + @_one_at_a_time async def get_all_routes(self): """Fetch and return all the routes associated by JupyterHub from the proxy. diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index 58ee77ef..31df3233 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -18,6 +18,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import json import os from os.path import abspath @@ -26,8 +27,8 @@ from jupyterhub.proxy import Proxy from jupyterhub.utils import exponential_backoff, new_token, url_path_join -from tornado.httpclient import AsyncHTTPClient -from traitlets import Any, Bool, Dict, Integer, Unicode, default, validate +from tornado.httpclient import AsyncHTTPClient, HTTPClientError +from traitlets import Any, Bool, Dict, Integer, Unicode, default, observe, validate from . import traefik_utils @@ -37,6 +38,26 @@ class TraefikProxy(Proxy): traefik_process = Any() + concurrency = Integer( + 10, + config=True, + help=""" + The number of requests allowed to be concurrently outstanding to the proxy + + Limiting this number avoids potential timeout errors + by sending too many requests to update the proxy at once + """, + ) + semaphore = Any() + + @default('semaphore') + def _default_semaphore(self): + return asyncio.BoundedSemaphore(self.concurrency) + + @observe('concurrency') + def _concurrency_changed(self, change): + self.semaphore = asyncio.BoundedSemaphore(change.new) + static_config_file = Unicode( "traefik.toml", config=True, help="""traefik's static configuration file""" ) @@ -235,17 +256,18 @@ async def _check_for_traefik_service(self, routespec, kind): expected = ( traefik_utils.generate_alias(routespec, kind) + "@" + self.provider_name ) - path = f"/api/http/{kind}s" + path = f"/api/http/{kind}s/{expected}" try: resp = await self._traefik_api_request(path) - json_data = json.loads(resp.body) - except Exception: + json.loads(resp.body) + except HTTPClientError as e: + if e.code == 404: + self.log.debug(f"traefik {expected} not yet in {kind}") + return False self.log.exception(f"Error checking traefik api for {kind} {routespec}") return False - - service_names = [service['name'] for service in json_data] - if expected not in service_names: - self.log.debug(f"traefik {expected} not yet in {kind}") + except Exception: + self.log.exception(f"Error checking traefik api for {kind} {routespec}") return False # found the expected endpoint From 92672ddbdb7e6f4cdfd2b06ca0b891a1cb9a5fb4 Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 2 Mar 2023 13:33:21 +0100 Subject: [PATCH 2/4] isntead of queuing get_all, piggy-back same limit on the number of outstanding calls, but bunches results instead of starting an ever-growing queue --- jupyterhub_traefik_proxy/kv_proxy.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/jupyterhub_traefik_proxy/kv_proxy.py b/jupyterhub_traefik_proxy/kv_proxy.py index d240e612..8146e0da 100644 --- a/jupyterhub_traefik_proxy/kv_proxy.py +++ b/jupyterhub_traefik_proxy/kv_proxy.py @@ -23,7 +23,6 @@ import os from collections.abc import MutableMapping from functools import wraps -from weakref import WeakKeyDictionary import escapism from traitlets import Unicode @@ -36,21 +35,24 @@ def _one_at_a_time(method): """decorator to limit an async method to be called only once If multiple concurrent calls to this method are made, - queue them instead of allowing them to be concurrently outstanding. + piggy-back on the outstanding call instead of queuing + or letting requests pile up. """ - # use weak dict for locks - # so that the lock is always acquired within the current asyncio loop - # should only be relevant in testing, where eventloops are created and destroyed often - method._locks = WeakKeyDictionary() @wraps(method) async def locked_method(*args, **kwargs): - loop = asyncio.get_event_loop() - lock = method._locks.get(loop, None) - if lock is None: - lock = method._locks[loop] = asyncio.Lock() - async with lock: - return await method(*args, **kwargs) + if getattr(method, "_shared_future", None) is not None: + f = method._shared_future + if f.done(): + method._shared_future = None + else: + return await f + + method._shared_future = f = asyncio.ensure_future(method(*args, **kwargs)) + try: + return await f + finally: + method._shared_future = None return locked_method From cf93289f83953fda14f322f1af968b87812829c0 Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 2 Mar 2023 14:14:48 +0100 Subject: [PATCH 3/4] reduce scale factor on traefik wait increases responsiveness --- jupyterhub_traefik_proxy/proxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index 31df3233..f35b12d1 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -288,6 +288,7 @@ async def _check_traefik_dynamic_conf_ready(): await exponential_backoff( _check_traefik_dynamic_conf_ready, f"Traefik route for {routespec} configuration not available", + scale_factor=1.2, timeout=self.check_route_timeout, ) From 11d31579cc0d7d90d3a821bef8d83128ca68588a Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 2 Mar 2023 14:15:58 +0100 Subject: [PATCH 4/4] add traefik_providers_throttle_duration config and set to 0 by default and set a default of 100ms unlike a general purpose use case, we want routes to be reflected immediately --- jupyterhub_traefik_proxy/proxy.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index f35b12d1..71d2edf0 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -107,6 +107,20 @@ def __init__(self, **kwargs): static_config = Dict() dynamic_config = Dict() + traefik_providers_throttle_duration = Unicode( + "0s", + config=True, + help=""" + throttle traefik reloads of configuration. + + When traefik sees a change in configuration, + it will wait this long before applying the next one. + This affects how long adding a user to the proxy will take. + + See https://doc.traefik.io/traefik/providers/overview/#providersprovidersthrottleduration + """, + ) + traefik_api_url = Unicode( "http://localhost:8099", config=True, @@ -371,6 +385,9 @@ async def _setup_traefik_static_config(self): Subclasses should specify any traefik providers themselves, in :attrib:`self.static_config["providers"]` """ + self.static_config["providers"][ + "providersThrottleDuration" + ] = self.traefik_providers_throttle_duration if self.traefik_log_level: self.static_config["log"] = {"level": self.traefik_log_level}