Skip to content

Commit

Permalink
feat(docworker): Improve retry mechanism for document generation
Browse files Browse the repository at this point in the history
  • Loading branch information
MarekSuchanek committed Dec 20, 2024
1 parent f29f5a2 commit 788f9dc
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 10 deletions.
4 changes: 2 additions & 2 deletions packages/dsw-command-queue/dsw/command_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .command_queue import CommandQueue, CommandWorker
from .command_queue import CommandJobError, CommandQueue, CommandWorker

__all__ = ['CommandQueue', 'CommandWorker']
__all__ = ['CommandJobError', 'CommandQueue', 'CommandWorker']
60 changes: 59 additions & 1 deletion packages/dsw-command-queue/dsw/command_queue/command_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,45 @@ def signal_handler(recv_signal, frame):
signal.signal(signal.SIGABRT, signal_handler)


class CommandJobError(BaseException):

def __init__(self, job_id: str, message: str, try_again: bool,
exc: BaseException | None = None):
self.job_id = job_id
self.message = message
self.try_again = try_again
self.exc = exc
super().__init__(message)

def __str__(self):
return self.message

def log_message(self):
if self.exc is None:
return self.message
else:
return f'{self.message} (caused by: [{type(self.exc).__name__}] {str(self.exc)})'

def db_message(self):
if self.exc is None:
return self.message
return f'{self.message}\n\n' \
f'Caused by: {type(self.exc).__name__}\n' \
f'{str(self.exc)}'

@staticmethod
def create(job_id: str, message: str, try_again: bool = True,
exc: BaseException | None = None):
if isinstance(exc, CommandJobError):
return exc
return CommandJobError(
job_id=job_id,
message=message,
try_again=try_again,
exc=exc,
)


class CommandWorker:

@abc.abstractmethod
Expand Down Expand Up @@ -190,8 +229,27 @@ def work():
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
except CommandJobError as e:
if e.try_again and attempt_number < command.max_attempts:
query = self.queries.query_command_error()
msg = f'Failed with job error: {e.message} (will try again)'
else:
query = self.queries.query_command_error_stop()
msg = f'Failed with job error: {e.message}'
LOG.warning(msg)
self.worker.process_exception(e)
self.db.execute_query(
query=query,
attempts=attempt_number,
error_message=msg,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
except Exception as e:
msg = f'Failed with exception: {str(e)} ({type(e).__name__})'
if attempt_number < command.max_attempts:
msg = f'Failed with exception [{type(e).__name__}]: {str(e)} (will try again)'
else:
msg = f'Failed with exception [{type(e).__name__}]: {str(e)}'
LOG.warning(msg)
self.worker.process_exception(e)
self.db.execute_query(
Expand Down
12 changes: 12 additions & 0 deletions packages/dsw-command-queue/dsw/command_queue/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ def query_command_error() -> str:
WHERE uuid = %(uuid)s;
"""

@staticmethod
def query_command_error_stop() -> str:
    """Return the SQL UPDATE that records a failure and caps the attempts.

    The statement sets both ``attempts`` and ``max_attempts`` to the same
    ``%(attempts)s`` value, stores the error message, switches the state to
    ``CommandState.ERROR`` and updates ``updated_at`` for the row matching
    ``%(uuid)s``.

    Named placeholders expected by the statement: ``attempts``,
    ``error_message``, ``updated_at``, ``uuid``.

    NOTE(review): equalising ``max_attempts`` with ``attempts`` presumably
    prevents the command from being picked up again - confirm against the
    query that selects commands to run.
    """
    return f"""
UPDATE persistent_command
SET attempts = %(attempts)s,
max_attempts = %(attempts)s,
last_error_message = %(error_message)s,
state = '{CommandState.ERROR}',
updated_at = %(updated_at)s
WHERE uuid = %(uuid)s;
"""

@staticmethod
def query_command_done() -> str:
return f"""
Expand Down
16 changes: 9 additions & 7 deletions packages/dsw-document-worker/dsw/document_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ def _enrich_context(self):
extras['questionnaire'] = questionnaire.to_dict()
self.doc_context['extras'] = extras

def check_compliance(self):
    """Verify that the document context targets the supported metamodel.

    The version is read from ``self.doc_context['metamodelVersion']``
    (``'0'`` when the key is missing) and converted with ``int``.

    :raises RuntimeError: when the version differs from CURRENT_METAMODEL
    """
    metamodel_version = int(self.doc_context.get('metamodelVersion', '0'))
    # Guard clause: a matching version needs no further action.
    if metamodel_version == CURRENT_METAMODEL:
        return
    LOG.error(
        'Command with metamodel version %d is not supported '
        'by this worker (version %d)',
        metamodel_version,
        CURRENT_METAMODEL,
    )
    raise RuntimeError(
        f'Unsupported metamodel version: {metamodel_version} '
        f'(expected {CURRENT_METAMODEL})'
    )

@handle_job_step('Failed to build final document')
def build_document(self):
LOG.info('Building document by rendering template with context')
Expand Down Expand Up @@ -231,6 +239,7 @@ def try_set_job_state(self, state: str, message: str) -> bool:
return False

def _run(self):
self.check_compliance()
self.get_document()

self.prepare_template()
Expand Down Expand Up @@ -343,14 +352,7 @@ def run_once(self):
queue.run_once()

def work(self, cmd: PersistentCommand):
metamodel_version = int(cmd.body.get('metamodelVersion', '0'))
if metamodel_version != CURRENT_METAMODEL:
LOG.error('Command with metamodel version %d is not supported '
'by this worker (version %d)', metamodel_version, CURRENT_METAMODEL)
raise RuntimeError(f'Unsupported metamodel version: {metamodel_version} '
f'(expected {CURRENT_METAMODEL})')
document_uuid = cmd.body['document']['uuid']

Context.get().update_trace_id(cmd.uuid)
Context.get().update_document_id(document_uuid)
SentryReporter.set_context('cmd_uuid', cmd.uuid)
Expand Down

0 comments on commit 788f9dc

Please sign in to comment.