diff --git a/orquesta/benchmarks/__init__.py b/orquesta/benchmarks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orquesta/benchmarks/specs/__init__.py b/orquesta/benchmarks/specs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orquesta/benchmarks/specs/native/__init__.py b/orquesta/benchmarks/specs/native/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orquesta/benchmarks/specs/native/v1/__init__.py b/orquesta/benchmarks/specs/native/v1/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orquesta/benchmarks/specs/native/v1/test_models.py b/orquesta/benchmarks/specs/native/v1/test_models.py
new file mode 100644
index 00000000..6d429b6b
--- /dev/null
+++ b/orquesta/benchmarks/specs/native/v1/test_models.py
@@ -0,0 +1,142 @@
+# Copyright 2021 The StackStorm Authors.
+# Copyright 2019 Extreme Networks, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+import orquesta.specs.native.v1.models as models
+
+from orquesta.expressions import base as expr_base
+from orquesta.utils import context as ctx_util
+
+
+WITH_ITEMS = [
+    [{"test": "s"}, {"test1": "s"}, {"test2": "s"}],
+    [{"test": "s" * 10000}, {"test1": "s" * 10000}, {"test2": "s" * 10000}],
+    [{"test": "s" * 1000000}, {"test1": "s" * 1000000}, {"test2": "s" * 1000000}],
+]
+
+
+@pytest.mark.parametrize("fixture", WITH_ITEMS, ids=["small", "medium", "large"])
+@pytest.mark.benchmark(group="no deepcopy")
+def test_task_spec_render(benchmark, fixture):
+    def run_benchmark():
+        # Instantiate the task spec.
+        task_spec = models.TaskSpec(
+            {
+                "action": "core.echo message=<% item() %>",
+                "next": [{"publish": [{"items": "<% result() %>"}]}],
+                "with": {"items": "<% ctx(xs) %>"},
+            }
+        )
+
+        in_ctx = {
+            "xs": fixture,
+            "__current_task": {"id": "task1", "route": 0},
+            "__state": {
+                "contexts": [{"xs": fixture}],
+                "routes": [[]],
+                "sequence": [],
+                "staged": [
+                    {"id": "task1", "ctxs": {"in": [0]}, "route": 0, "prev": {}, "ready": True}
+                ],
+                "status": "running",
+                "tasks": {},
+            },
+        }
+
+        # Render the task spec against the given context.
+        task_spec.render(in_ctx)
+
+    benchmark(run_benchmark)
+
+
+class OldTaskSpec(models.TaskSpec):
+    # Copy of TaskSpec.render prior to this change, kept so the old
+    # deepcopy-based code path can be benchmarked against the new one.
+    def render(self, in_ctx):
+        action_specs = []
+
+        if not self.has_items():
+            action_spec = {
+                "action": expr_base.evaluate(self.action, in_ctx),
+                "input": expr_base.evaluate(getattr(self, "input", {}), in_ctx),
+            }
+
+            action_specs.append(action_spec)
+        else:
+            items_spec = self.get_items_spec()
+
+            if " in " not in items_spec.items:
+                items_expr = items_spec.items.strip()
+            else:
+                start_idx = items_spec.items.index(" in ") + 4
+                items_expr = items_spec.items[start_idx:].strip()
+
+            items = expr_base.evaluate(items_expr, in_ctx)
+
+            if not isinstance(items, list):
+                raise TypeError('The value of "%s" is not type of list.' % items_expr)
+
+            item_keys = (
+                None
+                if " in " not in items_spec.items
+                else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",")
+            )
+
+            for idx, item in enumerate(items):
+                if item_keys and (isinstance(item, tuple) or isinstance(item, list)):
+                    item = dict(zip(item_keys, list(item)))
+                elif item_keys and len(item_keys) == 1:
+                    item = {item_keys[0]: item}
+
+                item_ctx_value = ctx_util.set_current_item(in_ctx, item)
+
+                action_spec = {
+                    "action": expr_base.evaluate(self.action, item_ctx_value),
+                    "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx_value),
+                    "item_id": idx,
+                }
+
+                action_specs.append(action_spec)
+
+        return self, action_specs
+
+
+@pytest.mark.parametrize("fixture", WITH_ITEMS, ids=["small", "medium", "large"])
+@pytest.mark.benchmark(group="deepcopy")
+def test_task_spec_render_old(benchmark, fixture):
+    def run_benchmark():
+        # Instantiate the task spec.
+        task_spec = OldTaskSpec(
+            {
+                "action": "core.echo message=<% item() %>",
+                "next": [{"publish": [{"items": "<% result() %>"}]}],
+                "with": {"items": "<% ctx(xs) %>"},
+            }
+        )
+
+        in_ctx = {
+            "xs": fixture,
+            "__current_task": {"id": "task1", "route": 0},
+            "__state": {
+                "contexts": [{"xs": fixture}],
+                "routes": [[]],
+                "sequence": [],
+                "staged": [
+                    {"id": "task1", "ctxs": {"in": [0]}, "route": 0, "prev": {}, "ready": True}
+                ],
+                "status": "running",
+                "tasks": {},
+            },
+        }
+
+        # Render the task spec against the given context.
+        task_spec.render(in_ctx)
+
+    benchmark(run_benchmark)
diff --git a/orquesta/conducting.py b/orquesta/conducting.py
index 3a096974..f71e4610 100644
--- a/orquesta/conducting.py
+++ b/orquesta/conducting.py
@@ -49,29 +49,29 @@ def __init__(self, conductor=None):
 
     def serialize(self):
         data = {
-            "contexts": json_util.deepcopy(self.contexts),
-            "routes": json_util.deepcopy(self.routes),
-            "sequence": json_util.deepcopy(self.sequence),
-            "staged": json_util.deepcopy(self.staged),
+            "contexts": self.contexts,
+            "routes": self.routes,
+            "sequence": self.sequence,
+            "staged": self.staged,
             "status": self.status,
-            "tasks": json_util.deepcopy(self.tasks),
+            "tasks": self.tasks,
         }
 
         if self.reruns:
-            data["reruns"] = json_util.deepcopy(self.reruns)
+            data["reruns"] = self.reruns
 
         return data
 
     @classmethod
     def deserialize(cls, data):
         instance = cls()
-        instance.contexts = json_util.deepcopy(data.get("contexts", list()))
-        instance.routes = json_util.deepcopy(data.get("routes", list()))
-        instance.sequence = json_util.deepcopy(data.get("sequence", list()))
+        instance.contexts = data.get("contexts", list())
+        instance.routes = data.get("routes", list())
+        instance.sequence = data.get("sequence", list())
         instance.staged = json_util.deepcopy(data.get("staged", list()))
         instance.status = data.get("status", statuses.UNSET)
-        instance.tasks = json_util.deepcopy(data.get("tasks", dict()))
-        instance.reruns = json_util.deepcopy(data.get("reruns", list()))
+        instance.tasks = data.get("tasks", dict())
+        instance.reruns = data.get("reruns", list())
 
         return instance
 
@@ -279,10 +279,10 @@ def serialize(self):
             "spec": self.spec.serialize(),
             "graph": self.graph.serialize(),
             "input": self.get_workflow_input(),
-            "context": self.get_workflow_parent_context(),
+            "context": self._parent_ctx,
             "state": self.workflow_state.serialize(),
-            "log": json_util.deepcopy(self.log),
-            "errors": json_util.deepcopy(self.errors),
+            "log": self.log,
+            "errors": self.errors,
             "output": self.get_workflow_output(),
         }
 
@@ -292,12 +292,12 @@ def deserialize(cls, data):
         spec = spec_module.WorkflowSpec.deserialize(data["spec"])
         graph = graphing.WorkflowGraph.deserialize(data["graph"])
-        inputs = json_util.deepcopy(data["input"])
-        context = json_util.deepcopy(data["context"])
+        inputs = data["input"]
+        context = data["context"]
         state = WorkflowState.deserialize(data["state"])
-        log = json_util.deepcopy(data.get("log", []))
+        log = data.get("log", [])
         errors = json_util.deepcopy(data["errors"])
-        outputs = json_util.deepcopy(data["output"])
+        outputs = data["output"]
 
         instance = cls(spec)
         instance.restore(graph, log, errors, state, inputs, outputs, context)
 
@@ -317,7 +317,7 @@ def workflow_state(self):
             self._workflow_state = WorkflowState(conductor=self)
 
             # Set any given context as the initial context.
-            init_ctx = self.get_workflow_parent_context()
+            init_ctx = self.get_workflow_parent_context_copy()
 
             # Render workflow inputs and merge into the initial context.
             workflow_input = self.get_workflow_input()
 
@@ -407,11 +407,11 @@ def log_errors(self, errors, task_id=None, route=None, task_transition_id=None):
                 error, task_id=task_id, route=route, task_transition_id=task_transition_id
             )
 
-    def get_workflow_parent_context(self):
+    def get_workflow_parent_context_copy(self):
         return json_util.deepcopy(self._parent_ctx)
 
     def get_workflow_input(self):
-        return json_util.deepcopy(self._inputs)
+        return self._inputs
 
     def get_workflow_status(self):
         return self.workflow_state.status
 
@@ -460,7 +460,7 @@ def request_workflow_status(self, status):
             raise exc.InvalidWorkflowStatusTransition(current_status, wf_ex_event.name)
 
     def get_workflow_initial_context(self):
-        return json_util.deepcopy(self.workflow_state.contexts[0])
+        return self.workflow_state.contexts[0]
 
     def get_workflow_terminal_context(self):
         if self.get_workflow_status() not in statuses.COMPLETED_STATUSES:
 
@@ -481,8 +481,7 @@ def get_workflow_terminal_context(self):
         for idx, task in other_term_tasks:
             # Remove the initial context since the first task processed above already
             # inclulded that and we only want to apply the differences.
-            in_ctx_idxs = json_util.deepcopy(task["ctxs"]["in"])
-            in_ctx_idxs.remove(0)
+            in_ctx_idxs = [i for index, i in enumerate(task["ctxs"]["in"]) if index != 0]
 
             wf_term_ctx = dict_util.merge_dicts(
                 wf_term_ctx, self.get_task_context(in_ctx_idxs), overwrite=True
 
@@ -512,7 +511,7 @@ def render_workflow_output(self):
                 self.request_workflow_status(statuses.FAILED)
 
     def get_workflow_output(self):
-        return json_util.deepcopy(self._outputs) if self._outputs else None
+        return self._outputs if self._outputs else None
 
     def reset_workflow_output(self):
         self._outputs = None
 
@@ -782,7 +781,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
         # Setup the retry in the task state.
         task_id = task_state_entry["id"]
         task_retry_spec = self.graph.get_task_retry_spec(task_id)
-        task_state_entry["retry"] = json_util.deepcopy(task_retry_spec)
+        task_state_entry["retry"] = task_retry_spec
         task_state_entry["retry"]["tally"] = 0
 
         # Get task context for evaluating the expression in delay and count.
 
@@ -1188,8 +1187,8 @@ def get_task_transition_contexts(self, task_id, route):
 
     def _request_task_rerun(self, task_id, route, reset_items=False):
         task = self.workflow_state.get_task(task_id, route)
-        task_ctx = json_util.deepcopy(task["ctxs"]["in"])
-        task_prev = json_util.deepcopy(task["prev"])
+        task_ctx = task["ctxs"]["in"]
+        task_prev = task["prev"]
         task_spec = self.spec.tasks.get_task(task_id)
 
         # Reset terminal status for the rerunnable candidate.
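For context on the conducting.py and graphing.py changes above: serialize(), deserialize(), and most of the get_workflow_*() accessors now hand back the underlying lists and dicts by reference instead of deep-copying them, which is where the bulk of the savings in this patch comes from. The snippet below is a minimal standalone sketch of that cost difference and is not part of the patch; build_state, serialize_with_deepcopy, and serialize_by_reference are illustrative names, and the payload only loosely mimics a workflow state.

import copy
import timeit


def build_state(num_tasks=200):
    # Hypothetical payload: a state-like dict with many nested entries, loosely
    # modeled on the staged/tasks structures that conducting.py serializes.
    return {
        "contexts": [{"task%d" % i: {"result": list(range(50))} for i in range(num_tasks)}],
        "routes": [[] for _ in range(num_tasks)],
    }


def serialize_with_deepcopy(state):
    # Old behavior: every field is deep-copied on serialize.
    return {"contexts": copy.deepcopy(state["contexts"]), "routes": copy.deepcopy(state["routes"])}


def serialize_by_reference(state):
    # New behavior: fields are returned by reference; callers must not mutate them.
    return {"contexts": state["contexts"], "routes": state["routes"]}


if __name__ == "__main__":
    state = build_state()
    print("deepcopy:  %.3fs" % timeit.timeit(lambda: serialize_with_deepcopy(state), number=200))
    print("reference: %.3fs" % timeit.timeit(lambda: serialize_by_reference(state), number=200))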
diff --git a/orquesta/graphing.py b/orquesta/graphing.py
index 6c718cb1..4944fd79 100644
--- a/orquesta/graphing.py
+++ b/orquesta/graphing.py
@@ -47,7 +47,7 @@ def serialize(self):
 
     @classmethod
     def deserialize(cls, data):
-        g = json_graph.adjacency_graph(json_util.deepcopy(data), directed=True, multigraph=True)
+        g = json_graph.adjacency_graph(data, directed=True, multigraph=True)
         return cls(graph=g)
 
     @staticmethod
diff --git a/orquesta/machines.py b/orquesta/machines.py
index 55b41be0..8657eec6 100644
--- a/orquesta/machines.py
+++ b/orquesta/machines.py
@@ -18,7 +18,6 @@
 from orquesta import events
 from orquesta import exceptions as exc
 from orquesta import statuses
-from orquesta.utils import jsonify as json_util
 
 LOG = logging.getLogger(__name__)
 
@@ -527,11 +526,15 @@ def add_context_to_task_item_event(cls, workflow_state, task_id, task_route, ac_
         if ac_ex_event.status in requirements:
-            # Make a copy of the items and remove current item under evaluation.
+            # Collect the statuses of the other items, skipping the one under evaluation.
             staged_task = workflow_state.get_staged_task(task_id, task_route)
-            items = json_util.deepcopy(staged_task["items"])
-            del items[ac_ex_event.item_id]
-            items_status = [item.get("status", statuses.UNSET) for item in items]
+            items = staged_task["items"]
+            items_status = [
+                item.get("status", statuses.UNSET)
+                for index, item in enumerate(items)
+                if index != ac_ex_event.item_id
+            ]
 
             # Assess various situations.
+            # TODO(aj): Build these lists in a single pass over items_status.
             active = list(filter(lambda x: x in statuses.ACTIVE_STATUSES, items_status))
             incomplete = list(filter(lambda x: x not in statuses.COMPLETED_STATUSES, items_status))
             paused = list(filter(lambda x: x in [statuses.PENDING, statuses.PAUSED], items_status))
diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py
index 9cf10546..72b7b048 100644
--- a/orquesta/specs/native/v1/models.py
+++ b/orquesta/specs/native/v1/models.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 import logging
 
 import six
 from six.moves import queue
 
@@ -22,7 +23,6 @@
 from orquesta.expressions import base as expr_base
 from orquesta.specs.native.v1 import base as native_v1_specs
 from orquesta.specs import types as spec_types
-from orquesta.utils import context as ctx_util
 from orquesta.utils import dictionary as dict_util
 from orquesta.utils import jsonify as json_util
 from orquesta.utils import parameters as args_util
 
@@ -155,7 +155,6 @@ def has_retry(self):
 
     def render(self, in_ctx):
         action_specs = []
-
         if not self.has_items():
             action_spec = {
                 "action": expr_base.evaluate(self.action, in_ctx),
 
@@ -182,27 +181,32 @@ def render(self, in_ctx):
                 if " in " not in items_spec.items
                 else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",")
             )
-
             for idx, item in enumerate(items):
                 if item_keys and (isinstance(item, tuple) or isinstance(item, list)):
                     item = dict(zip(item_keys, list(item)))
                 elif item_keys and len(item_keys) == 1:
                     item = {item_keys[0]: item}
 
-                item_ctx_value = ctx_util.set_current_item(in_ctx, item)
+                if in_ctx and not isinstance(in_ctx, dict):
+                    raise TypeError("The context is not a dict.")
 
-                action_spec = {
-                    "action": expr_base.evaluate(self.action, item_ctx_value),
-                    "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx_value),
-                    "item_id": idx,
-                }
+                in_ctx["__current_item"] = item
+
+                try:
+                    action_spec = {
+                        "action": expr_base.evaluate(self.action, in_ctx),
+                        "input": expr_base.evaluate(getattr(self, "input", {}), in_ctx),
+                        "item_id": idx,
+                    }
+                finally:
+                    in_ctx.pop("__current_item", None)
 
+                # expr_base.evaluate copies the context internally, so no deepcopy is needed here.
                 action_specs.append(action_spec)
 
         return self, action_specs
 
     def finalize_context(self, next_task_name, task_transition_meta, in_ctx):
-        rolling_ctx = json_util.deepcopy(in_ctx)
+        rolling_ctx = copy.copy(in_ctx)
         new_ctx = {}
         errors = []
 
@@ -647,7 +651,8 @@ def __init__(self, spec, name=None, member=False):
         super(WorkflowSpec, self).__init__(spec, name=name, member=member)
 
     def render_input(self, runtime_inputs, in_ctx=None):
-        rolling_ctx = json_util.deepcopy(in_ctx) if in_ctx else {}
+        # Only top-level keys are replaced here, so a shallow copy is sufficient.
+        rolling_ctx = copy.copy(in_ctx) if in_ctx else {}
         errors = []
 
         for input_spec in getattr(self, "input") or []:
 
@@ -669,7 +674,8 @@ def render_input(self, runtime_inputs, in_ctx=None):
         return rolling_ctx, errors
 
     def render_vars(self, in_ctx):
-        rolling_ctx = json_util.deepcopy(in_ctx)
+        # Only top-level keys are replaced here, so a shallow copy is sufficient.
+        rolling_ctx = copy.copy(in_ctx)
         rendered_vars = {}
         errors = []
 
@@ -688,7 +694,7 @@ def render_vars(self, in_ctx):
 
     def render_output(self, in_ctx):
         output_specs = getattr(self, "output") or []
-        rolling_ctx = json_util.deepcopy(in_ctx)
+        rolling_ctx = copy.copy(in_ctx)
         rendered_outputs = {}
         errors = []
 
diff --git a/orquesta/tests/unit/base.py b/orquesta/tests/unit/base.py
index d0cca988..3b846cf1 100644
--- a/orquesta/tests/unit/base.py
+++ b/orquesta/tests/unit/base.py
@@ -209,20 +209,11 @@ def assert_task_list(self, conductor, actual, expected):
         expected_copy = copy.deepcopy(expected)
 
         for task in actual_copy:
-            for staged_task in task["ctx"]["__state"]["staged"]:
-                if "items" in staged_task:
-                    del staged_task["items"]
-
             task["spec"] = task["spec"].serialize()
 
         for task in expected_copy:
             task["ctx"]["__current_task"] = {"id": task["id"], "route": task["route"]}
             task["ctx"]["__state"] = conductor.workflow_state.serialize()
-
-            for staged_task in task["ctx"]["__state"]["staged"]:
-                if "items" in staged_task:
-                    del staged_task["items"]
-
             task["spec"] = task["spec"].serialize()
 
         self.assertListEqual(actual_copy, expected_copy)
diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor.py b/orquesta/tests/unit/conducting/test_workflow_conductor.py
index cbbe5ddd..00a80b97 100644
--- a/orquesta/tests/unit/conducting/test_workflow_conductor.py
+++ b/orquesta/tests/unit/conducting/test_workflow_conductor.py
@@ -97,7 +97,7 @@ def _prep_conductor(self, context=None, inputs=None, status=None):
             self.assertDictEqual(conductor._inputs, user_inputs)
             self.assertDictEqual(conductor.get_workflow_input(), user_inputs)
             self.assertDictEqual(conductor._parent_ctx, parent_context)
-            self.assertDictEqual(conductor.get_workflow_parent_context(), parent_context)
+            self.assertDictEqual(conductor.get_workflow_parent_context_copy(), parent_context)
 
         default_inputs = {"a": None, "b": False}
         init_ctx_value = dict_util.merge_dicts(default_inputs, user_inputs, True)
 
@@ -282,7 +282,7 @@ def test_serialization(self):
             "spec": conductor.spec.serialize(),
             "graph": conductor.graph.serialize(),
             "state": conductor.workflow_state.serialize(),
-            "context": conductor.get_workflow_parent_context(),
+            "context": conductor.get_workflow_parent_context_copy(),
             "input": conductor.get_workflow_input(),
             "output": conductor.get_workflow_output(),
             "errors": conductor.errors,
diff --git a/orquesta/tests/unit/conducting/test_workflow_state.py b/orquesta/tests/unit/conducting/test_workflow_state.py
index 52909689..a038ebcf 100644
--- a/orquesta/tests/unit/conducting/test_workflow_state.py
+++ b/orquesta/tests/unit/conducting/test_workflow_state.py
@@ -71,6 +71,26 @@ def test_get_tasks_by_task_id(self):
 
         self.assertListEqual(actual_task_sequence, expected_task_sequence)
 
+    def test_stage_is_deepcopied(self):
+        # If staged is not deep copied on deserialize, an st2 workflow with a
+        # failed with_items task never finishes and remains in a running state
+        # forever. This appears to be caused by iterating over the staged tasks
+        # while the staged section is being mutated.
+        data = copy.deepcopy(MOCK_WORKFLOW_STATE)
+
+        task_sequence = [
+            {"id": "task1", "route": 0},
+            {"id": "task2", "route": 0},
+            {"id": "task2", "route": 1},
+            {"id": "task2", "route": 2},
+            {"id": "task3", "route": 0},
+        ]
+
+        data["sequence"] = copy.deepcopy(task_sequence)
+        state = conducting.WorkflowState.deserialize(data)
+        MOCK_WORKFLOW_STATE["staged"] = ["something"]
+        self.assertNotEqual(len(state.staged), len(MOCK_WORKFLOW_STATE["staged"]))
+
     def test_get_tasks_by_task_id_and_route(self):
         data = copy.deepcopy(MOCK_WORKFLOW_STATE)
diff --git a/requirements-test.txt b/requirements-test.txt
index 7c109be0..1a47c367 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -8,5 +8,6 @@ pytest-cov==4.1.0
 pep8==1.7.1
 pylint==3.1.0
 twine
+pytest-benchmark
 # 202404: Use forked version for flake8 v7.0.0 to align requirements with st2 test-requirements
 hacking @ git+https://github.com/nzlosh/hacking@flake8v7
diff --git a/tox.ini b/tox.ini
index bd6ee1b4..4459c485 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,6 +17,7 @@ deps = -r{toxinidir}/requirements-test.txt
 commands =
     pytest --cov=orquesta --cov-report=term orquesta/tests
+    pytest orquesta/benchmarks
 
 [testenv:pep8]
 deps =
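As a companion to the TaskSpec.render change in orquesta/specs/native/v1/models.py above, the sketch below isolates the with-items pattern it now uses: the current item is attached to the live context and removed again in a finally block, rather than deep-copying the whole context once per item. This is illustrative only and not part of the patch; evaluate() and render_items() are stand-in names, and the real code relies on expr_base.evaluate copying whatever it needs internally.

def evaluate(expr, ctx):
    # Stand-in for orquesta.expressions.base.evaluate so the example runs on
    # its own; it just reads the temporary "__current_item" key.
    return "%s [item=%s]" % (expr, ctx.get("__current_item"))


def render_items(action, items, in_ctx):
    action_specs = []

    for idx, item in enumerate(items):
        # Attach the current item to the live context instead of deep-copying it.
        in_ctx["__current_item"] = item

        try:
            action_specs.append({"action": evaluate(action, in_ctx), "item_id": idx})
        finally:
            # Always restore the context so the temporary key never leaks out,
            # even if evaluation raises.
            in_ctx.pop("__current_item", None)

    return action_specs


if __name__ == "__main__":
    ctx = {"xs": ["a", "b", "c"]}
    print(render_items("core.echo message=<% item() %>", ctx["xs"], ctx))
    assert "__current_item" not in ctx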