Skip to content

Commit

Permalink
Merge pull request #165 from minrk/tweak-perf
Browse files Browse the repository at this point in the history
Improve performance, scaling
  • Loading branch information
minrk authored Mar 10, 2023
2 parents 79c6ef0 + 242e40b commit f2e3d4b
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 26 deletions.
3 changes: 2 additions & 1 deletion jupyterhub_traefik_proxy/fileprovider.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ async def add_route(self, routespec, target, data):
)
raise
try:
await self._wait_for_route(traefik_routespec)
async with self.semaphore:
await self._wait_for_route(traefik_routespec)
except TimeoutError:
self.log.error(
f"Is Traefik configured to watch {self.dynamic_config_file}?"
Expand Down
57 changes: 40 additions & 17 deletions jupyterhub_traefik_proxy/kv_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import json
from collections.abc import MutableMapping
from functools import wraps

import escapism
from traitlets import Unicode
Expand All @@ -28,6 +30,32 @@
from .proxy import TraefikProxy


def _one_at_a_time(method):
"""decorator to limit an async method to be called only once
If multiple concurrent calls to this method are made,
piggy-back on the outstanding call instead of queuing
or letting requests pile up.
"""

@wraps(method)
async def locked_method(*args, **kwargs):
if getattr(method, "_shared_future", None) is not None:
f = method._shared_future
if f.done():
method._shared_future = None
else:
return await f

method._shared_future = f = asyncio.ensure_future(method(*args, **kwargs))
try:
return await f
finally:
method._shared_future = None

return locked_method


class TKvProxy(TraefikProxy):
"""
JupyterHub Proxy implementation using traefik and a key-value store.
Expand Down Expand Up @@ -218,19 +246,11 @@ async def add_route(self, routespec, target, data):
[self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)]
)

status, response = await self._kv_atomic_add_route_parts(
jupyterhub_routespec, target, data, route_keys, rule
)
async with self.semaphore:
status, response = await self._kv_atomic_add_route_parts(
jupyterhub_routespec, target, data, route_keys, rule
)

if self.should_start:
try:
# Check if traefik was launched
self.traefik_process.pid
except AttributeError:
self.log.error(
"You cannot add routes if the proxy isn't running! Please start the proxy: proxy.start()"
)
raise
if status:
self.log.debug(
"Added service %s with the alias %s.", target, route_keys.service_alias
Expand All @@ -245,8 +265,9 @@ async def add_route(self, routespec, target, data):
self.log.error(
"Couldn't add route for %s. Response: %s", routespec, response
)

await self._wait_for_route(routespec)
raise RuntimeError(f"Couldn't add route for {routespec}")
async with self.semaphore:
await self._wait_for_route(routespec)

async def delete_route(self, routespec):
"""Delete a route and all the traefik related info associated given a routespec,
Expand All @@ -260,16 +281,18 @@ async def delete_route(self, routespec):
self, routespec, separator=self.kv_separator
)

status, response = await self._kv_atomic_delete_route_parts(
jupyterhub_routespec, route_keys
)
async with self.semaphore:
status, response = await self._kv_atomic_delete_route_parts(
jupyterhub_routespec, route_keys
)
if status:
self.log.debug("Routespec %s was deleted.", routespec)
else:
self.log.error(
"Couldn't delete route %s. Response: %s", routespec, response
)

@_one_at_a_time
async def get_all_routes(self):
"""Fetch and return all the routes associated by JupyterHub from the
proxy.
Expand Down
56 changes: 48 additions & 8 deletions jupyterhub_traefik_proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import json
import os
from os.path import abspath
Expand All @@ -27,7 +28,7 @@
from jupyterhub.proxy import Proxy
from jupyterhub.utils import exponential_backoff, new_token, url_path_join
from tornado.httpclient import AsyncHTTPClient, HTTPClientError
from traitlets import Any, Bool, Dict, Integer, Unicode, default, validate
from traitlets import Any, Bool, Dict, Integer, Unicode, default, observe, validate

from . import traefik_utils

Expand All @@ -37,6 +38,26 @@ class TraefikProxy(Proxy):

traefik_process = Any()

concurrency = Integer(
10,
config=True,
help="""
The number of requests allowed to be concurrently outstanding to the proxy
Limiting this number avoids potential timeout errors
by sending too many requests to update the proxy at once
""",
)
semaphore = Any()

@default('semaphore')
def _default_semaphore(self):
return asyncio.BoundedSemaphore(self.concurrency)

@observe('concurrency')
def _concurrency_changed(self, change):
self.semaphore = asyncio.BoundedSemaphore(change.new)

static_config_file = Unicode(
"traefik.toml", config=True, help="""traefik's static configuration file"""
)
Expand Down Expand Up @@ -86,6 +107,20 @@ def __init__(self, **kwargs):
static_config = Dict()
dynamic_config = Dict()

traefik_providers_throttle_duration = Unicode(
"0s",
config=True,
help="""
throttle traefik reloads of configuration.
When traefik sees a change in configuration,
it will wait this long before applying the next one.
This affects how long adding a user to the proxy will take.
See https://doc.traefik.io/traefik/providers/overview/#providersprovidersthrottleduration
""",
)

traefik_api_url = Unicode(
"http://localhost:8099",
config=True,
Expand Down Expand Up @@ -235,17 +270,18 @@ async def _check_for_traefik_service(self, routespec, kind):
expected = (
traefik_utils.generate_alias(routespec, kind) + "@" + self.provider_name
)
path = f"/api/http/{kind}s"
path = f"/api/http/{kind}s/{expected}"
try:
resp = await self._traefik_api_request(path)
json_data = json.loads(resp.body)
except Exception:
json.loads(resp.body)
except HTTPClientError as e:
if e.code == 404:
self.log.debug(f"traefik {expected} not yet in {kind}")
return False
self.log.exception(f"Error checking traefik api for {kind} {routespec}")
return False

service_names = [service['name'] for service in json_data]
if expected not in service_names:
self.log.debug(f"traefik {expected} not yet in {kind}")
except Exception:
self.log.exception(f"Error checking traefik api for {kind} {routespec}")
return False

# found the expected endpoint
Expand All @@ -266,6 +302,7 @@ async def _check_traefik_dynamic_conf_ready():
await exponential_backoff(
_check_traefik_dynamic_conf_ready,
f"Traefik route for {routespec} configuration not available",
scale_factor=1.2,
timeout=self.check_route_timeout,
)

Expand Down Expand Up @@ -358,6 +395,9 @@ async def _setup_traefik_static_config(self):
Subclasses should specify any traefik providers themselves, in
:attrib:`self.static_config["providers"]`
"""
self.static_config["providers"][
"providersThrottleDuration"
] = self.traefik_providers_throttle_duration

if self.traefik_log_level:
self.static_config["log"] = {"level": self.traefik_log_level}
Expand Down

0 comments on commit f2e3d4b

Please sign in to comment.