From 5c647704a93648dd3aba04a35e55eee3b8edc213 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Fri, 11 Jan 2019 20:32:48 -0800 Subject: [PATCH 01/10] Add health monitor and failure checker --- uplink/circuit_breaker.py | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 uplink/circuit_breaker.py diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py new file mode 100644 index 00000000..c15b4dc3 --- /dev/null +++ b/uplink/circuit_breaker.py @@ -0,0 +1,58 @@ +# Local imports +from uplink.clients.io import RequestTemplate, transitions + + +class CircuitBreaker(object): + def report_success(self, response): + raise NotImplementedError + + def report_failure(self, response): + raise NotImplementedError + + def report_exception(self, exc_type, exc_val, exc_tb): + raise NotImplementedError + + def is_tripped(self): + raise NotImplementedError + + +class FailureChecker(object): + def is_failure(self, response): + raise NotImplementedError + + def is_expected_exception(self, exc_type, exc_val, exc_tb): + raise NotImplementedError + + +class BasicFailureChecker(FailureChecker): + def is_failure(self, response): + return False + + def is_expected_exception(self, exc_type, exc_val, exc_tb): + return False + + +class CircuitRequestTemplate(RequestTemplate): + def __init__(self, circuit_breaker, fallback_func, failure_checker): + self._circuit_breaker = circuit_breaker + self._fallback_func = fallback_func + self._failure_checker = failure_checker + + def before_request(self, request): + if self._circuit_breaker.is_tripped(): + # Short-circuit. + return transitions.finish(self._fallback_func(request)) + + def after_response(self, request, response): + if self._failure_checker.is_failure(response): + self._circuit_breaker.report_failure(response) + return transitions.finish(self._fallback_func(request)) + else: + self._circuit_breaker.report_success(response) + + def after_exception(self, request, exc_type, exc_val, exc_tb): + self._circuit_breaker.report_exception(exc_type, exc_val, exc_tb) + if self._failure_checker.is_expected_exception( + exc_type, exc_val, exc_tb + ): + return transitions.finish(self._fallback_func(request)) From 3a41f235c119deb00933cdb7018d1b2a2aefa3fd Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Sat, 19 Jan 2019 12:20:05 -0800 Subject: [PATCH 02/10] Update circuit breaker --- uplink/circuit_breaker.py | 92 ++++++++++++++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 11 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index c15b4dc3..2c7afeb8 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -1,19 +1,85 @@ +# Standard library imports +import contextlib +import threading + # Local imports from uplink.clients.io import RequestTemplate, transitions +class FailureTracker(object): + _failures = 0 + + def __init__(self, max_failures, circuit_breaker): + self._max_failures = max_failures + self._circuit_breaker = circuit_breaker + self.__lock = threading.RLock() + self._failures = 0 + + def increment(self): + with self._acquire_lock(): + self._failures = min(self._failures + 1, self._max_failures) + if self._failures > self._max_failures: + self._circuit_breaker.trip() + + def decrement(self): + with self._acquire_lock(): + self._decrement_without_lock(1) + + def _decrement_without_lock(self, delta): + self._failures = max(self._failures - delta, 0) + if ( + self._circuit_breaker.tripped() + and self._failures <= self._max_failures + ): + self._circuit_breaker.reset() + + @contextlib.contextmanager + def _acquire_lock(self): + with self.__lock: + since_last_reset = self._clock() - self._last_reset + if since_last_reset >= self._period: + self.__decrement_without_lock(since_last_reset / self._period) + self._last_reset = self._clock() + yield + + class CircuitBreaker(object): + def __init__(self): + self._tripped = False + self._force_tripped = False + self._lock = threading.RLock() + + def trip(self): + with self._lock: + self._tripped = True + + def reset(self): + with self._lock: + self._tripped = False + + def force_trip(self): + with self._lock: + self._force_tripped = True + + def force_reset(self): + with self._lock: + self._force_tripped = False + + @property + def tripped(self): + with self._lock: + return self._force_tripped or self._tripped + + +class HealthMonitor(object): def report_success(self, response): - raise NotImplementedError + self._failure_tracker.decrement() def report_failure(self, response): - raise NotImplementedError + self._failure_tracker.increment() def report_exception(self, exc_type, exc_val, exc_tb): - raise NotImplementedError - - def is_tripped(self): - raise NotImplementedError + self._failure_tracker.increment() class FailureChecker(object): @@ -33,26 +99,30 @@ def is_expected_exception(self, exc_type, exc_val, exc_tb): class CircuitRequestTemplate(RequestTemplate): - def __init__(self, circuit_breaker, fallback_func, failure_checker): + def __init__( + self, circuit_breaker, health_monitor, fallback_func, failure_checker + ): self._circuit_breaker = circuit_breaker self._fallback_func = fallback_func self._failure_checker = failure_checker + self._health_monitor = health_monitor def before_request(self, request): - if self._circuit_breaker.is_tripped(): + if self._circuit_breaker.tripped: # Short-circuit. return transitions.finish(self._fallback_func(request)) def after_response(self, request, response): if self._failure_checker.is_failure(response): - self._circuit_breaker.report_failure(response) + self._health_monitor.report_failure(response) return transitions.finish(self._fallback_func(request)) else: - self._circuit_breaker.report_success(response) + self._health_monitor.report_success(response) def after_exception(self, request, exc_type, exc_val, exc_tb): - self._circuit_breaker.report_exception(exc_type, exc_val, exc_tb) + self._health_monitor.report_exception(exc_type, exc_val, exc_tb) if self._failure_checker.is_expected_exception( exc_type, exc_val, exc_tb ): return transitions.finish(self._fallback_func(request)) + return transitions.fail() From 924b5eabb3d38e9273d6cfc4a5ff8023e5d2dfb8 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Tue, 5 Feb 2019 09:19:28 -0800 Subject: [PATCH 03/10] Add circuit breaker states --- uplink/circuit_breaker.py | 79 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 2c7afeb8..85082906 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -1,11 +1,90 @@ # Standard library imports import contextlib +import time import threading # Local imports from uplink.clients.io import RequestTemplate, transitions +# Use monotonic time if available, otherwise fall back to the system clock. +now = time.monotonic if hasattr(time, "monotonic") else time.time + + +# Circuit breaker states from pg. 95 of Release It! (2nd Edition) +# by Michael T. Nygard + + +class CircuitBreakerState(object): + def prepare(self, breaker): + pass + + def is_closed(self, breaker): + pass + + def on_successful_call(self, breaker): + pass + + def on_failed_call(self, breaker): + pass + + +class Closed(CircuitBreakerState): + def __init__(self, threshold): + self._threshold = threshold + self._failure_count = 0 + + def is_closed(self, breaker): + # Pass through. + return True + + def on_successful_call(self, breaker): + # Reset count. + self._failure_count = 0 + + def on_failed_call(self, breaker): + # Count failure + self._failure_count += 1 + + # Trip breaker if threshold reached + if self._threshold > self._failure_count: + breaker.trip() + + +class Open(CircuitBreakerState): + def __init__(self, timeout, clock): + self._timeout = timeout + self._clock = clock + self._start_time = clock() + + def prepare(self, breaker): + # On timeout, attempt reset. + if self.period_remaining <= 0: + breaker.attempt_reset() + + def is_closed(self, breaker): + # Fail fast. + return False + + @property + def period_remaining(self): + return self._timeout - (self._clock() - self._start_time) + + +class HalfOpen(CircuitBreakerState): + def is_closed(self, breaker): + # Pass through. + return True + + def on_successful_call(self, breaker): + # Reset circuit. + breaker.reset() + + def on_failed_call(self, breaker): + # Trip breaker. + breaker.trip() + + class FailureTracker(object): _failures = 0 From 013d04f87dd46126359a070dd25995fefc46ccca Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Tue, 5 Feb 2019 19:29:41 -0800 Subject: [PATCH 04/10] Define interfaces --- uplink/circuit_breaker.py | 191 +++++++++++++++++--------------------- 1 file changed, 86 insertions(+), 105 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 85082906..276518cf 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -1,7 +1,5 @@ # Standard library imports -import contextlib import time -import threading # Local imports from uplink.clients.io import RequestTemplate, transitions @@ -11,6 +9,11 @@ now = time.monotonic if hasattr(time, "monotonic") else time.time +class CircuitBreakerOpen(Exception): + # TODO: Define body. + pass + + # Circuit breaker states from pg. 95 of Release It! (2nd Edition) # by Michael T. Nygard @@ -22,32 +25,31 @@ def prepare(self, breaker): def is_closed(self, breaker): pass - def on_successful_call(self, breaker): + def on_success(self, breaker): pass - def on_failed_call(self, breaker): + def on_error(self, breaker, failure): pass class Closed(CircuitBreakerState): - def __init__(self, threshold): - self._threshold = threshold - self._failure_count = 0 + def __init__(self, failure_counter): + self._failure_counter = failure_counter def is_closed(self, breaker): # Pass through. return True - def on_successful_call(self, breaker): + def on_success(self, breaker): # Reset count. - self._failure_count = 0 + self._failure_counter.reset() - def on_failed_call(self, breaker): - # Count failure - self._failure_count += 1 + def on_error(self, breaker, failure): + # Count failure. + self._failure_counter.count(failure) # Trip breaker if threshold reached - if self._threshold > self._failure_count: + if self._failure_counter.has_exceeded_treshold(): breaker.trip() @@ -76,132 +78,111 @@ def is_closed(self, breaker): # Pass through. return True - def on_successful_call(self, breaker): + def on_success(self, breaker): # Reset circuit. breaker.reset() - def on_failed_call(self, breaker): + def on_error(self, breaker): # Trip breaker. breaker.trip() -class FailureTracker(object): - _failures = 0 +class ForceOpened(CircuitBreakerState): + def is_closed(self, breaker): + # Fail always. + return False - def __init__(self, max_failures, circuit_breaker): - self._max_failures = max_failures - self._circuit_breaker = circuit_breaker - self.__lock = threading.RLock() - self._failures = 0 - - def increment(self): - with self._acquire_lock(): - self._failures = min(self._failures + 1, self._max_failures) - if self._failures > self._max_failures: - self._circuit_breaker.trip() - - def decrement(self): - with self._acquire_lock(): - self._decrement_without_lock(1) - - def _decrement_without_lock(self, delta): - self._failures = max(self._failures - delta, 0) - if ( - self._circuit_breaker.tripped() - and self._failures <= self._max_failures - ): - self._circuit_breaker.reset() - - @contextlib.contextmanager - def _acquire_lock(self): - with self.__lock: - since_last_reset = self._clock() - self._last_reset - if since_last_reset >= self._period: - self.__decrement_without_lock(since_last_reset / self._period) - self._last_reset = self._clock() - yield - - -class CircuitBreaker(object): - def __init__(self): - self._tripped = False - self._force_tripped = False - self._lock = threading.RLock() - def trip(self): - with self._lock: - self._tripped = True +class Disabled(CircuitBreakerState): + def is_closed(self, breaker): + # Pass through always. + return True + + +class FailureCounter(object): + def count(self, failure): + raise NotImplementedError def reset(self): - with self._lock: - self._tripped = False + raise NotImplementedError + - def force_trip(self): - with self._lock: - self._force_tripped = True +class BaseCircuitBreaker(object): + def __init__(self, timeout, failure_counter, health_monitor): + self._timeout = timeout + self._health_monitor = health_monitor + self._failure_counter = failure_counter + self._state = None + self.reset() + + def reset(self): + self._state = Closed(self._failure_counter) - def force_reset(self): - with self._lock: - self._force_tripped = False + def force_open(self): + self._state = ForceOpened() + + def disable(self): + self._state = Disabled() + + def attempt_reset(self): + self._state = HalfOpen() + + def trip(self): + self._state = Open(self._timeout, clock=now) + + def on_success(self, request, response): + self._health_monitor.report_success(request, response) + self._state.on_success(self) + + def on_failure(self, request, failure): + self._health_monitor.report_failure(request, failure) + self._state.on_failure(self, failure) @property - def tripped(self): - with self._lock: - return self._force_tripped or self._tripped + def closed(self): + self._state.prepare(self) + return self._state.is_closed() class HealthMonitor(object): def report_success(self, response): - self._failure_tracker.decrement() - - def report_failure(self, response): - self._failure_tracker.increment() + pass - def report_exception(self, exc_type, exc_val, exc_tb): - self._failure_tracker.increment() + def report_failure(self, failure): + pass -class FailureChecker(object): - def is_failure(self, response): +class FailureFactory(object): + def from_response(self, response): raise NotImplementedError - def is_expected_exception(self, exc_type, exc_val, exc_tb): + def from_exception(self, exc_val): raise NotImplementedError -class BasicFailureChecker(FailureChecker): - def is_failure(self, response): - return False - - def is_expected_exception(self, exc_type, exc_val, exc_tb): - return False - - class CircuitRequestTemplate(RequestTemplate): - def __init__( - self, circuit_breaker, health_monitor, fallback_func, failure_checker - ): + def __init__(self, circuit_breaker, fallback, failure_factory): self._circuit_breaker = circuit_breaker - self._fallback_func = fallback_func - self._failure_checker = failure_checker - self._health_monitor = health_monitor + self._fallback = fallback + self._failure_factory = failure_factory def before_request(self, request): - if self._circuit_breaker.tripped: + if not self._circuit_breaker.closed: + if not callable(self._fallback): + raise CircuitBreakerOpen() + # Short-circuit. - return transitions.finish(self._fallback_func(request)) + return transitions.finish(self._fallback(request)) def after_response(self, request, response): - if self._failure_checker.is_failure(response): - self._health_monitor.report_failure(response) - return transitions.finish(self._fallback_func(request)) + failure = self._failure_factory.from_response(response) + if failure is None: + self._circuit_breaker.on_success(request, response) else: - self._health_monitor.report_success(response) + self._circuit_breaker.on_failure(request, failure) def after_exception(self, request, exc_type, exc_val, exc_tb): - self._health_monitor.report_exception(exc_type, exc_val, exc_tb) - if self._failure_checker.is_expected_exception( - exc_type, exc_val, exc_tb - ): - return transitions.finish(self._fallback_func(request)) - return transitions.fail() + failure = self._failure_factory.from_exception(exc_val) + self._circuit_breaker.on_failure(request, failure) + if callable(self._fallback): + return transitions.finish(self._fallback(request)) From 25bd90d0e87bb51140d6a0edb2e338c4ad3b2902 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Tue, 5 Feb 2019 20:07:36 -0800 Subject: [PATCH 05/10] Add Failure contract --- uplink/circuit_breaker.py | 42 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 276518cf..1fae1b2b 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -99,6 +99,36 @@ def is_closed(self, breaker): return True +class Failure(object): + def __init__(self, expected=False, exception=None, status_code=None): + self._expected = expected + self._exception = exception + self._status_code = status_code + + @staticmethod + def of_response(response): + return Failure(status_code=response.status_code) + + @staticmethod + def of_exception(exception, expected): + return Failure(exception=exception, expected=expected) + + @property + def expected(self): + return self._expected + + def is_exception(self): + return self._exception is not None + + @property + def exception(self): + return self._exception + + @property + def status_code(self): + return self._status_code + + class FailureCounter(object): def count(self, failure): raise NotImplementedError @@ -156,10 +186,18 @@ class FailureFactory(object): def from_response(self, response): raise NotImplementedError - def from_exception(self, exc_val): + def from_exception(self, exception): raise NotImplementedError +class BasicFailureFactory(FailureFactory): + def from_response(self, response): + return None + + def from_exception(self, exception): + return Failure.of_exception(exception=exception, expected=True) + + class CircuitRequestTemplate(RequestTemplate): def __init__(self, circuit_breaker, fallback, failure_factory): self._circuit_breaker = circuit_breaker @@ -184,5 +222,5 @@ def after_response(self, request, response): def after_exception(self, request, exc_type, exc_val, exc_tb): failure = self._failure_factory.from_exception(exc_val) self._circuit_breaker.on_failure(request, failure) - if callable(self._fallback): + if failure.is_expected and callable(self._fallback): return transitions.finish(self._fallback(request)) From a414854e0efe22975a662bb42a6d9a4dc59bcc5e Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Wed, 6 Feb 2019 12:40:21 -0800 Subject: [PATCH 06/10] Add monitoring for circuit breaker --- uplink/circuit_breaker.py | 172 +++++++++++++++++++++++++++++--------- 1 file changed, 134 insertions(+), 38 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 1fae1b2b..5e251254 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -1,4 +1,6 @@ # Standard library imports +import contextlib +import threading import time # Local imports @@ -41,15 +43,13 @@ def is_closed(self, breaker): return True def on_success(self, breaker): - # Reset count. - self._failure_counter.reset() + self._failure_counter.count_success() def on_error(self, breaker, failure): - # Count failure. - self._failure_counter.count(failure) + self._failure_counter.count_failure(failure) # Trip breaker if threshold reached - if self._failure_counter.has_exceeded_treshold(): + if self._failure_counter.is_above_threshold(): breaker.trip() @@ -74,17 +74,26 @@ def period_remaining(self): class HalfOpen(CircuitBreakerState): + def __init__(self, failure_counter): + self._failure_counter = failure_counter + def is_closed(self, breaker): # Pass through. return True def on_success(self, breaker): + self._failure_counter.count_success() + # Reset circuit. - breaker.reset() + if self._failure_counter.is_below_threshold(): + breaker.reset() + + def on_error(self, breaker, failure): + self._failure_counter.count_failure(failure) - def on_error(self, breaker): # Trip breaker. - breaker.trip() + if self._failure_counter.is_above_threshold(): + breaker.trip() class ForceOpened(CircuitBreakerState): @@ -100,8 +109,7 @@ def is_closed(self, breaker): class Failure(object): - def __init__(self, expected=False, exception=None, status_code=None): - self._expected = expected + def __init__(self, exception=None, status_code=None): self._exception = exception self._status_code = status_code @@ -110,12 +118,8 @@ def of_response(response): return Failure(status_code=response.status_code) @staticmethod - def of_exception(exception, expected): - return Failure(exception=exception, expected=expected) - - @property - def expected(self): - return self._expected + def of_exception(exception): + return Failure(exception=exception) def is_exception(self): return self._exception is not None @@ -137,16 +141,14 @@ def reset(self): raise NotImplementedError -class BaseCircuitBreaker(object): - def __init__(self, timeout, failure_counter, health_monitor): +class BreakerContext(object): + def __init__(self, failure_counter_factory, timeout): + self._failure_counter_factory = failure_counter_factory self._timeout = timeout - self._health_monitor = health_monitor - self._failure_counter = failure_counter self._state = None - self.reset() def reset(self): - self._state = Closed(self._failure_counter) + self._state = Closed(self._failure_counter_factory()) def force_open(self): self._state = ForceOpened() @@ -155,30 +157,112 @@ def disable(self): self._state = Disabled() def attempt_reset(self): - self._state = HalfOpen() + self._state = HalfOpen(self._failure_counter_factory()) def trip(self): self._state = Open(self._timeout, clock=now) + @property + def state(self): + return self._state + + +@contextlib.contextmanager +def _monitor_state_transition(context, monitor): + from_state = type(context.state) + yield + monitor.on_state_transistion(from_state, type(context.state)) + + +class MonitoringContextWrapper(object): + def __init__(self, context, monitor): + self._context = context + self._monitor = monitor + + def _monitor(self): + return _monitor_state_transition(self._context, self._monitor) + + def reset(self): + with self._monitor(): + self._context.reset() + + def force_open(self): + with self._monitor(): + self._context.force_open() + + def disable(self): + with self._monitor(): + self._context.disable() + + def attempt_reset(self): + with self._monitor(): + self._context.attempt_reset() + + def trip(self): + with self._monitor(): + self._context.trip() + + @property + def state(self): + return self._state + + +class CircuitBreaker(object): + def __init__(self, context, monitor): + self._context = context + self._monitor = monitor + self._context.reset() + self._lock = threading.RLock() + + @property + def _monitored_context(self): + return MonitoringContextWrapper(self._context, self._monitor) + + def reset(self): + with self._lock: + self._monitored_context.reset() + + def force_open(self): + with self._lock: + self._monitored_context.force_open() + + def disable(self): + with self._lock: + self._monitored_context.disable() + def on_success(self, request, response): - self._health_monitor.report_success(request, response) - self._state.on_success(self) + with self._lock: + self._monitor.on_success(request, response) + self._context.state.on_success(self._monitored_context) - def on_failure(self, request, failure): - self._health_monitor.report_failure(request, failure) - self._state.on_failure(self, failure) + def on_error(self, request, failure): + with self._lock: + self._monitor.on_error(request, failure) + self._context.state.on_error(self._monitored_context, failure) + + def update(self): + with self._lock: + self._context.state.prepare(self._monitored_context) @property def closed(self): - self._state.prepare(self) - return self._state.is_closed() + return self._context.state.is_closed() class HealthMonitor(object): - def report_success(self, response): + def on_state_transition(self, from_state, to_state): + pass + + def on_success(self, request, response): + pass + + def on_error(self, request, failure): pass - def report_failure(self, failure): + def on_ignored_error(self, request, failure): + pass + + def on_request_not_permitted(self, request): pass @@ -195,32 +279,44 @@ def from_response(self, response): return None def from_exception(self, exception): - return Failure.of_exception(exception=exception, expected=True) + return Failure.of_exception(exception=exception) class CircuitRequestTemplate(RequestTemplate): - def __init__(self, circuit_breaker, fallback, failure_factory): + def __init__(self, circuit_breaker, fallback, monitor, failure_factory): self._circuit_breaker = circuit_breaker self._fallback = fallback + self._monitor = monitor self._failure_factory = failure_factory def before_request(self, request): + self._circuit_breaker.update() + if not self._circuit_breaker.closed: + self._monitor.on_request_not_permitted(request) + if not callable(self._fallback): raise CircuitBreakerOpen() # Short-circuit. return transitions.finish(self._fallback(request)) + def _handle_failure(self, request, failure): + self._circuit_breaker.on_failure(request, failure) + + if callable(self._fallback): + return transitions.finish(self._fallback(request)) + def after_response(self, request, response): failure = self._failure_factory.from_response(response) if failure is None: self._circuit_breaker.on_success(request, response) else: - self._circuit_breaker.on_failure(request, failure) + return self._handle_failure(request, failure) def after_exception(self, request, exc_type, exc_val, exc_tb): failure = self._failure_factory.from_exception(exc_val) - self._circuit_breaker.on_failure(request, failure) - if failure.is_expected and callable(self._fallback): - return transitions.finish(self._fallback(request)) + if failure is not None: + return self._handle_failure(request, failure) + else: + self._monitor.on_ignored_error(request, exc_val) From dea500d8469e958e50dd552b8070b9e7fff2d061 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Wed, 6 Feb 2019 14:45:46 -0800 Subject: [PATCH 07/10] Move locking to circuit breaker decorator --- uplink/circuit_breaker.py | 163 ++++++++++++++++++++++++++++---------- 1 file changed, 120 insertions(+), 43 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 5e251254..abef859b 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -141,11 +141,46 @@ def reset(self): raise NotImplementedError -class BreakerContext(object): +class CircuitBreaker(object): + def reset(self): + raise NotImplementedError + + def force_open(self): + raise NotImplementedError + + def disable(self): + raise NotImplementedError + + def attempt_reset(self): + raise NotImplementedError + + def trip(self): + raise NotImplementedError + + def on_success(self, request, response): + raise NotImplementedError + + def on_error(self, request, failure): + raise NotImplementedError + + def update(self): + raise NotImplementedError + + @property + def closed(self): + raise NotImplementedError + + @property + def state(self): + raise NotImplementedError + + +class BasicCircuitBreaker(CircuitBreaker): def __init__(self, failure_counter_factory, timeout): self._failure_counter_factory = failure_counter_factory self._timeout = timeout self._state = None + self.reset() def reset(self): self._state = Closed(self._failure_counter_factory()) @@ -162,91 +197,133 @@ def attempt_reset(self): def trip(self): self._state = Open(self._timeout, clock=now) + def on_success(self, request, response): + self._state.on_success(self) + + def on_error(self, request, failure): + self._state.on_failure(self, failure) + + def update(self): + self._state.prepare(self) + + @property + def closed(self): + return self._state.is_closed() + @property def state(self): return self._state +class CircuitBreakerDecorator(CircuitBreaker): + def __init__(self, breaker): + self._breaker = breaker + + def reset(self): + self._breaker.reset() + + def force_open(self): + self._breaker.force_open() + + def disable(self): + self._breaker.disble() + + def attempt_reset(self): + self._breaker.attempt_reset() + + def trip(self): + self._breaker.trip() + + def on_success(self, request, response): + self._breaker.on_success(request, response) + + def on_error(self, request, failure): + self._breaker.on_error(request, failure) + + def update(self): + self._breaker.update() + + @property + def state(self): + return self._breaker.state + + @property + def closed(self): + return self._breaker.closed + + @contextlib.contextmanager -def _monitor_state_transition(context, monitor): - from_state = type(context.state) +def _monitor_state_transition(breaker, monitor): + from_state = type(breaker.state) yield - monitor.on_state_transistion(from_state, type(context.state)) + monitor.on_state_transistion(from_state, type(breaker.state)) -class MonitoringContextWrapper(object): - def __init__(self, context, monitor): - self._context = context +class MonitoringCircuitBreaker(CircuitBreakerDecorator): + def __init__(self, breaker, monitor): + super(MonitoringCircuitBreaker, self).__init__(breaker) self._monitor = monitor - def _monitor(self): - return _monitor_state_transition(self._context, self._monitor) + def _monitor_state_transition(self): + return _monitor_state_transition(self._breaker, self._monitor) def reset(self): - with self._monitor(): - self._context.reset() + with self._monitor_state_transition(): + super(MonitoringCircuitBreaker, self).reset() def force_open(self): - with self._monitor(): - self._context.force_open() + with self._monitor_state_transition(): + super(MonitoringCircuitBreaker, self).force_open() def disable(self): - with self._monitor(): - self._context.disable() + with self._monitor_state_transition(): + super(MonitoringCircuitBreaker, self).disable() def attempt_reset(self): - with self._monitor(): - self._context.attempt_reset() + with self._monitor_state_transition(): + super(MonitoringCircuitBreaker, self).attempt_reset() def trip(self): - with self._monitor(): - self._context.trip() + with self._monitor_state_transition(): + super(MonitoringCircuitBreaker, self).reset() - @property - def state(self): - return self._state + def on_success(self, request, response): + self._monitor.on_success(request, response) + super(MonitoringCircuitBreaker, self).on_success(request, response) + def on_error(self, request, failure): + self._monitor.on_error(request, failure) + super(MonitoringCircuitBreaker, self).on_error(request, failure) -class CircuitBreaker(object): - def __init__(self, context, monitor): - self._context = context - self._monitor = monitor - self._context.reset() - self._lock = threading.RLock() - @property - def _monitored_context(self): - return MonitoringContextWrapper(self._context, self._monitor) +class AtomicCircuitBreaker(CircuitBreakerDecorator): + def __init__(self, breaker): + super(AtomicCircuitBreaker, self).__init__(breaker) + self._lock = threading.RLock() def reset(self): with self._lock: - self._monitored_context.reset() + super(AtomicCircuitBreaker, self).reset() def force_open(self): with self._lock: - self._monitored_context.force_open() + super(AtomicCircuitBreaker, self).force_open() def disable(self): with self._lock: - self._monitored_context.disable() + super(AtomicCircuitBreaker, self).disable() def on_success(self, request, response): with self._lock: - self._monitor.on_success(request, response) - self._context.state.on_success(self._monitored_context) + super(AtomicCircuitBreaker, self).on_success(request, response) def on_error(self, request, failure): with self._lock: - self._monitor.on_error(request, failure) - self._context.state.on_error(self._monitored_context, failure) + super(AtomicCircuitBreaker, self).on_error(request, failure) def update(self): with self._lock: - self._context.state.prepare(self._monitored_context) - - @property - def closed(self): - return self._context.state.is_closed() + super(AtomicCircuitBreaker, self).update() class HealthMonitor(object): From f4a0dd451b060ea0325e823e771748f3c3a0b2a1 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Wed, 6 Feb 2019 17:02:32 -0800 Subject: [PATCH 08/10] Implement naive failure counter --- uplink/circuit_breaker.py | 109 ++++++++++++++++++++++++++++++-------- 1 file changed, 87 insertions(+), 22 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index abef859b..ed604f31 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -11,7 +11,7 @@ now = time.monotonic if hasattr(time, "monotonic") else time.time -class CircuitBreakerOpen(Exception): +class CircuitBreakerOpenException(Exception): # TODO: Define body. pass @@ -24,7 +24,7 @@ class CircuitBreakerState(object): def prepare(self, breaker): pass - def is_closed(self, breaker): + def is_closed(self): pass def on_success(self, breaker): @@ -38,7 +38,7 @@ class Closed(CircuitBreakerState): def __init__(self, failure_counter): self._failure_counter = failure_counter - def is_closed(self, breaker): + def is_closed(self): # Pass through. return True @@ -64,7 +64,7 @@ def prepare(self, breaker): if self.period_remaining <= 0: breaker.attempt_reset() - def is_closed(self, breaker): + def is_closed(self): # Fail fast. return False @@ -77,8 +77,10 @@ class HalfOpen(CircuitBreakerState): def __init__(self, failure_counter): self._failure_counter = failure_counter - def is_closed(self, breaker): - # Pass through. + def is_closed(self): + # TODO: + # Consider only letting the first call go through, and failing fast on + # all other calls. return True def on_success(self, breaker): @@ -97,13 +99,13 @@ def on_error(self, breaker, failure): class ForceOpened(CircuitBreakerState): - def is_closed(self, breaker): + def is_closed(self): # Fail always. return False class Disabled(CircuitBreakerState): - def is_closed(self, breaker): + def is_closed(self): # Pass through always. return True @@ -134,13 +136,66 @@ def status_code(self): class FailureCounter(object): - def count(self, failure): + def count_failure(self, failure): raise NotImplementedError - def reset(self): + def count_success(self): + raise NotImplementedError + + def is_above_threshold(self): + raise NotImplementedError + + def is_below_threshold(self): raise NotImplementedError +class BasicFailureCounter(FailureCounter): + def __init__(self, decrement_every, buffer, failure_threshold, clock=now): + self._window = decrement_every + self._failure_threshold = failure_threshold + self._failure_count = 0 + self._num_rounds = 0 + self._buffer = buffer + self._diff = 0 + self._last_window = clock() + self._clock = clock + + def decrement(self, n=1): + assert n >= 0 + self._failure_count -= n + self._num_rounds += 1 + + def increment(self, n=1): + self._failure_count += n + self._num_rounds += 1 + + def update(self): + now_ = self._clock() + windows_elapsed = int(max(now_ - self._last_window, 0) / self._window) + self.decrement(windows_elapsed) + self._last_window += self._window * windows_elapsed + + def count_success(self): + self.update() + self.decrement() + + def count_failure(self, failure): + self.update() + self.increment() + + def is_above_threshold(self): + return ( + self._num_rounds >= self._buffer + and self._failure_count > self._failure_threshold + ) + + def is_below_threshold(self): + return ( + self._num_rounds >= self._buffer + and self._failure_count < self._failure_threshold + ) + + class CircuitBreaker(object): def reset(self): raise NotImplementedError @@ -176,14 +231,16 @@ def state(self): class BasicCircuitBreaker(CircuitBreaker): - def __init__(self, failure_counter_factory, timeout): + def __init__(self, failure_counter_factory, timeout_factory): + # TODO: Find more appropriate names for these arguments self._failure_counter_factory = failure_counter_factory - self._timeout = timeout + self._timeout_factory = timeout_factory + self._timeout_generator = None self._state = None self.reset() def reset(self): - self._state = Closed(self._failure_counter_factory()) + self._state = Closed(self._failure_counter_factory.for_closed_state()) def force_open(self): self._state = ForceOpened() @@ -192,10 +249,18 @@ def disable(self): self._state = Disabled() def attempt_reset(self): - self._state = HalfOpen(self._failure_counter_factory()) + self._state = HalfOpen( + self._failure_counter_factory.for_half_open_state() + ) def trip(self): - self._state = Open(self._timeout, clock=now) + # Don't reset timeout if the breaker is being opened from the half-open state. + if self._timeout_generator is None or not isinstance( + self._state, HalfOpen + ): + self._timeout_generator = self._timeout_factory() + + self._state = Open(next(self._timeout_generator), clock=now) def on_success(self, request, response): self._state.on_success(self) @@ -360,26 +425,26 @@ def from_exception(self, exception): class CircuitRequestTemplate(RequestTemplate): - def __init__(self, circuit_breaker, fallback, monitor, failure_factory): - self._circuit_breaker = circuit_breaker + def __init__(self, breaker, fallback, monitor, failure_factory): + self._breaker = breaker self._fallback = fallback self._monitor = monitor self._failure_factory = failure_factory def before_request(self, request): - self._circuit_breaker.update() + self._breaker.update() - if not self._circuit_breaker.closed: + if not self._breaker.closed: self._monitor.on_request_not_permitted(request) if not callable(self._fallback): - raise CircuitBreakerOpen() + raise CircuitBreakerOpenException() # Short-circuit. return transitions.finish(self._fallback(request)) def _handle_failure(self, request, failure): - self._circuit_breaker.on_failure(request, failure) + self._breaker.on_failure(request, failure) if callable(self._fallback): return transitions.finish(self._fallback(request)) @@ -387,7 +452,7 @@ def _handle_failure(self, request, failure): def after_response(self, request, response): failure = self._failure_factory.from_response(response) if failure is None: - self._circuit_breaker.on_success(request, response) + self._breaker.on_success(request, response) else: return self._handle_failure(request, failure) From 0749b73ec6fd8cd01edfaa0842325c082b461e72 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Wed, 6 Feb 2019 22:52:26 -0800 Subject: [PATCH 09/10] Implement leak bucket and min calls --- uplink/circuit_breaker.py | 99 +++++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index ed604f31..05c11715 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -149,51 +149,89 @@ def is_below_threshold(self): raise NotImplementedError -class BasicFailureCounter(FailureCounter): - def __init__(self, decrement_every, buffer, failure_threshold, clock=now): - self._window = decrement_every - self._failure_threshold = failure_threshold +class IncrementalFailingCounter(FailureCounter): + def count_failure(self, failure): + self.increment() + + def count_success(self): + self.decrement() + + def decrement(self, n=1): + raise NotImplementedError + + def increment(self, n=1): + raise NotImplementedError + + +class SimpleFailureCounter(IncrementalFailingCounter): + def __init__(self, threshold): + self._threshold = threshold self._failure_count = 0 - self._num_rounds = 0 - self._buffer = buffer - self._diff = 0 - self._last_window = clock() - self._clock = clock + + def is_above_threshold(self): + return self._failure_count > self._threshold + + def is_below_threshold(self): + return not self.is_above_threshold() def decrement(self, n=1): - assert n >= 0 - self._failure_count -= n - self._num_rounds += 1 + self._failure_count = max(self._failure_count - n, 0) def increment(self, n=1): self._failure_count += n - self._num_rounds += 1 - def update(self): + +class LeakyBucket(FailureCounter): + def __init__(self, rate, counter, clock=now): + self._rate = rate + self._counter = counter + self._clock = clock + self._last_leak = clock() + + def leak(self): now_ = self._clock() - windows_elapsed = int(max(now_ - self._last_window, 0) / self._window) - self.decrement(windows_elapsed) - self._last_window += self._window * windows_elapsed + elapsed = now_ - self._last_leak + amount = elapsed * self._rate + self._counter.decrement(int(amount)) + self._last_leak = now_ - ((amount - int(amount)) / self._rate) def count_success(self): - self.update() - self.decrement() + self.leak() + self._counter.count_success() def count_failure(self, failure): - self.update() - self.increment() + self.leak() + self._counter.count_failure(failure) def is_above_threshold(self): - return ( - self._num_rounds >= self._buffer - and self._failure_count > self._failure_threshold - ) + return self._counter.is_above_threshold() def is_below_threshold(self): - return ( - self._num_rounds >= self._buffer - and self._failure_count < self._failure_threshold - ) + return self._counter.is_below_threshold() + + +class MinimumCalls(FailureCounter): + def __init__(self, counter, min_calls): + self._counter = counter + self._min_calls = min_calls + self._call_count = 0 + + def count_success(self): + self._counter.count_success() + self._call_count += 1 + + def count_failure(self, failure): + self._counter.count_failure(failure) + self._call_count += 1 + + def meets_minimum_calls(self): + return self._call_count >= self._min_calls + + def is_above_threshold(self): + return self.meets_minimum_calls() and self._counter.is_above_threshold() + + def is_below_threshold(self): + return self.meets_minimum_calls() and self._counter.is_below_threshold() class CircuitBreaker(object): @@ -254,7 +292,8 @@ def attempt_reset(self): ) def trip(self): - # Don't reset timeout if the breaker is being opened from the half-open state. + # Don't reset timeout if the breaker is being opened from the + # half-open state. if self._timeout_generator is None or not isinstance( self._state, HalfOpen ): From ac46aedd4adad5bd85ec3f9c64c3f0a89947f121 Mon Sep 17 00:00:00 2001 From: "P. Raj Kumar" Date: Tue, 2 Apr 2019 21:09:05 -0700 Subject: [PATCH 10/10] Update IncrementalFailingCounter --- uplink/circuit_breaker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/uplink/circuit_breaker.py b/uplink/circuit_breaker.py index 05c11715..439b89f9 100644 --- a/uplink/circuit_breaker.py +++ b/uplink/circuit_breaker.py @@ -150,6 +150,12 @@ def is_below_threshold(self): class IncrementalFailingCounter(FailureCounter): + def is_above_threshold(self): + raise NotImplementedError + + def is_below_threshold(self): + raise NotImplementedError + def count_failure(self, failure): self.increment()