Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue/443/some-love-for-simpleflow-cli #444

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
def increment(x):
# Here's how you can access the raw context of the activity task if you need
# it. It gives you access to the response of the PollForActivityTask call to
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax # NOQA
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax
logger.warning(f"activity context: {increment.context}")
return x + 1

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ extend-select = [
"W", # pycodestyle warnings
"RUF", # ruff
]
fixable = ["ALL"]
allowed-confusables = ["‘", "’"]
extend-ignore = [
]
ignore = ["E501"]

[tool.ruff.lint.isort]
required-imports = ["from __future__ import annotations"]

Expand Down
2 changes: 1 addition & 1 deletion simpleflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .activity import Activity # NOQA
from .runtime import logger # NOQA
from .signal import WaitForSignal # NOQA
from .simpleflow_signal import WaitForSignal # NOQA
from .workflow import Workflow # NOQA

__version__ = "0.34.0"
Expand Down
47 changes: 40 additions & 7 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ def terminate_workflow(
run_id: str | None,
):
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
ex.terminate()


Expand All @@ -251,6 +254,9 @@ def terminate_workflow(
)
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
history = ex.history()
ex.terminate(reason="workflow.restart")
new_ex = ex.workflow_type.start_execution(
Expand Down Expand Up @@ -315,6 +321,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
)


