Skip to content

Commit

Permalink
DVX-565: Added find_run_by_id method to the WorkflowClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Aryamanz29 committed Aug 28, 2024
1 parent c6fe307 commit d84017e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
23 changes: 22 additions & 1 deletion pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ def find_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
response = WorkflowSearchResponse(**raw_json)
return results[0] if (results := response.hits.hits) else None

@validate_arguments
def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
"""
Find workflow run based on their ID
(e.g: `atlan-snowflake-miner-1714638976-mzdza`)
:param id: identifier of the specific workflow run to find
:returns: singular result containing the searched workflow run or `None` if not found
:raises AtlanError: on any API communication issue
"""
query = Bool(
filter=[
Term(
field="_id",
value=id,
),
]
)
response = self._find_runs(query, size=1)
return results[0] if (results := response.hits.hits) else None

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
"""
Expand Down Expand Up @@ -417,7 +438,7 @@ def get_runs(
],
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
)
response = self._find_runs(query)
response = self._find_runs(query, from_=from_, size=size)
return results if (results := response.hits.hits) else None

@validate_arguments
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
workflow_status = client.workflow.monitor(workflow_response=workflow)
assert workflow_status == AtlanWorkflowPhase.FAILED

# Test find run by id
workflow_run = client.workflow.find_run_by_id(id=run.id)
assert (
workflow_run
and workflow_run.source
and workflow_run.source.status
and workflow_run.source.status.phase == AtlanWorkflowPhase.FAILED
)


def test_workflow_get_all_scheduled_runs(
client: AtlanClient, workflow: WorkflowResponse
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyatlan.client.constants import (
SCHEDULE_QUERY_WORKFLOWS_MISSED,
SCHEDULE_QUERY_WORKFLOWS_SEARCH,
WORKFLOW_INDEX_RUN_SEARCH,
WORKFLOW_INDEX_SEARCH,
)
from pyatlan.client.workflow import WorkflowClient
Expand Down Expand Up @@ -211,6 +212,24 @@ def test_find_by_id(
)


def test_find_run_by_id(
client: WorkflowClient, search_response: WorkflowSearchResponse, mock_api_caller
):
raw_json = search_response.dict()
mock_api_caller._call_api.return_value = raw_json

assert search_response.hits.hits
assert (
client.find_run_by_id(id="atlan-snowflake-miner-1714638976-mzdza")
== search_response.hits.hits[0]
)
mock_api_caller._call_api.called_once()
assert mock_api_caller._call_api.call_args.args[0] == WORKFLOW_INDEX_RUN_SEARCH
assert isinstance(
mock_api_caller._call_api.call_args.kwargs["request_obj"], WorkflowSearchRequest
)


def test_re_run_when_given_workflowpackage_with_no_prior_runs_raises_invalid_request_error(
client: WorkflowClient, mock_api_caller
):
Expand Down

0 comments on commit d84017e

Please sign in to comment.