Skip to content

Commit

Permalink
block-reconciliation: handlers need to be blocked by a reconciliation…
Browse files Browse the repository at this point in the history
… that's in progress (#136)

* block-reconciliation: handlers need to be blocked by a reconciliation
that's in progress

* block-reconciliation: add block definition

* block-reconciliation: add better logging

* block-reconciliation: no need for duplicated calls
  • Loading branch information
emmeowzing authored May 30, 2024
1 parent a21b808 commit c0b1024
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions src/passoperator/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from http import HTTPStatus
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep

from passoperator.git import pull, clone
from passoperator.utils import LogLevel
Expand Down Expand Up @@ -38,6 +39,7 @@ def _passsecret_block(body: kopf.Body) -> None:
Args:
body [kopf.Body]: raw body of the PassSecret.
"""
log.debug(f'Blocking PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}')
__in_progress_queue.append(
(
body['metadata']['name'],
Expand All @@ -62,13 +64,31 @@ def _is_passsecret_blocked(body: kopf.Body) -> bool:
) in __in_progress_queue


def _block_passsecret_block(body: kopf.Body) -> None:
"""
Block handlers' progress on a PassSecret until it's safe to modify the managed secret.
This should only ever be called by event handlers, not by the reconciliation loop.
Args:
body (kopf.Body): raw body of the PassSecret.
"""
while True:
if not _is_passsecret_blocked(body):
_passsecret_block(body)
break
else:
log.debug(f'PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}" is already blocked.')
sleep(0.5)


def _lift_passsecret_block(body: kopf.Body) -> None:
"""
Unblock handlers' progress to modify the managed secret.
Args:
body [kopf.Body]: raw body of the PassSecret.
"""
log.debug(f'Lifting block on PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}"')
__in_progress_queue.remove(
(
body['metadata']['name'],
Expand Down Expand Up @@ -119,6 +139,7 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
v1 = client.CoreV1Api()

# Create a new PassSecret object with an up-to-date managedSecret decrypted value from the pass store.
_passsecret_block(body)
passSecretObj = PassSecret.from_kopf(body)

log.info(
Expand All @@ -138,6 +159,7 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
# submit a patch request to update it.
if not _managedSecret.data_equals(passSecretObj.spec.managedSecret):
if _managedSecret.immutable:
_lift_passsecret_block(body)
raise kopf.TemporaryError(
f'PassSecret "{passSecretObj.metadata.name}" managed secret "{passSecretObj.spec.managedSecret.metadata.name}" is immutable. Ignoring data patch.'
)
Expand All @@ -164,7 +186,10 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
)
)
else:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)
finally:
_lift_passsecret_block(body)


# @kopf.on.cleanup()
Expand Down Expand Up @@ -225,7 +250,9 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.
}
}

_passsecret_block(body)
# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

# Parse the old PassSecret manifest.
try:
Expand Down Expand Up @@ -295,10 +322,12 @@ def create(body: kopf.Body, **_: Any) -> None:
Args:
body [kopf.Body]: raw body of the created PassSecret.
"""
try:
# Indicate to the reconciliation loop that this PassSecret is in progress.
_passsecret_block(body)

# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

try:
passSecretObj = PassSecret.from_kopf(body)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
Expand Down Expand Up @@ -336,10 +365,12 @@ def delete(body: kopf.Body, **_: Any) -> None:
Args:
body [kopf.Body]: raw body of the deleted PassSecret.
"""
try:
# Indicate to the reconciliation loop that this PassSecret is in progress.
_passsecret_block(body)

# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

try:
passSecretObj = PassSecret.from_kopf(body)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
Expand Down

0 comments on commit c0b1024

Please sign in to comment.