From c0b102442caf23ad7f1748ca509de0c081ea3c29 Mon Sep 17 00:00:00 2001 From: Emma Doyle Date: Thu, 30 May 2024 16:14:32 -0400 Subject: [PATCH] block-reconciliation: handlers need to be blocked by a reconciliation that's in progress (#136) * block-reconciliation: handlers need to be blocked by a reconciliation that's in progress * block-reconciliation: add block definition * block-reconciliation: add better logging * block-reconciliation: no need for duplicated calls --- src/passoperator/daemon.py | 45 ++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/src/passoperator/daemon.py b/src/passoperator/daemon.py index 87098d3..3dda626 100644 --- a/src/passoperator/daemon.py +++ b/src/passoperator/daemon.py @@ -11,6 +11,7 @@ from http import HTTPStatus from concurrent.futures import ThreadPoolExecutor from functools import partial +from time import sleep from passoperator.git import pull, clone from passoperator.utils import LogLevel @@ -38,6 +39,7 @@ def _passsecret_block(body: kopf.Body) -> None: Args: body [kopf.Body]: raw body of the PassSecret. """ + log.debug(f'Blocking PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}') __in_progress_queue.append( ( body['metadata']['name'], @@ -62,6 +64,23 @@ def _is_passsecret_blocked(body: kopf.Body) -> bool: ) in __in_progress_queue +def _block_passsecret_block(body: kopf.Body) -> None: + """ + Block handlers' progress on a PassSecret until it's safe to modify the managed secret. + This should only ever be called by event handlers, not by the reconciliation loop. + + Args: + body (kopf.Body): raw body of the PassSecret. + """ + while True: + if not _is_passsecret_blocked(body): + _passsecret_block(body) + break + else: + log.debug(f'PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}" is already blocked.') + sleep(0.5) + + def _lift_passsecret_block(body: kopf.Body) -> None: """ Unblock handlers' progress to modify the managed secret. @@ -69,6 +88,7 @@ def _lift_passsecret_block(body: kopf.Body) -> None: Args: body [kopf.Body]: raw body of the PassSecret. """ + log.debug(f'Lifting block on PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}"') __in_progress_queue.remove( ( body['metadata']['name'], @@ -119,6 +139,7 @@ def reconciliation(body: kopf.Body, **_: Any) -> None: v1 = client.CoreV1Api() # Create a new PassSecret object with an up-to-date managedSecret decrypted value from the pass store. + _passsecret_block(body) passSecretObj = PassSecret.from_kopf(body) log.info( @@ -138,6 +159,7 @@ def reconciliation(body: kopf.Body, **_: Any) -> None: # submit a patch request to update it. if not _managedSecret.data_equals(passSecretObj.spec.managedSecret): if _managedSecret.immutable: + _lift_passsecret_block(body) raise kopf.TemporaryError( f'PassSecret "{passSecretObj.metadata.name}" managed secret "{passSecretObj.spec.managedSecret.metadata.name}" is immutable. Ignoring data patch.' ) @@ -164,7 +186,10 @@ def reconciliation(body: kopf.Body, **_: Any) -> None: ) ) else: + _lift_passsecret_block(body) raise kopf.PermanentError(e) + finally: + _lift_passsecret_block(body) # @kopf.on.cleanup() @@ -225,7 +250,9 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf. } } - _passsecret_block(body) + # If a reconciliation is already in progress for the triggered PassSecret, block this event handler + # until it's safe to modify the managed secret. + _block_passsecret_block(body) # Parse the old PassSecret manifest. try: @@ -295,10 +322,12 @@ def create(body: kopf.Body, **_: Any) -> None: Args: body [kopf.Body]: raw body of the created PassSecret. """ - try: - # Indicate to the reconciliation loop that this PassSecret is in progress. - _passsecret_block(body) + # If a reconciliation is already in progress for the triggered PassSecret, block this event handler + # until it's safe to modify the managed secret. + _block_passsecret_block(body) + + try: passSecretObj = PassSecret.from_kopf(body) except (ValueError, KeyError) as e: _lift_passsecret_block(body) @@ -336,10 +365,12 @@ def delete(body: kopf.Body, **_: Any) -> None: Args: body [kopf.Body]: raw body of the deleted PassSecret. """ - try: - # Indicate to the reconciliation loop that this PassSecret is in progress. - _passsecret_block(body) + # If a reconciliation is already in progress for the triggered PassSecret, block this event handler + # until it's safe to modify the managed secret. + _block_passsecret_block(body) + + try: passSecretObj = PassSecret.from_kopf(body) except (ValueError, KeyError) as e: _lift_passsecret_block(body)