# FIXME superseded by history
@click.option(
"--nb-tasks",
"-n",
Expand Down Expand Up @@ -347,6 +354,7 @@ def workflow_tasks(
)


# FIXME superseded by filter
@click.argument(
"domain",
envvar="SWF_DOMAIN",
Expand All @@ -373,16 +381,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
_NOTSET = object()


@click.argument(
"domain",
envvar="SWF_DOMAIN",
)
@cli.command(
"workflow.history",
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
)
@click.argument("workflow_id")
@click.argument("run_id", type=RUN_ID, required=False)
@click.option(
"--domain",
envvar="SWF_DOMAIN",
)
@click.option(
"--output-format",
"--of",
Expand All @@ -402,12 +410,15 @@ def workflow_history(
output_format: str,
reverse_order: bool = False,
) -> None:
from simpleflow.swf.mapper.models.history.base import History as BaseHistory

if ctx.parent.params["format"] != "json" or not ctx.parent.params["header"]:
raise NotImplementedError("Only pretty JSON mode is implemented")

from simpleflow.swf.mapper.models.history.base import History as BaseHistory

ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
events = ex.history_events(
callback=get_progression_callback("events"),
reverse_order=reverse_order,
Expand All @@ -419,7 +430,7 @@ def workflow_history(
history = History(raw_history)
if output_format == "raw":
events = []
for event in history.events[:10]:
for event in history.events:
e = {}
for k in ["id", "type", "state", "timestamp", "input", "control", *event.__dict__]:
if k.startswith("_") or k == "raw":
Expand All @@ -432,11 +443,28 @@ def workflow_history(
elif output_format == "cooked":
history.parse()
events = {
"workflow": history.workflow,
"activities": history.activities,
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": history.signals,
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
elif output_format == "cooked2":
history.parse()
events = {
"workflow": [t for t in history.tasks if t.type == "child_workflow"],
"activities": [t for t in history.tasks if t.type == "activity"],
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": [t for t in history.tasks if t.type == "signal"],
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
else:
raise NotImplementedError
Expand Down Expand Up @@ -840,6 +868,11 @@ def standalone(
ex.workflow_id,
ex.run_id,
)
if not ex:
print(
f"Execution {workflow_id} {ex.run_id} not found" if ex.run_id else f"Workflow {workflow_id} not found"
)
sys.exit(1)
if display_status:
print(f"status: {ex.status}", file=sys.stderr)
if ex.status == ex.STATUS_CLOSED:
Expand Down
161 changes: 160 additions & 1 deletion simpleflow/history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import collections
from typing import TYPE_CHECKING, Callable, ClassVar
from typing import TYPE_CHECKING, Callable, ClassVar, cast

import simpleflow.swf.mapper.models.history
from simpleflow import logger
Expand Down Expand Up @@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
self.started_decision_id: int | None = None
self.completed_decision_id: int | None = None
self.last_event_id: int | None = None
self._workflow: dict[str, Any] = {}

@property
def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
return self._history

@property
def workflow(self):
return self._workflow

@property
def activities(self) -> dict[str, ActivityTaskEventDict]:
"""
Expand Down Expand Up @@ -138,6 +143,48 @@ def tasks(self):
def events(self) -> list[Event]:
return self._history.events

def get_activities_history(self) -> dict[str, dict[str, Any]]:
activities: dict[str, dict[str, Any]] = {}
scheduled_to_activity_id: dict[int, str] = {}
event: ActivityTaskEvent | Any
for event in self.events:
if event.type != "ActivityTask":
continue
cast(ActivityTaskEvent, event)
activity_id = getattr(event, "activity_id", None)
if event.state == "scheduled" and activity_id not in activities:
activities[activity_id] = {
"id": activity_id,
"name": event.activity_type["name"],
"version": event.activity_type["version"],
"states": [event.state],
"scheduled_ids": [event.id],
"scheduled_timestamps": [event.timestamp],
"inputs": [event.input],
"task_lists": [event.task_list["name"]],
}
scheduled_to_activity_id[event.id] = activity_id
else:
if event.state != "scheduled":
scheduled_event = self.events[event.scheduled_event_id - 1]
scheduled_id = scheduled_event.id
activity = activities[scheduled_to_activity_id[scheduled_id]]
activity["task_lists"].append(event.task_list["name"])
else:
activity = activities[event.activity_id]
activity.setdefault("states", []).append(event.state)
activity.setdefault(f"{event.state}_ids", []).append(event.id)
activity.setdefault(f"{event.state}_timestamp", []).append(event.timestamp)
for attr in ("identity", "result", "reason", "details"):
if hasattr(event, attr):
activity.setdefault(attr, []).append(getattr(event, attr))
if event.state == "timed_out":
activity.setdefault("timeout_types", []).append(event.timeout_type)
activity.setdefault(f"{event.timeout_type}_timeouts", []).append(
getattr(event, f"{event.timeout_type}_timeout")
)
activity.setdefault(f"{event.timeout_values}", []).append()

def parse_activity_event(self, events: list[Event | ActivityTaskEvent], event: ActivityTaskEvent):
"""
Aggregate all the attributes of an activity in a single entry.
Expand Down Expand Up @@ -432,6 +479,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
"""
Parse a workflow event.
"""
if event.state == "started":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"child_policy": getattr(event, "child_policy", None),
"task_list": event.task_list["name"],
"workflow_type": event.workflow_type,
"continued_execution_run_id": getattr(event, "continued_execution_run_id", None),
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
"input": getattr(event, "input", None),
"lambda_role": getattr(event, "lambda_role", None),
"parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None),
"parent_workflow_execution": getattr(event, "parent_workflow_execution", None),
"tag_list": getattr(event, "tag_list", None),
"task_priority": getattr(event, "task_priority", None),
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
}
)
elif event.state == "continued_as_new":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
"new_execution_run_id": event.new_execution_run_id,
"task_list": event.task_list["name"],
"workflow_type": event.workflow_type,
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
"input": getattr(event, "input", None),
"lambda_role": getattr(event, "lambda_role", None),
"tag_list": getattr(event, "tag_list", None),
"task_priority": getattr(event, "task_priority", None),
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
}
)
elif event.state == "completed":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"result": getattr(event, "result", None),
}
)
elif event.state == "cancelled":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"decision_task_completed_event_id": event.decision_task_completed_event_id,
"details": getattr(event, "details", None),
}
)
elif event.state == "failed":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"decision_task_completed_event_id": event.decision_task_completed_event_id,
"reason": getattr(event, "reason", None),
"details": getattr(event, "details", None),
}
)
elif event.state == "terminated":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"cause": getattr(event, "cause", None),
"details": getattr(event, "details", None),
}
)
elif event.state == "timed_out":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"timeout_type": event.timeout_type,
}
)
# elif event.state in (
# "cancel_failed",
# "complete_failed",
# "continue_as_new",
# "fail_failed",
# "start_child_failed",
# "start_failed",
# "terminate_failed",
# ):
# self._workflow.update(
# {
# "state": event.state,
# f"{event.state}_id": event.id,
# f"{event.state}_cause": getattr(event, "cause", None),
# f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
# }
# )

if event.state == "cancel_requested":
self._workflow.update()
if event.state == "signaled":
signal = {
"type": "signal",
Expand Down
2 changes: 1 addition & 1 deletion simpleflow/local/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from simpleflow.base import Submittable
from simpleflow.history import History
from simpleflow.marker import Marker
from simpleflow.signal import WaitForSignal
from simpleflow.simpleflow_signal import WaitForSignal
from simpleflow.swf.mapper.models.history import builder
from simpleflow.task import ActivityTask, MarkerTask, SignalTask, TaskFailureContext, WorkflowTask
from simpleflow.utils import format_exc, format_exc_type, issubclass_, json_dumps
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from simpleflow.base import Submittable
from simpleflow.history import History
from simpleflow.marker import Marker
from simpleflow.signal import WaitForSignal
from simpleflow.simpleflow_signal import WaitForSignal
from simpleflow.swf import constants
from simpleflow.swf.helpers import swf_identity
from simpleflow.swf.mapper.core import ConnectedSWFObject
Expand Down
Loading
Loading