Skip to content

added extend lease thread capabilities #278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import sys
import threading
import time
import traceback

Expand All @@ -11,7 +12,7 @@
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_exec_log import TaskExecLog
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import ApiException, AuthorizationException
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.worker.worker_interface import WorkerInterface

Expand Down Expand Up @@ -123,10 +124,21 @@ def __execute_task(self, task: Task) -> TaskResult:
task_definition_name=task_definition_name
)
)

extend_lease_stop_event : threading.Event | None = None

try:
if self.worker.extend_lease_interval > 0:
extend_lease_stop_event = threading.Event()
self.__execute_extend_lease(task, task_definition_name, extend_lease_stop_event)

start_time = time.time()
task_result = self.worker.execute(task)
finish_time = time.time()

if extend_lease_stop_event is not None:
extend_lease_stop_event.set()

time_spent = finish_time - start_time
if self.metrics_collector is not None:
self.metrics_collector.record_task_execute_time(
Expand All @@ -145,6 +157,9 @@ def __execute_task(self, task: Task) -> TaskResult:
)
)
except Exception as e:
if extend_lease_stop_event is not None:
extend_lease_stop_event.set()

if self.metrics_collector is not None:
self.metrics_collector.increment_task_execution_error(
task_definition_name, type(e)
Expand Down Expand Up @@ -255,3 +270,96 @@ def __get_property_value_from_env(self, prop, task_type):
key_upper = prefix.upper() + "_" + task_type + "_" + prop.upper()
value = os.getenv(key_small, os.getenv(key_upper, value_all))
return value

def __execute_extend_lease(self, task: Task, task_definition_name: str, stop_event: threading.Event):
interval = self.worker.extend_lease_interval

task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=self.worker.get_identity(),
extend_lease=True
)

def extend_lease_target():
logger.debug(
'Start Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
task_definition_name=task_definition_name
))

while not stop_event.is_set():
stop_event.wait(interval)

for attempt in range(4):
if stop_event.is_set():
break

if attempt > 0:
# Wait for [10s, 20s, 30s] before next attempt
time.sleep(attempt * 10)

if stop_event.is_set():
break

logger.debug(
'Sending Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name
)
)

try:
response = self.task_client.update_task(body=task_result)

logger.debug(
'Extend Lease for task sent, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
response=response
)
)

break
except Exception as e:
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)

if isinstance(e, ApiException):
if e.status == 404:
logger.debug(
'Extend Lease stopping because received a 404 response for, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
response=response
)
)
break

logger.error(
'Failed to extend task lease, id: {task_id}, workflow_instance_id: {workflow_instance_id}, '
'task_definition_name: {task_definition_name}, reason: {reason}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
reason=traceback.format_exc()
)
)

logger.debug(
'Extend lease for task ended, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
task_definition_name=task_definition_name
))



thread = threading.Thread(target=extend_lease_target)
thread.start()
23 changes: 20 additions & 3 deletions src/conductor/client/http/models/task_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class TaskResult(object):
'output_data': 'dict(str, object)',
'logs': 'list[TaskExecLog]',
'external_output_payload_storage_path': 'str',
'sub_workflow_id': 'str'
'sub_workflow_id': 'str',
'extend_lease': 'bool'
}

attribute_map = {
Expand All @@ -41,12 +42,13 @@ class TaskResult(object):
'output_data': 'outputData',
'logs': 'logs',
'external_output_payload_storage_path': 'externalOutputPayloadStoragePath',
'sub_workflow_id': 'subWorkflowId'
'sub_workflow_id': 'subWorkflowId',
'extend_lease': 'extendLease'
}

def __init__(self, workflow_instance_id=None, task_id=None, reason_for_incompletion=None,
callback_after_seconds=None, worker_id=None, status=None, output_data=None, logs=None,
external_output_payload_storage_path=None, sub_workflow_id=None): # noqa: E501
external_output_payload_storage_path=None, sub_workflow_id=None, extend_lease=None): # noqa: E501
"""TaskResult - a model defined in Swagger""" # noqa: E501
self._workflow_instance_id = None
self._task_id = None
Expand Down Expand Up @@ -77,6 +79,8 @@ def __init__(self, workflow_instance_id=None, task_id=None, reason_for_incomplet
self.external_output_payload_storage_path = external_output_payload_storage_path
if sub_workflow_id is not None:
self.sub_workflow_id = sub_workflow_id
if extend_lease is not None:
self.extend_lease = extend_lease

@property
def workflow_instance_id(self):
Expand Down Expand Up @@ -294,6 +298,19 @@ def sub_workflow_id(self, sub_workflow_id):

self._sub_workflow_id = sub_workflow_id

@property
def extend_lease(self):
return self._extend_lease

@extend_lease.setter
def extend_lease(self, extend_lease):
"""Sets the extend_lease of this TaskResult.
:param extend_lease: The extend_lease of this TaskResult. # noqa: E501
:type: bool
"""

self._extend_lease = extend_lease

def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
Expand Down
3 changes: 3 additions & 0 deletions src/conductor/client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self,
poll_interval: float = None,
domain: str = None,
worker_id: str = None,
extend_lease_interval: float = None
) -> Self:
super().__init__(task_definition_name)
self.api_client = ApiClient()
Expand All @@ -66,6 +67,8 @@ def __init__(self,
else:
self.worker_id = deepcopy(worker_id)
self.execute_function = deepcopy(execute_function)
if extend_lease_interval is None:
self.extend_lease_interval = 0

def execute(self, task: Task) -> TaskResult:
task_input = {}
Expand Down
9 changes: 9 additions & 0 deletions src/conductor/client/worker/worker_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, task_definition_name: Union[str, list]):
self._task_definition_name_cache = None
self._domain = None
self._poll_interval = DEFAULT_POLLING_INTERVAL
self._extend_lease_interval = 0

@abc.abstractmethod
def execute(self, task: Task) -> TaskResult:
Expand Down Expand Up @@ -117,3 +118,11 @@ def poll_interval(self):
@poll_interval.setter
def poll_interval(self, value):
self._poll_interval = value

@property
def extend_lease_interval(self):
return self._extend_lease_interval

@extend_lease_interval.setter
def extend_lease_interval(self, value):
self._extend_lease_interval = value
Loading