diff --git a/docs/source/languages/orquesta.rst b/docs/source/languages/orquesta.rst index d7af8e33..be79f071 100644 --- a/docs/source/languages/orquesta.rst +++ b/docs/source/languages/orquesta.rst @@ -107,15 +107,105 @@ execution, the task is evaluated for completion. If criteria for transition is m set of tasks is invoked, sequentially in the order of the transitions and tasks that are listed. If more than one tasks transition to the same task and ``join`` is specified in the latter (i.e. the -task named "task3" in the example below), then the task being transitioned into becomes a barrier -for the inbound task transitions. There will be only one instance of the barrier task. In the -workflow graph, there will be multiple inbound edges to the barrier node. +task named ``barrier_task`` in the example below), then the task being transitioned into becomes a +barrier for the inbound task transitions. There will be only one instance of the barrier task. In +the workflow graph, there will be multiple inbound edges to the barrier node. + +The following workflow definition illustrates the execution of parallel branches. The barrier task +will be blocked until all the parallel branches complete and reach it. + +.. code-block:: yaml + + version: 1.0 + + tasks: + setup_task: + # ... + # Run tasks in parallel + next: + - do: + - parallel_task_1 + - parallel_task_2 + - parallel_task_3 + + parallel_task_1: + # ... + # Wait to run barrier_task after this + next: + - do: + - barrier_task + + parallel_task_2: + # ... + # Eventually run barrier_task + next: + - do: + - intermediate_task + + intermediate_task: + # ... + # Wait to run barrier_task after this + next: + - do: + - barrier_task + + barrier_task: + # ... + # Run after parallel_task_1, parallel_task_2, and intermediate_task have all finished + join: all + + parallel_task_3: + # ... + # Run immediately after setup_task, do NOT wait for barrier_task + +The following is the corresponding workflow execution graph. + +.. code-block:: none + + =---- time (not to scale) ----> + + setup_task --+ + | + +------ parallel_task_1 --------------------------+ + | | + +-- parallel_task_2 --+ | + | | | + | +---- intermediate_task ----+ + | | + | +-- barrier_task --+ + | | + +-- parallel_task_3 -------------------------------------------------+ + | + +-- [finish] Conversely, if more than one tasks transition to the same task and ``join`` is **not** specified in -the latter (i.e. the task named "log" in the example below), then the target task will be invoked -immediately following the completion of the previous task. There will be multiple instances of the -target task. In the workflow graph, each invocation of the target task will be its own branch with -the inbound edge from the node of the previous task. +the latter, then the target task will be invoked immediately following the completion of the +previous task. There will be multiple instances of the target task. In the workflow graph, each +invocation of the target task will be its own branch with the inbound edge from the node of the +previous task. + +In other words, if ``join: all`` was removed from the previous workflow, the ``barrier_task`` would +be run two different times, resulting in this execution graph: + +.. code-block:: none + + =---- time (not to scale) ----> + + setup_task --+ + | + +------ parallel_task_1 ------+ + | | + | +-- barrier_task (1) ----------------------+ + | | + +-- parallel_task_2 --+ | + | | | + | +---- intermediate_task ----+ | + | | | + | +-- barrier_task (2) --+ + | | + +-- parallel_task_3 -----------------------------------------------------+ + | + +-- [finish] With Items Model ---------------- diff --git a/orquesta/composers/native.py b/orquesta/composers/native.py index 218e6906..ad0f1761 100644 --- a/orquesta/composers/native.py +++ b/orquesta/composers/native.py @@ -10,6 +10,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet import logging from six.moves import queue @@ -224,4 +225,7 @@ def _create_task_ex_name(task_name, split_id): ref=prev_seq[3]['ref'] ) + # Add sleep here to yield to other threads/processes. + eventlet.sleep(0.001) + return wf_ex_graph diff --git a/orquesta/graphing.py b/orquesta/graphing.py index d08b61e1..29b3e76f 100644 --- a/orquesta/graphing.py +++ b/orquesta/graphing.py @@ -16,7 +16,9 @@ import networkx as nx from networkx.readwrite import json_graph + import six +from six.moves import queue from orquesta import exceptions as exc from orquesta.utils import dictionary as dict_utils @@ -35,21 +37,40 @@ def __init__(self, graph=None): self._graph = graph if graph else nx.MultiDiGraph() def serialize(self): - return json_graph.adjacency_data(self._graph) + data = json_graph.adjacency_data(self._graph) + + data['adjacency'] = [ + sorted(outbounds, key=lambda x: x['id']) + for outbounds in data['adjacency'] + ] + + return data @classmethod def deserialize(cls, data): g = json_graph.adjacency_graph(copy.deepcopy(data), directed=True, multigraph=True) return cls(graph=g) + @staticmethod + def get_root_nodes(graph): + nodes = [ + {'id': n, 'name': graph.node[n].get('name', n)} + for n, d in graph.in_degree().items() if d == 0 + ] + + return sorted(nodes, key=lambda x: x['id']) + @property def roots(self): - tasks = [ - {'id': n, 'name': self._graph.node[n].get('name', n)} - for n, d in self._graph.in_degree().items() if d == 0 - ] + return self.get_root_nodes(self._graph) + + @property + def leaves(self): + # Reverse the graph using a copy to identify the root nodes. + return self.get_root_nodes(self._graph.reverse(copy=True)) - return sorted(tasks, key=lambda x: x['name']) + def has_tasks(self): + return len(self._graph) > 0 def has_task(self, task_id): return self._graph.has_node(task_id) @@ -167,5 +188,81 @@ def has_barrier(self, task_id): return (b is not None and b != '') + def get_cycles(self): + return [ + {'tasks': sorted(c), 'route': nx.find_cycle(self._graph, c)} + for c in nx.simple_cycles(self._graph) + ] + def in_cycle(self, task_id): return [c for c in nx.simple_cycles(self._graph) if task_id in c] + + def is_cycle_closed(self, cycle): + # A cycle is closed, for a lack of better term, if there is no task + # transition to any task that is not a member of the cycle. + for task_id in cycle['tasks']: + for transition in self.get_next_transitions(task_id): + if transition[1] not in cycle['tasks']: + return False + + return True + + def get_route(self, leaf_cluster): + tasks = [] + path = [] + q = queue.Queue() + + if isinstance(leaf_cluster, six.string_types): + leaf_cluster = [leaf_cluster] + + for leaf_task_id in leaf_cluster: + for transition in self.get_prev_transitions(leaf_task_id): + q.put(transition) + + while not q.empty(): + transition = q.get() + + source = transition[0] + destination = transition[1] + edge_key = transition[2] + path_section = (source, destination, edge_key) + + if path_section in path: + continue + + tasks.append(source) + tasks.append(destination) + path.append(path_section) + + for transition in self.get_prev_transitions(source): + q.put(transition) + + if not tasks or not path: + return None + + return { + 'tasks': sorted(list(set(tasks))), + 'path': sorted(path, key=lambda x: (x[0], x[1], x[2])) + } + + def get_routes(self): + routes = [] + + # Identify routes for each leaves in the graph first. This list is needed + # to filter out cycles that are not covered. + for node in self.leaves: + routes.append(self.get_route([node['id']])) + + # Identify routes for each leaf clusters (closed cycles) in the graph. + for cycle in self.get_cycles(): + if not routes or self.is_cycle_closed(cycle): + routes.append(self.get_route(cycle['tasks'])) + else: + for route in routes: + # If the cycle is not in any of the routes, + # then consider this cycle as a separate route. + if set(cycle['tasks']) - set(route['tasks']): + routes.append(self.get_route(cycle['tasks'])) + break + + return routes diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 3af60b89..686a3961 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -183,8 +183,8 @@ def render(self, in_ctx): items_spec = self.get_items_spec() items_expr = ( - items_spec.items.strip() if 'in' not in items_spec.items - else items_spec.items[items_spec.items.index('in') + 2:].strip() + items_spec.items.strip() if ' in ' not in items_spec.items + else items_spec.items[items_spec.items.index(' in ') + 4:].strip() ) items = expr.evaluate(items_expr, in_ctx) @@ -193,8 +193,8 @@ def render(self, in_ctx): raise TypeError('The value of "%s" is not type of list.' % items_expr) item_keys = ( - None if 'in' not in items_spec.items - else items_spec.items[:items_spec.items.index('in')].replace(' ', '').split(',') + None if ' in ' not in items_spec.items + else items_spec.items[:items_spec.items.index(' in ')].replace(' ', '').split(',') ) for idx, item in enumerate(items): diff --git a/orquesta/tests/unit/base.py b/orquesta/tests/unit/base.py index 47f4c68a..99be9001 100644 --- a/orquesta/tests/unit/base.py +++ b/orquesta/tests/unit/base.py @@ -67,6 +67,11 @@ def assert_graph_equal(self, wf_graph, expected_wf_graph): self.assertListEqual(wf_graph_meta, expected_wf_graph_meta) + wf_graph_attrs = sorted(wf_graph_json['graph'], key=lambda x: x[0]) + expected_wf_graph_attrs = sorted(expected_wf_graph['graph'], key=lambda x: x[0]) + + self.assertListEqual(wf_graph_attrs, expected_wf_graph_attrs) + class WorkflowSpecTest(unittest.TestCase): spec_module_name = 'mock' diff --git a/orquesta/tests/unit/composition/native/base.py b/orquesta/tests/unit/composition/native/base.py index e2e47783..79807045 100644 --- a/orquesta/tests/unit/composition/native/base.py +++ b/orquesta/tests/unit/composition/native/base.py @@ -10,6 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from orquesta.tests.unit import base @@ -19,3 +21,11 @@ class OrchestraWorkflowComposerTest(base.WorkflowComposerTest): def setUpClass(cls): cls.spec_module_name = 'native' super(OrchestraWorkflowComposerTest, cls).setUpClass() + + def assert_wf_ex_routes(self, wf_name, expected_routes, debug=False): + wf_ex_graph = self.compose_wf_ex_graph(wf_name) + + if debug: + print(json.dumps(wf_ex_graph.get_routes(), indent=4)) + + self.assertListEqual(wf_ex_graph.get_routes(), expected_routes) diff --git a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py index da6b7963..336b7b5e 100644 --- a/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py +++ b/orquesta/tests/unit/conducting/native/test_task_rendering_for_with_items.py @@ -299,6 +299,53 @@ def test_basic_list_rendering(self): actual_tasks = conductor.get_next_tasks() self.assert_task_list(actual_tasks, expected_tasks) + def test_basic_list_rendering_var_w_in(self): + wf_def = """ + version: 1.0 + + vars: + - domains: + - fee + - fi + - fo + - fum + + tasks: + task1: + with: <% ctx(domains) %> + action: core.echo message=<% item() %> + """ + + spec = specs.WorkflowSpec(wf_def) + self.assertDictEqual(spec.inspect(), {}) + + conductor = conducting.WorkflowConductor(spec) + conductor.request_workflow_state(states.RUNNING) + + next_task_name = 'task1' + next_task_ctx = {'domains': ['fee', 'fi', 'fo', 'fum']} + next_task_spec = conductor.spec.tasks.get_task(next_task_name) + + next_task_action_specs = [ + {'action': 'core.echo', 'input': {'message': 'fee'}, 'item_id': 0}, + {'action': 'core.echo', 'input': {'message': 'fi'}, 'item_id': 1}, + {'action': 'core.echo', 'input': {'message': 'fo'}, 'item_id': 2}, + {'action': 'core.echo', 'input': {'message': 'fum'}, 'item_id': 3}, + ] + + expected_task = self.format_task_item( + next_task_name, + next_task_ctx, + next_task_spec, + action_specs=next_task_action_specs, + items_count=len(next_task_ctx['domains']), + items_concurrency=None + ) + + expected_tasks = [expected_task] + actual_tasks = conductor.get_next_tasks() + self.assert_task_list(actual_tasks, expected_tasks) + def test_multiple_lists_rendering(self): wf_def = """ version: 1.0 diff --git a/orquesta/tests/unit/graphing/native/__init__.py b/orquesta/tests/unit/graphing/native/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orquesta/tests/unit/graphing/native/test_routes_basic.py b/orquesta/tests/unit/graphing/native/test_routes_basic.py new file mode 100644 index 00000000..f30489f4 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_basic.py @@ -0,0 +1,132 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class BasicWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_sequential(self): + wf_name = 'sequential' + + expected_routes = [ + { + 'tasks': [ + 'noop', + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task3', 'noop', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_parallel(self): + wf_name = 'parallel' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_branching(self): + wf_name = 'branching' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task4', + 'task5' + ], + 'path': [ + ('task1', 'task4', 0), + ('task4', 'task5', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_decision(self): + wf_name = 'decision' + + expected_routes = [ + { + 'tasks': [ + 'a', + 't1' + ], + 'path': [ + ('t1', 'a', 0), + ] + }, + { + 'tasks': [ + 'b', + 't1' + ], + 'path': [ + ('t1', 'b', 0), + ] + }, + { + 'tasks': [ + 'c', + 't1' + ], + 'path': [ + ('t1', 'c', 0), + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_cycle.py b/orquesta/tests/unit/graphing/native/test_routes_cycle.py new file mode 100644 index 00000000..e98f36db --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_cycle.py @@ -0,0 +1,65 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class CyclicWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_cycle(self): + wf_name = 'cycle' + + expected_routes = [ + { + 'tasks': [ + 'prep', + 'task1', + 'task2', + 'task3' + ], + 'path': [ + ('prep', 'task1', 0), + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task3', 'task1', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_cycles(self): + wf_name = 'cycles' + + expected_routes = [ + { + 'tasks': [ + 'prep', + 'task1', + 'task2', + 'task3', + 'task4', + 'task5' + ], + 'path': [ + ('prep', 'task1', 0), + ('task1', 'task2', 0), + ('task2', 'task3', 0), + ('task2', 'task5', 0), + ('task3', 'task4', 0), + ('task4', 'task2', 0), + ('task5', 'task1', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_join.py b/orquesta/tests/unit/graphing/native/test_routes_join.py new file mode 100644 index 00000000..d1bdaca4 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_join.py @@ -0,0 +1,75 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class JoinWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_join(self): + wf_name = 'join' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6', + 'task7' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task6', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0), + ('task6', 'task7', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_join_count(self): + wf_name = 'join-count' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6', + 'task7', + 'task8' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task1', 'task6', 0), + ('task2', 'task3', 0), + ('task3', 'task8', 0), + ('task4', 'task5', 0), + ('task5', 'task8', 0), + ('task6', 'task7', 0), + ('task7', 'task8', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_split.py b/orquesta/tests/unit/graphing/native/test_routes_split.py new file mode 100644 index 00000000..5bb90766 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_split.py @@ -0,0 +1,263 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class SplitWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_split(self): + wf_name = 'split' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_splits(self): + wf_name = 'splits' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task8__1' + ], + 'path': [ + ('task1', 'task8__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + 'task8__2' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ('task7__1', 'task8__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2', + 'task8__3' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0), + ('task7__2', 'task8__3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_nested_splits(self): + wf_name = 'splits-nested' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task10__1', + 'task2', + 'task4__1', + 'task5__1', + 'task7__1', + 'task8__1', + 'task9__1' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task5__1', 'task7__1', 0), + ('task7__1', 'task8__1', 0), + ('task7__1', 'task9__1', 0), + ('task8__1', 'task10__1', 0), + ('task9__1', 'task10__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__2', + 'task2', + 'task4__1', + 'task6__1', + 'task7__2', + 'task8__2', + 'task9__2' + ], + 'path': [ + ('task1', 'task2', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task6__1', 0), + ('task6__1', 'task7__2', 0), + ('task7__2', 'task8__2', 0), + ('task7__2', 'task9__2', 0), + ('task8__2', 'task10__2', 0), + ('task9__2', 'task10__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__3', + 'task3', + 'task4__2', + 'task5__2', + 'task7__3', + 'task8__3', + 'task9__3' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task5__2', 'task7__3', 0), + ('task7__3', 'task8__3', 0), + ('task7__3', 'task9__3', 0), + ('task8__3', 'task10__3', 0), + ('task9__3', 'task10__3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task10__4', + 'task3', + 'task4__2', + 'task6__2', + 'task7__4', + 'task8__4', + 'task9__4' + ], + 'path': [ + ('task1', 'task3', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task6__2', 0), + ('task6__2', 'task7__4', 0), + ('task7__4', 'task8__4', 0), + ('task7__4', 'task9__4', 0), + ('task8__4', 'task10__4', 0), + ('task9__4', 'task10__4', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_splits_extra_join(self): + wf_name = 'splits-join' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task4__1', + 'task5__1', + 'task6__1', + 'task7__1', + 'task8__1' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task8__1', 0), + ('task2', 'task4__1', 0), + ('task4__1', 'task5__1', 0), + ('task4__1', 'task6__1', 0), + ('task5__1', 'task7__1', 0), + ('task6__1', 'task7__1', 0), + ('task7__1', 'task8__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3', + 'task4__2', + 'task5__2', + 'task6__2', + 'task7__2', + 'task8__2' + ], + 'path': [ + ('task1', 'task3', 0), + ('task1', 'task8__2', 0), + ('task3', 'task4__2', 0), + ('task4__2', 'task5__2', 0), + ('task4__2', 'task6__2', 0), + ('task5__2', 'task7__2', 0), + ('task6__2', 'task7__2', 0), + ('task7__2', 'task8__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/native/test_routes_task_transition.py b/orquesta/tests/unit/graphing/native/test_routes_task_transition.py new file mode 100644 index 00000000..adf188d7 --- /dev/null +++ b/orquesta/tests/unit/graphing/native/test_routes_task_transition.py @@ -0,0 +1,164 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orquesta.tests.unit.composition.native import base + + +class TaskTransitionWorkflowRoutesTest(base.OrchestraWorkflowComposerTest): + + def test_error_handling(self): + wf_name = 'error-handling' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_with_duplicate_when(self): + wf_name = 'task-duplicate-when' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_with_duplicate_transition(self): + wf_name = 'task-duplicate-transition' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2__1' + ], + 'path': [ + ('task1', 'task2__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__2' + ], + 'path': [ + ('task1', 'task2__2', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_on_complete(self): + wf_name = 'task-on-complete' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2' + ], + 'path': [ + ('task1', 'task2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task3' + ], + 'path': [ + ('task1', 'task3', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task4' + ], + 'path': [ + ('task1', 'task4', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) + + def test_task_transitions_split(self): + wf_name = 'task-transitions-split' + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2__1' + ], + 'path': [ + ('task1', 'task2__1', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__2' + ], + 'path': [ + ('task1', 'task2__2', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task2__3' + ], + 'path': [ + ('task1', 'task2__3', 0) + ] + } + ] + + self.assert_wf_ex_routes(wf_name, expected_routes) diff --git a/orquesta/tests/unit/graphing/test_workflow_graph.py b/orquesta/tests/unit/graphing/test_workflow_graph.py index 1ad77fe5..ab8d90ef 100644 --- a/orquesta/tests/unit/graphing/test_workflow_graph.py +++ b/orquesta/tests/unit/graphing/test_workflow_graph.py @@ -155,6 +155,18 @@ def test_graph_roots(self): self.assertListEqual(wf_graph.roots, expected) + def test_graph_leaves(self): + wf_graph = self._prep_graph() + + expected_leaves = [{'id': 'task6', 'name': 'task6'}, {'id': 'task9', 'name': 'task9'}] + + self.assertListEqual(wf_graph.leaves, expected_leaves) + + # Ensure the underlying graph is not permanently altered. + expected_roots = [{'id': 'task1', 'name': 'task1'}] + + self.assertListEqual(wf_graph.roots, expected_roots) + def test_skip_add_tasks(self): wf_graph = graphing.WorkflowGraph() @@ -347,3 +359,73 @@ def test_split_from_reused_task(self): len(wf_graph.get_prev_transitions('task9')) > 1 and not wf_graph.has_barrier('task9') ) + + def test_get_route_bad_leaves(self): + wf_graph = self._prep_graph() + + self.assertIsNone(wf_graph.get_route('task1')) + self.assertIsNone(wf_graph.get_route('task101')) + + def test_get_route(self): + wf_graph = self._prep_graph() + + expected_route = { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task5', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + } + + self.assertDictEqual(wf_graph.get_route('task6'), expected_route) + + def test_get_routes(self): + wf_graph = self._prep_graph() + + expected_routes = [ + { + 'tasks': [ + 'task1', + 'task2', + 'task3', + 'task4', + 'task5', + 'task6' + ], + 'path': [ + ('task1', 'task2', 0), + ('task1', 'task4', 0), + ('task2', 'task3', 0), + ('task3', 'task5', 0), + ('task4', 'task5', 0), + ('task5', 'task6', 0) + ] + }, + { + 'tasks': [ + 'task1', + 'task7', + 'task8', + 'task9' + ], + 'path': [ + ('task1', 'task7', 0), + ('task1', 'task9', 0), + ('task7', 'task8', 0), + ('task8', 'task9', 0) + ] + } + ] + + self.assertListEqual(wf_graph.get_routes(), expected_routes) diff --git a/requirements.txt b/requirements.txt index 8d1a99e8..b761826d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ chardet +eventlet Jinja2>=2.8 # BSD License (3 clause) jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT networkx>=1.10,<2.0 diff --git a/setup.py b/setup.py index 534128ad..3903dfa7 100644 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ def get_requirements(): 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5' + 'Programming Language :: Python :: 3.6' ], entry_points={ 'orquesta.composers': [