From 296a0b4258d7ccd1ef9090aeed0b44584ff0a7cd Mon Sep 17 00:00:00 2001 From: blag Date: Wed, 5 Dec 2018 17:53:56 -0800 Subject: [PATCH 1/8] Add ASCII art diagrams to illustrate the join task attribute --- docs/source/languages/orquesta.rst | 93 +++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/docs/source/languages/orquesta.rst b/docs/source/languages/orquesta.rst index d7af8e33..babfc3b0 100644 --- a/docs/source/languages/orquesta.rst +++ b/docs/source/languages/orquesta.rst @@ -107,9 +107,73 @@ execution, the task is evaluated for completion. If criteria for transition is m set of tasks is invoked, sequentially in the order of the transitions and tasks that are listed. If more than one tasks transition to the same task and ``join`` is specified in the latter (i.e. the -task named "task3" in the example below), then the task being transitioned into becomes a barrier -for the inbound task transitions. There will be only one instance of the barrier task. In the -workflow graph, there will be multiple inbound edges to the barrier node. +task named ``barrier_task`` in the example below), then the task being transitioned into becomes a +barrier for the inbound task transitions. There will be only one instance of the barrier task. In +the workflow graph, there will be multiple inbound edges to the barrier node. + +.. code-block:: yaml + + version: 1.0 + + tasks: + setup_task: + # ... + # Run tasks in parallel + next: + - do: + - parallel_task_1 + - parallel_task_2 + - parallel_task_3 + + parallel_task_1: + # ... + # Wait to run barrier_task after this + next: + - do: + - barrier_task + + parallel_task_2: + # ... + # Eventually run barrier_task + next: + - do: + - intermediate_task + + intermediate_task: + # ... + # Wait to run barrier_task after this + next: + - do: + - barrier_task + + barrier_task: + # ... + # Run after parallel_task_1, parallel_task_2, and intermediate_task have all finished + join: all + + parallel_task_3: + # ... + # Run immediately after setup_task, do NOT wait for barrier_task + +Will result in this execution graph: + +.. code-block:: none + + =---- time (not to scale) ----> + + setup_task --+ + | + +------ parallel_task_1 --------------------------+ + | | + +-- parallel_task_2 --+ | + | | | + | +---- intermediate_task ----+ + | | + | +-- barrier_task --+ + | | + +-- parallel_task_3 -------------------------------------------------+ + | + +-- [finish] Conversely, if more than one tasks transition to the same task and ``join`` is **not** specified in the latter (i.e. the task named "log" in the example below), then the target task will be invoked @@ -117,6 +181,29 @@ immediately following the completion of the previous task. There will be multipl target task. In the workflow graph, each invocation of the target task will be its own branch with the inbound edge from the node of the previous task. +In other words, if ``join: all`` was removed from the previous workflow, the ``barrier_task`` would +be run two different times, resulting in this execution graph: + +.. code-block:: none + + =---- time (not to scale) ----> + + setup_task --+ + | + +------ parallel_task_1 ------+ + | | + | +-- barrier_task (1) ----------------------+ + | | + +-- parallel_task_2 --+ | + | | | + | +---- intermediate_task ----+ | + | | | + | +-- barrier_task (2) --+ + | | + +-- parallel_task_3 -----------------------------------------------------+ + | + +-- [finish] + With Items Model ---------------- From c80556187c86a0b414693e3b52dbb7ae75937869 Mon Sep 17 00:00:00 2001 From: blag Date: Tue, 18 Dec 2018 13:01:58 -0800 Subject: [PATCH 2/8] Fix last few references and betterize example callout --- docs/source/languages/orquesta.rst | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/source/languages/orquesta.rst b/docs/source/languages/orquesta.rst index babfc3b0..be79f071 100644 --- a/docs/source/languages/orquesta.rst +++ b/docs/source/languages/orquesta.rst @@ -111,6 +111,9 @@ task named ``barrier_task`` in the example below), then the task being transitio barrier for the inbound task transitions. There will be only one instance of the barrier task. In the workflow graph, there will be multiple inbound edges to the barrier node. +The following workflow definition illustrates the execution of parallel branches. The barrier task +will be blocked until all the parallel branches complete and reach it. + .. code-block:: yaml version: 1.0 @@ -155,7 +158,7 @@ the workflow graph, there will be multiple inbound edges to the barrier node. # ... # Run immediately after setup_task, do NOT wait for barrier_task -Will result in this execution graph: +The following is the corresponding workflow execution graph. .. code-block:: none @@ -176,10 +179,10 @@ Will result in this execution graph: +-- [finish] Conversely, if more than one tasks transition to the same task and ``join`` is **not** specified in -the latter (i.e. the task named "log" in the example below), then the target task will be invoked -immediately following the completion of the previous task. There will be multiple instances of the -target task. In the workflow graph, each invocation of the target task will be its own branch with -the inbound edge from the node of the previous task. +the latter, then the target task will be invoked immediately following the completion of the +previous task. There will be multiple instances of the target task. In the workflow graph, each +invocation of the target task will be its own branch with the inbound edge from the node of the +previous task. In other words, if ``join: all`` was removed from the previous workflow, the ``barrier_task`` would be run two different times, resulting in this execution graph: From b0289a2bcda56d08c0b8c9ba54dc368579235d4a Mon Sep 17 00:00:00 2001 From: W Chan Date: Thu, 10 Jan 2019 15:17:34 -0800 Subject: [PATCH 3/8] Add get_routes function to workflow graph Add a function in workflow graph to identify all possible routes as a function of the leaves or leaf clusters (closed cycles) in the graph. --- orquesta/graphing.py | 109 +++++++- orquesta/tests/unit/base.py | 5 + .../tests/unit/composition/native/base.py | 10 + .../tests/unit/graphing/native/__init__.py | 0 .../unit/graphing/native/test_routes_basic.py | 132 +++++++++ .../unit/graphing/native/test_routes_cycle.py | 65 +++++ .../unit/graphing/native/test_routes_join.py | 75 +++++ .../unit/graphing/native/test_routes_split.py | 263 ++++++++++++++++++ .../native/test_routes_task_transition.py | 164 +++++++++++ .../unit/graphing/test_workflow_graph.py | 82 ++++++ 10 files changed, 899 insertions(+), 6 deletions(-) create mode 100644 orquesta/tests/unit/graphing/native/__init__.py create mode 100644 orquesta/tests/unit/graphing/native/test_routes_basic.py create mode 100644 orquesta/tests/unit/graphing/native/test_routes_cycle.py create mode 100644 orquesta/tests/unit/graphing/native/test_routes_join.py create mode 100644 orquesta/tests/unit/graphing/native/test_routes_split.py create mode 100644 orquesta/tests/unit/graphing/native/test_routes_task_transition.py diff --git a/orquesta/graphing.py b/orquesta/graphing.py index d08b61e1..29b3e76f 100644 --- a/orquesta/graphing.py +++ b/orquesta/graphing.py @@ -16,7 +16,9 @@ import networkx as nx from networkx.readwrite import json_graph + import six +from six.moves import queue from orquesta import exceptions as exc from orquesta.utils import dictionary as dict_utils @@ -35,21 +37,40 @@ def __init__(self, graph=None): self._graph = graph if graph else nx.MultiDiGraph() def serialize(self): - return json_graph.adjacency_data(self._graph) + data = json_graph.adjacency_data(self._graph) + + data['adjacency'] = [ + sorted(outbounds, key=lambda x: x['id']) + for outbounds in data['adjacency'] + ] + + return data @classmethod def deserialize(cls, data): g = json_graph.adjacency_graph(copy.deepcopy(data), directed=True, multigraph=True) return cls(graph=g) + @staticmethod + def get_root_nodes(graph): + nodes = [ + {'id': n, 'name': graph.node[n].get('name', n)} + for n, d in graph.in_degree().items() if d == 0 + ] + + return sorted(nodes, key=lambda x: x['id']) + @property def roots(self): - tasks = [ - {'id': n, 'name': self._graph.node[n].get('name', n)} - for n, d in self._graph.in_degree().items() if d == 0 - ] + return self.get_root_nodes(self._graph) + + @property + def leaves(self): + # Reverse the graph using a copy to identify the root nodes. + return self.get_root_nodes(self._graph.reverse(copy=True)) - return sorted(tasks, key=lambda x: x['name']) + def has_tasks(self): + return len(self._graph) > 0 def has_task(self, task_id): return self._graph.has_node(task_id) @@ -167,5 +188,81 @@ def has_barrier(self, task_id): return (b is not None and b != '') + def get_cycles(self): + return [ + {'tasks': sorted(c), 'route': nx.find_cycle(self._graph, c)} + for c in nx.simple_cycles(self._graph) + ] + def in_cycle(self, task_id): return [c for c in nx.simple_cycles(self._graph) if task_id in c] + + def is_cycle_closed(self, cycle): + # A cycle is closed, for a lack of better term, if there is no task + # transition to any task that is not a member of the cycle. + for task_id in cycle['tasks']: + for transition in self.get_next_transitions(task_id): + if transition[1] not in cycle['tasks']: + return False + + return True + + def get_route(self, leaf_cluster): + tasks = [] + path = [] + q = queue.Queue() + + if isinstance(leaf_cluster, six.string_types): + leaf_cluster = [leaf_cluster] + + for leaf_task_id in leaf_cluster: + for transition in self.get_prev_transitions(leaf_task_id): + q.put(transition) + + while not q.empty(): + transition = q.get() + + source = transition[0] + destination = transition[1] + edge_key = transition[2] + path_section = (source, destination, edge_key) + + if path_section in path: + continue + + tasks.append(source) + tasks.append(destination) + path.append(path_section) + + for transition in self.get_prev_transitions(source): + q.put(transition) + + if not tasks or not path: + return None + + return { + 'tasks': sorted(list(set(tasks))), + 'path': sorted(path, key=lambda x: (x[0], x[1], x[2])) + } + + def get_routes(self): + routes = [] + + # Identify routes for each leaves in the graph first. This list is needed + # to filter out cycles that are not covered. + for node in self.leaves: + routes.append(self.get_route([node['id']])) + + # Identify routes for each leaf clusters (closed cycles) in the graph. + for cycle in self.get_cycles(): + if not routes or self.is_cycle_closed(cycle): + routes.append(self.get_route(cycle['tasks'])) + else: + for route in routes: + # If the cycle is not in any of the routes, + # then consider this cycle as a separate route. + if set(cycle['tasks']) - set(route['tasks']): + routes.append(self.get_route(cycle['tasks'])) + break + + return routes diff --git a/orquesta/tests/unit/base.py b/orquesta/tests/unit/base.py index 47f4c68a..99be9001 100644 --- a/orquesta/tests/unit/base.py +++ b/orquesta/tests/unit/base.py @@ -67,6 +67,11 @@ def assert_graph_equal(self, wf_graph, expected_wf_graph): self.assertListEqual(wf_graph_meta, expected_wf_graph_meta) + wf_graph_attrs = sorted(wf_graph_json['graph'], key=lambda x: x[0]) + expected_wf_graph_attrs = sorted(expected_wf_graph['graph'], key=lambda x: x[0]) + + self.assertListEqual(wf_graph_attrs, expected_wf_graph_attrs) + class WorkflowSpecTest(unittest.TestCase): spec_module_name = 'mock' diff --git a/orquesta/tests/unit/composition/native/base.py b/orquesta/tests/unit/composition/native/base.py index e2e47783..79807045 100644 --- a/orquesta/tests/unit/composition/native/base.py +++ b/orquesta/tests/unit/composition/native/base.py @@ -10,6 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from orquesta.tests.unit import base @@ -19,3 +21,11 @@ class OrchestraWorkflowComposerTest(base.WorkflowComposerTest): def setUpClass(cls): cls.spec_module_name = 'native' super(OrchestraWorkflowComposerTest, cls).setUpClass() + + def assert_wf_ex_routes(self, wf_name, expected_routes, debug=False): + wf_ex_graph = self.compose_wf_ex_graph(wf_name) + + if debug: + print(json.dumps(wf_ex_graph.get_routes(), indent=4)) + + self.assertListEqual(wf_ex_graph.get_routes(), expected_routes) diff --git a/orquesta/tests/unit/graphing/native/__init__.py b/orquesta/tests/unit/graphing/native/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orquesta/tests/unit/graphing/native/test_routes_basic.py b/orquesta/tests/unit/graphing/native/test_routes_basic.py new file mode 100644 index 00000000..f30489f4 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_basic.py @@ -0,0 +1,132 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class BasicWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_sequential(self): + wf_name = 'sequential' + + expected_routes = [ + { + 'tasks': [ + 'noop', + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task3', 'noop', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_parallel(self): + wf_name = 'parallel' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_branching(self): + wf_name = 'branching' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task4', + 'task5' + ], + 'path': [ + ('task1', 'task4', 0), + ('task4', 'task5', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_decision(self): + wf_name = 'decision' + + expected_routes = [ + { + 'tasks': [ + 'a', + 't1' + ], + 'path': [ + ('t1', 'a', 0), + ] + }, + { + 'tasks': [ + 'b', + 't1' + ], + 'path': [ + ('t1', 'b', 0), + ] + }, + { + 'tasks': [ + 'c', + 't1' + ], + 'path': [ + ('t1', 'c', 0), + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_cycle.py b/orquesta/tests/unit/graphing/native/test_routes_cycle.py new file mode 100644 index 00000000..e98f36db --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_cycle.py @@ -0,0 +1,65 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class CyclicWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_cycle(self): + wf_name = 'cycle' + + expected_routes = [ + { + 'tasks': [ + 'prep', + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('prep', 'task1', 0), + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task3', 'task1', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_cycles(self): + wf_name = 'cycles' + + expected_routes = [ + { + 'tasks': [ + 'prep', + 'task1', + 'task2', + 'task3', + 'task4', + 'task5' + ], + 'path': [ + ('prep', 'task1', 0), + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task2', 'task5', 0), + ('task3', 'task4', 0), + ('task4', 'task2', 0), + ('task5', 'task1', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_join.py b/orquesta/tests/unit/graphing/native/test_routes_join.py new file mode 100644 index 00000000..d1bdaca4 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_join.py @@ -0,0 +1,75 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class JoinWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_join(self): + wf_name = 'join' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6', + 'task7' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task6', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0), + ('task6', 'task7', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_join_count(self): + wf_name = 'join-count' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6', + 'task7', + 'task8' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task1', 'task6', 0), + ('task2', 'task3', 0), + ('task3', 'task8', 0), + ('task4', 'task5', 0), + ('task5', 'task8', 0), + ('task6', 'task7', 0), + ('task7', 'task8', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_split.py b/orquesta/tests/unit/graphing/native/test_routes_split.py new file mode 100644 index 00000000..5bb90766 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_split.py @@ -0,0 +1,263 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class SplitWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_split(self): + wf_name = 'split' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_splits(self): + wf_name = 'splits' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task8__1' + ], + 'path': [ + ('task1', 'task8__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + 'task8__2' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ('task7__1', 'task8__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2', + 'task8__3' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0), + ('task7__2', 'task8__3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_nested_splits(self): + wf_name = 'splits-nested' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task10__1', + 'task2', + 'task4__1', + 'task5__1', + 'task7__1', + 'task8__1', + 'task9__1' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task5__1', 'task7__1', 0), + ('task7__1', 'task8__1', 0), + ('task7__1', 'task9__1', 0), + ('task8__1', 'task10__1', 0), + ('task9__1', 'task10__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__2', + 'task2', + 'task4__1', + 'task6__1', + 'task7__2', + 'task8__2', + 'task9__2' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task6__1', 0), + ('task6__1', 'task7__2', 0), + ('task7__2', 'task8__2', 0), + ('task7__2', 'task9__2', 0), + ('task8__2', 'task10__2', 0), + ('task9__2', 'task10__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__3', + 'task3', + 'task4__2', + 'task5__2', + 'task7__3', + 'task8__3', + 'task9__3' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task5__2', 'task7__3', 0), + ('task7__3', 'task8__3', 0), + ('task7__3', 'task9__3', 0), + ('task8__3', 'task10__3', 0), + ('task9__3', 'task10__3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__4', + 'task3', + 'task4__2', + 'task6__2', + 'task7__4', + 'task8__4', + 'task9__4' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task6__2', 0), + ('task6__2', 'task7__4', 0), + ('task7__4', 'task8__4', 0), + ('task7__4', 'task9__4', 0), + ('task8__4', 'task10__4', 0), + ('task9__4', 'task10__4', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_splits_extra_join(self): + wf_name = 'splits-join' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + 'task8__1' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task8__1', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ('task7__1', 'task8__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2', + 'task8__2' + ], + 'path': [ + ('task1', 'task3', 0), + ('task1', 'task8__2', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0), + ('task7__2', 'task8__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_task_transition.py b/orquesta/tests/unit/graphing/native/test_routes_task_transition.py new file mode 100644 index 00000000..adf188d7 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_task_transition.py @@ -0,0 +1,164 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class TaskTransitionWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_error_handling(self): + wf_name = 'error-handling' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_with_duplicate_when(self): + wf_name = 'task-duplicate-when' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_with_duplicate_transition(self): + wf_name = 'task-duplicate-transition' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2__1' + ], + 'path': [ + ('task1', 'task2__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__2' + ], + 'path': [ + ('task1', 'task2__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_on_complete(self): + wf_name = 'task-on-complete' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task4' + ], + 'path': [ + ('task1', 'task4', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_transitions_split(self): + wf_name = 'task-transitions-split' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2__1' + ], + 'path': [ + ('task1', 'task2__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__2' + ], + 'path': [ + ('task1', 'task2__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__3' + ], + 'path': [ + ('task1', 'task2__3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/test_workflow_graph.py b/orquesta/tests/unit/graphing/test_workflow_graph.py index 1ad77fe5..ab8d90ef 100644 --- a/orquesta/tests/unit/graphing/test_workflow_graph.py +++ b/orquesta/tests/unit/graphing/test_workflow_graph.py @@ -155,6 +155,18 @@ def test_graph_roots(self): self.assertListEqual(wf_graph.roots, expected) + def test_graph_leaves(self): + wf_graph = self._prep_graph() + + expected_leaves = [{'id': 'task6', 'name': 'task6'}, {'id': 'task9', 'name': 'task9'}] + + self.assertListEqual(wf_graph.leaves, expected_leaves) + + # Ensure the underlying graph is not permanently altered. + expected_roots = [{'id': 'task1', 'name': 'task1'}] + + self.assertListEqual(wf_graph.roots, expected_roots) + def test_skip_add_tasks(self): wf_graph = graphing.WorkflowGraph() @@ -347,3 +359,73 @@ def test_split_from_reused_task(self): len(wf_graph.get_prev_transitions('task9')) > 1 and not wf_graph.has_barrier('task9') ) + + def test_get_route_bad_leaves(self): + wf_graph = self._prep_graph() + + self.assertIsNone(wf_graph.get_route('task1')) + self.assertIsNone(wf_graph.get_route('task101')) + + def test_get_route(self): + wf_graph = self._prep_graph() + + expected_route = { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task5', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + } + + self.assertDictEqual(wf_graph.get_route('task6'), expected_route) + + def test_get_routes(self): + wf_graph = self._prep_graph() + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task5', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task7', + 'task8', + 'task9' + ], + 'path': [ + ('task1', 'task7', 0), + ('task1', 'task9', 0), + ('task7', 'task8', 0), + ('task8', 'task9', 0) + ] + } + ] + + self.assertListEqual(wf_graph.get_routes(), expected_routes) From d2c0298f30ffd65e238eecf9056e12e3ee0bdfd6 Mon Sep 17 00:00:00 2001 From: W Chan Date: Fri, 11 Jan 2019 10:32:59 -0800 Subject: [PATCH 4/8] Update python version in project meta Update the python version to 3.6 in the project classification at setup file. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 534128ad..3903dfa7 100644 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ def get_requirements(): 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5' + 'Programming Language :: Python :: 3.6' ], entry_points={ 'orquesta.composers': [ From c0d2abe29e4cbe5e86cb7518d919a5fb64190bf5 Mon Sep 17 00:00:00 2001 From: Anton Kayukov Date: Fri, 18 Jan 2019 22:43:19 -0800 Subject: [PATCH 5/8] Fix for variables containing 'in' --- orquesta/specs/native/v1/models.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 3af60b89..686a3961 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -183,8 +183,8 @@ def render(self, in_ctx): items_spec = self.get_items_spec() items_expr = ( - items_spec.items.strip() if 'in' not in items_spec.items - else items_spec.items[items_spec.items.index('in') + 2:].strip() + items_spec.items.strip() if ' in ' not in items_spec.items + else items_spec.items[items_spec.items.index(' in ') + 4:].strip() ) items = expr.evaluate(items_expr, in_ctx) @@ -193,8 +193,8 @@ def render(self, in_ctx): raise TypeError('The value of "%s" is not type of list.' % items_expr) item_keys = ( - None if 'in' not in items_spec.items - else items_spec.items[:items_spec.items.index('in')].replace(' ', '').split(',') + None if ' in ' not in items_spec.items + else items_spec.items[:items_spec.items.index(' in ')].replace(' ', '').split(',') ) for idx, item in enumerate(items): From 8aa040f40396262df81ddd376a046fe141453741 Mon Sep 17 00:00:00 2001 From: Anton Kayukov Date: Sat, 19 Jan 2019 07:26:10 -0800 Subject: [PATCH 6/8] List rendering test for var contianing 'in' --- .../test_task_rendering_for_with_items.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py index da6b7963..607dec26 100644 --- a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py +++ b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py @@ -299,6 +299,53 @@ def test_basic_list_rendering(self): actual_tasks = conductor.get_next_tasks() self.assert_task_list(actual_tasks, expected_tasks) + def test_basic_list_rendering_var_w_in(self): + wf_def = """ + version: 1.0 + + vars: + - domains: + - fee + - fi + - fo + - fum + + tasks: + task1: + with: <% ctx(domains) %> + action: core.echo message=<% item() %> + """ + + spec = specs.WorkflowSpec(wf_def) + self.assertDictEqual(spec.inspect(), {}) + + conductor = conducting.WorkflowConductor(spec) + conductor.request_workflow_state(states.RUNNING) + + next_task_name = 'task1' + next_task_ctx = {'domains': ['fee', 'fi', 'fo', 'fum']} + next_task_spec = conductor.spec.tasks.get_task(next_task_name) + + next_task_action_specs = [ + {'action': 'core.echo', 'input': {'message': 'fee'}, 'item_id': 0}, + {'action': 'core.echo', 'input': {'message': 'fi'}, 'item_id': 1}, + {'action': 'core.echo', 'input': {'message': 'fo'}, 'item_id': 2}, + {'action': 'core.echo', 'input': {'message': 'fum'}, 'item_id': 3}, + ] + + expected_task = self.format_task_item( + next_task_name, + next_task_ctx, + next_task_spec, + action_specs=next_task_action_specs, + items_count=len(next_task_ctx['xs']), + items_concurrency=None + ) + + expected_tasks = [expected_task] + actual_tasks = conductor.get_next_tasks() + self.assert_task_list(actual_tasks, expected_tasks) + def test_multiple_lists_rendering(self): wf_def = """ version: 1.0 From f03f3f2f3820bf111a9277f4f6c5d6c83a89d004 Mon Sep 17 00:00:00 2001 From: Anton Kayukov Date: Sat, 19 Jan 2019 07:40:05 -0800 Subject: [PATCH 7/8] fix var name in test --- .../conducting/native/test_task_rendering_for_with_items.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py index 607dec26..336b7b5e 100644 --- a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py +++ b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py @@ -338,7 +338,7 @@ def test_basic_list_rendering_var_w_in(self): next_task_ctx, next_task_spec, action_specs=next_task_action_specs, - items_count=len(next_task_ctx['xs']), + items_count=len(next_task_ctx['domains']), items_concurrency=None ) From d52f07604d04b590494fa5735ecda9a1dceacff3 Mon Sep 17 00:00:00 2001 From: W Chan Date: Thu, 7 Feb 2019 13:06:01 -0800 Subject: [PATCH 8/8] Add sleep in while loop of composer to yield cpu Add sleep in while loop of composer to yield cpu to other threads/processes. --- orquesta/composers/native.py | 4 ++++ requirements.txt | 1 + 2 files changed, 5 insertions(+) diff --git a/orquesta/composers/native.py b/orquesta/composers/native.py index 218e6906..ad0f1761 100644 --- a/orquesta/composers/native.py +++ b/orquesta/composers/native.py @@ -10,6 +10,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet import logging from six.moves import queue @@ -224,4 +225,7 @@ def _create_task_ex_name(task_name, split_id): ref=prev_seq[3]['ref'] ) + # Add sleep here to yield to other threads/processes. + eventlet.sleep(0.001) + return wf_ex_graph diff --git a/requirements.txt b/requirements.txt index 8d1a99e8..b761826d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ chardet +eventlet Jinja2>=2.8 # BSD License (3 clause) jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT networkx>=1.10,<2.0