Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DVX-565: Added find_run_by_id method to the WorkflowClient #374

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ def find_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
response = WorkflowSearchResponse(**raw_json)
return results[0] if (results := response.hits.hits) else None

@validate_arguments
def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
"""
Find workflow run based on their ID
(e.g: `atlan-snowflake-miner-1714638976-mzdza`)

:param id: identifier of the specific workflow run to find
:returns: singular result containing the searched workflow run or `None` if not found
:raises AtlanError: on any API communication issue
"""
query = Bool(
filter=[
Term(
field="_id",
value=id,
),
]
)
response = self._find_runs(query, size=1)
return results[0] if (results := response.hits.hits) else None

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
"""
Expand Down Expand Up @@ -417,7 +438,7 @@ def get_runs(
],
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
)
response = self._find_runs(query)
response = self._find_runs(query, from_=from_, size=size)
return results if (results := response.hits.hits) else None

@validate_arguments
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
workflow_status = client.workflow.monitor(workflow_response=workflow)
assert workflow_status == AtlanWorkflowPhase.FAILED

# Test find run by id
workflow_run = client.workflow.find_run_by_id(id=run.id)
assert (
workflow_run
and workflow_run.source
and workflow_run.source.status
and workflow_run.source.status.phase == AtlanWorkflowPhase.FAILED
)


def test_workflow_get_all_scheduled_runs(
client: AtlanClient, workflow: WorkflowResponse
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyatlan.client.constants import (
SCHEDULE_QUERY_WORKFLOWS_MISSED,
SCHEDULE_QUERY_WORKFLOWS_SEARCH,
WORKFLOW_INDEX_RUN_SEARCH,
WORKFLOW_INDEX_SEARCH,
)
from pyatlan.client.workflow import WorkflowClient
Expand Down Expand Up @@ -211,6 +212,24 @@ def test_find_by_id(
)


def test_find_run_by_id(
client: WorkflowClient, search_response: WorkflowSearchResponse, mock_api_caller
):
raw_json = search_response.dict()
mock_api_caller._call_api.return_value = raw_json

assert search_response.hits.hits
assert (
client.find_run_by_id(id="atlan-snowflake-miner-1714638976-mzdza")
== search_response.hits.hits[0]
)
mock_api_caller._call_api.called_once()
assert mock_api_caller._call_api.call_args.args[0] == WORKFLOW_INDEX_RUN_SEARCH
assert isinstance(
mock_api_caller._call_api.call_args.kwargs["request_obj"], WorkflowSearchRequest
)


def test_re_run_when_given_workflowpackage_with_no_prior_runs_raises_invalid_request_error(
client: WorkflowClient, mock_api_caller
):
Expand Down
Loading