Skip to content

Commit

Permalink
Merge branch 'master' into issue_124
Browse files Browse the repository at this point in the history
  • Loading branch information
jinpingh authored Feb 13, 2019
2 parents fdc0124 + 774123b commit 8eb160c
Show file tree
Hide file tree
Showing 16 changed files with 1,053 additions and 18 deletions.
104 changes: 97 additions & 7 deletions docs/source/languages/orquesta.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,105 @@ execution, the task is evaluated for completion. If criteria for transition is m
set of tasks is invoked, sequentially in the order of the transitions and tasks that are listed.

If more than one task transitions to the same task and ``join`` is specified in the latter (i.e. the
task named "task3" in the example below), then the task being transitioned into becomes a barrier
for the inbound task transitions. There will be only one instance of the barrier task. In the
workflow graph, there will be multiple inbound edges to the barrier node.
task named ``barrier_task`` in the example below), then the task being transitioned into becomes a
barrier for the inbound task transitions. There will be only one instance of the barrier task. In
the workflow graph, there will be multiple inbound edges to the barrier node.

The following workflow definition illustrates the execution of parallel branches. The barrier task
will be blocked until all the parallel branches complete and reach it.

.. code-block:: yaml
version: 1.0
tasks:
setup_task:
# ...
# Run tasks in parallel
next:
- do:
- parallel_task_1
- parallel_task_2
- parallel_task_3
parallel_task_1:
# ...
# Wait to run barrier_task after this
next:
- do:
- barrier_task
parallel_task_2:
# ...
# Eventually run barrier_task
next:
- do:
- intermediate_task
intermediate_task:
# ...
# Wait to run barrier_task after this
next:
- do:
- barrier_task
barrier_task:
# ...
# Run after parallel_task_1, parallel_task_2, and intermediate_task have all finished
join: all
parallel_task_3:
# ...
# Run immediately after setup_task, do NOT wait for barrier_task
The following is the corresponding workflow execution graph.

.. code-block:: none
=---- time (not to scale) ---->
setup_task --+
|
+------ parallel_task_1 --------------------------+
| |
+-- parallel_task_2 --+ |
| | |
| +---- intermediate_task ----+
| |
| +-- barrier_task --+
| |
+-- parallel_task_3 -------------------------------------------------+
|
+-- [finish]
Conversely, if more than one task transitions to the same task and ``join`` is **not** specified in
the latter (i.e. the task named "log" in the example below), then the target task will be invoked
immediately following the completion of the previous task. There will be multiple instances of the
target task. In the workflow graph, each invocation of the target task will be its own branch with
the inbound edge from the node of the previous task.
the latter, then the target task will be invoked immediately following the completion of the
previous task. There will be multiple instances of the target task. In the workflow graph, each
invocation of the target task will be its own branch with the inbound edge from the node of the
previous task.

In other words, if ``join: all`` was removed from the previous workflow, the ``barrier_task`` would
be run two different times, resulting in this execution graph:

.. code-block:: none
=---- time (not to scale) ---->
setup_task --+
|
+------ parallel_task_1 ------+
| |
| +-- barrier_task (1) ----------------------+
| |
+-- parallel_task_2 --+ |
| | |
| +---- intermediate_task ----+ |
| | |
| +-- barrier_task (2) --+
| |
+-- parallel_task_3 -----------------------------------------------------+
|
+-- [finish]
With Items Model
----------------
Expand Down
4 changes: 4 additions & 0 deletions orquesta/composers/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import eventlet
import logging
from six.moves import queue

Expand Down Expand Up @@ -224,4 +225,7 @@ def _create_task_ex_name(task_name, split_id):
ref=prev_seq[3]['ref']
)

# Add sleep here to yield to other threads/processes.
eventlet.sleep(0.001)

return wf_ex_graph
109 changes: 103 additions & 6 deletions orquesta/graphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import networkx as nx
from networkx.readwrite import json_graph

import six
from six.moves import queue

from orquesta import exceptions as exc
from orquesta.utils import dictionary as dict_utils
Expand All @@ -35,21 +37,40 @@ def __init__(self, graph=None):
self._graph = graph if graph else nx.MultiDiGraph()

def serialize(self):
return json_graph.adjacency_data(self._graph)
data = json_graph.adjacency_data(self._graph)

data['adjacency'] = [
sorted(outbounds, key=lambda x: x['id'])
for outbounds in data['adjacency']
]

return data

@classmethod
def deserialize(cls, data):
g = json_graph.adjacency_graph(copy.deepcopy(data), directed=True, multigraph=True)
return cls(graph=g)

@staticmethod
def get_root_nodes(graph):
nodes = [
{'id': n, 'name': graph.node[n].get('name', n)}
for n, d in graph.in_degree().items() if d == 0
]

return sorted(nodes, key=lambda x: x['id'])

@property
def roots(self):
tasks = [
{'id': n, 'name': self._graph.node[n].get('name', n)}
for n, d in self._graph.in_degree().items() if d == 0
]
return self.get_root_nodes(self._graph)

@property
def leaves(self):
# Reverse the graph using a copy to identify the root nodes.
return self.get_root_nodes(self._graph.reverse(copy=True))

return sorted(tasks, key=lambda x: x['name'])
def has_tasks(self):
return len(self._graph) > 0

def has_task(self, task_id):
return self._graph.has_node(task_id)
Expand Down Expand Up @@ -167,5 +188,81 @@ def has_barrier(self, task_id):

return (b is not None and b != '')

def get_cycles(self):
return [
{'tasks': sorted(c), 'route': nx.find_cycle(self._graph, c)}
for c in nx.simple_cycles(self._graph)
]

def in_cycle(self, task_id):
return [c for c in nx.simple_cycles(self._graph) if task_id in c]

def is_cycle_closed(self, cycle):
# A cycle is closed, for a lack of better term, if there is no task
# transition to any task that is not a member of the cycle.
for task_id in cycle['tasks']:
for transition in self.get_next_transitions(task_id):
if transition[1] not in cycle['tasks']:
return False

return True

def get_route(self, leaf_cluster):
tasks = []
path = []
q = queue.Queue()

if isinstance(leaf_cluster, six.string_types):
leaf_cluster = [leaf_cluster]

for leaf_task_id in leaf_cluster:
for transition in self.get_prev_transitions(leaf_task_id):
q.put(transition)

while not q.empty():
transition = q.get()

source = transition[0]
destination = transition[1]
edge_key = transition[2]
path_section = (source, destination, edge_key)

if path_section in path:
continue

tasks.append(source)
tasks.append(destination)
path.append(path_section)

for transition in self.get_prev_transitions(source):
q.put(transition)

if not tasks or not path:
return None

return {
'tasks': sorted(list(set(tasks))),
'path': sorted(path, key=lambda x: (x[0], x[1], x[2]))
}

def get_routes(self):
routes = []

# Identify routes for each leaves in the graph first. This list is needed
# to filter out cycles that are not covered.
for node in self.leaves:
routes.append(self.get_route([node['id']]))

# Identify routes for each leaf clusters (closed cycles) in the graph.
for cycle in self.get_cycles():
if not routes or self.is_cycle_closed(cycle):
routes.append(self.get_route(cycle['tasks']))
else:
for route in routes:
# If the cycle is not in any of the routes,
# then consider this cycle as a separate route.
if set(cycle['tasks']) - set(route['tasks']):
routes.append(self.get_route(cycle['tasks']))
break

return routes
8 changes: 4 additions & 4 deletions orquesta/specs/native/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ def render(self, in_ctx):
items_spec = self.get_items_spec()

items_expr = (
items_spec.items.strip() if 'in' not in items_spec.items
else items_spec.items[items_spec.items.index('in') + 2:].strip()
items_spec.items.strip() if ' in ' not in items_spec.items
else items_spec.items[items_spec.items.index(' in ') + 4:].strip()
)

items = expr.evaluate(items_expr, in_ctx)
Expand All @@ -193,8 +193,8 @@ def render(self, in_ctx):
raise TypeError('The value of "%s" is not type of list.' % items_expr)

item_keys = (
None if 'in' not in items_spec.items
else items_spec.items[:items_spec.items.index('in')].replace(' ', '').split(',')
None if ' in ' not in items_spec.items
else items_spec.items[:items_spec.items.index(' in ')].replace(' ', '').split(',')
)

for idx, item in enumerate(items):
Expand Down
5 changes: 5 additions & 0 deletions orquesta/tests/unit/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def assert_graph_equal(self, wf_graph, expected_wf_graph):

self.assertListEqual(wf_graph_meta, expected_wf_graph_meta)

wf_graph_attrs = sorted(wf_graph_json['graph'], key=lambda x: x[0])
expected_wf_graph_attrs = sorted(expected_wf_graph['graph'], key=lambda x: x[0])

self.assertListEqual(wf_graph_attrs, expected_wf_graph_attrs)


class WorkflowSpecTest(unittest.TestCase):
spec_module_name = 'mock'
Expand Down
10 changes: 10 additions & 0 deletions orquesta/tests/unit/composition/native/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from orquesta.tests.unit import base


Expand All @@ -19,3 +21,11 @@ class OrchestraWorkflowComposerTest(base.WorkflowComposerTest):
def setUpClass(cls):
cls.spec_module_name = 'native'
super(OrchestraWorkflowComposerTest, cls).setUpClass()

def assert_wf_ex_routes(self, wf_name, expected_routes, debug=False):
wf_ex_graph = self.compose_wf_ex_graph(wf_name)

if debug:
print(json.dumps(wf_ex_graph.get_routes(), indent=4))

self.assertListEqual(wf_ex_graph.get_routes(), expected_routes)
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,53 @@ def test_basic_list_rendering(self):
actual_tasks = conductor.get_next_tasks()
self.assert_task_list(actual_tasks, expected_tasks)

def test_basic_list_rendering_var_w_in(self):
wf_def = """
version: 1.0
vars:
- domains:
- fee
- fi
- fo
- fum
tasks:
task1:
with: <% ctx(domains) %>
action: core.echo message=<% item() %>
"""

spec = specs.WorkflowSpec(wf_def)
self.assertDictEqual(spec.inspect(), {})

conductor = conducting.WorkflowConductor(spec)
conductor.request_workflow_state(states.RUNNING)

next_task_name = 'task1'
next_task_ctx = {'domains': ['fee', 'fi', 'fo', 'fum']}
next_task_spec = conductor.spec.tasks.get_task(next_task_name)

next_task_action_specs = [
{'action': 'core.echo', 'input': {'message': 'fee'}, 'item_id': 0},
{'action': 'core.echo', 'input': {'message': 'fi'}, 'item_id': 1},
{'action': 'core.echo', 'input': {'message': 'fo'}, 'item_id': 2},
{'action': 'core.echo', 'input': {'message': 'fum'}, 'item_id': 3},
]

expected_task = self.format_task_item(
next_task_name,
next_task_ctx,
next_task_spec,
action_specs=next_task_action_specs,
items_count=len(next_task_ctx['domains']),
items_concurrency=None
)

expected_tasks = [expected_task]
actual_tasks = conductor.get_next_tasks()
self.assert_task_list(actual_tasks, expected_tasks)

def test_multiple_lists_rendering(self):
wf_def = """
version: 1.0
Expand Down
Empty file.
Loading

0 comments on commit 8eb160c

Please sign in to comment.