20
20
from orquesta .specs import native as native_specs
21
21
from orquesta import statuses
22
22
from orquesta .tests .unit import base as test_base
23
+ import yaql .language .utils as yaql_utils
23
24
24
25
25
26
class WorkflowConductorDataFlowTest (test_base .WorkflowConductorTest ):
26
27
27
- def _prep_conductor (self , context = None , inputs = None , status = None ):
28
- wf_def = """
29
- version: 1.0
30
-
31
- description: A basic sequential workflow.
32
-
33
- input:
34
- - a1
35
- - b1: <% ctx().a1 %>
36
-
37
- vars:
38
- - a2: <% ctx().b1 %>
39
- - b2: <% ctx().a2 %>
40
-
41
- output:
42
- - a5: <% ctx().b4 %>
43
- - b5: <% ctx().a5 %>
44
-
45
- tasks:
46
- task1:
47
- action: core.noop
48
- next:
49
- - when: <% succeeded() %>
50
- publish:
51
- - a3: <% ctx().b2 %>
52
- - b3: <% ctx().a3 %>
53
- do: task2
54
- task2:
55
- action: core.noop
56
- next:
57
- - when: <% succeeded() %>
58
- publish: a4=<% ctx().b3 %> b4=<% ctx().a4 %>
59
- do: task3
60
- task3:
61
- action: core.noop
62
- """
63
-
28
+ wf_def_yaql = """
29
+ version: 1.0
30
+
31
+ description: A basic sequential workflow.
32
+
33
+ input:
34
+ - a1
35
+ - b1: <% ctx().a1 %>
36
+
37
+ vars:
38
+ - a2: <% ctx().b1 %>
39
+ - b2: <% ctx().a2 %>
40
+
41
+ output:
42
+ - a5: <% ctx().b4 %>
43
+ - b5: <% ctx().a5 %>
44
+
45
+ tasks:
46
+ task1:
47
+ action: core.noop
48
+ next:
49
+ - when: <% succeeded() %>
50
+ publish:
51
+ - a3: <% ctx().b2 %>
52
+ - b3: <% ctx().a3 %>
53
+ do: task2
54
+ task2:
55
+ action: core.noop
56
+ next:
57
+ - when: <% succeeded() %>
58
+ publish: a4=<% ctx().b3 %> b4=<% ctx().a4 %>
59
+ do: task3
60
+ task3:
61
+ action: core.noop
62
+ """
63
+
64
+ wf_def_jinja = """
65
+ version: 1.0
66
+
67
+ description: A basic sequential workflow.
68
+
69
+ input:
70
+ - a1
71
+ - b1: '{{ ctx("a1") }}'
72
+
73
+ vars:
74
+ - a2: '{{ ctx("b1") }}'
75
+ - b2: '{{ ctx("a2") }}'
76
+
77
+ output:
78
+ - a5: '{{ ctx("b4") }}'
79
+ - b5: '{{ ctx("a5") }}'
80
+
81
+ tasks:
82
+ task1:
83
+ action: core.noop
84
+ next:
85
+ - when: '{{ succeeded() }}'
86
+ publish:
87
+ - a3: '{{ ctx("b2") }}'
88
+ - b3: '{{ ctx("a3") }}'
89
+ do: task2
90
+ task2:
91
+ action: core.noop
92
+ next:
93
+ - when: '{{ succeeded() }}'
94
+ publish: a4='{{ ctx("b3") }}' b4='{{ ctx("a4") }}'
95
+ do: task3
96
+ task3:
97
+ action: core.noop
98
+ """
99
+
100
+ def _prep_conductor (self , wf_def , context = None , inputs = None , status = None ):
64
101
spec = native_specs .WorkflowSpec (wf_def )
65
102
self .assertDictEqual (spec .inspect (), {})
66
103
@@ -76,33 +113,52 @@ def _prep_conductor(self, context=None, inputs=None, status=None):
76
113
77
114
return conductor
78
115
116
+ def _get_combined_value (self , callstack_depth = 0 ):
117
+ # This returns dict typed value all Python built-in type values
118
+ # which orquesta spec could accept.
119
+ if callstack_depth < 2 :
120
+ return {
121
+ 'null' : None ,
122
+ 'integer_positive' : 123 ,
123
+ 'integer_negative' : - 123 ,
124
+ 'number_positive' : 99.99 ,
125
+ 'number_negative' : - 99.99 ,
126
+ 'string' : 'xyz' ,
127
+ 'boolean_true' : True ,
128
+ 'boolean_false' : False ,
129
+ 'array' : list (self ._get_combined_value (callstack_depth + 1 ).values ()),
130
+ 'object' : self ._get_combined_value (callstack_depth + 1 ),
131
+ }
132
+ else :
133
+ return {}
134
+
135
+ def _assert_data_flow (self , inputs , expected_output ):
136
+ # This assert method checks input value would be handled and published
137
+ # as an expected type and value with both YAQL and Jinja expressions.
138
+ for wf_def in [self .wf_def_jinja , self .wf_def_yaql ]:
139
+ conductor = self ._prep_conductor (wf_def , inputs = inputs , status = statuses .RUNNING )
140
+
141
+ for i in range (1 , len (conductor .spec .tasks ) + 1 ):
142
+ task_name = 'task' + str (i )
143
+ forward_statuses = [statuses .RUNNING , statuses .SUCCEEDED ]
144
+ self .forward_task_statuses (conductor , task_name , forward_statuses )
145
+
146
+ # Render workflow output and checkout workflow status and output.
147
+ conductor .render_workflow_output ()
148
+ self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
149
+ self .assertDictEqual (conductor .get_workflow_output (), expected_output )
150
+
79
151
def assert_data_flow (self , input_value ):
80
152
inputs = {'a1' : input_value }
81
153
expected_output = {'a5' : inputs ['a1' ], 'b5' : inputs ['a1' ]}
82
- conductor = self ._prep_conductor (inputs = inputs , status = statuses .RUNNING )
83
154
84
- for i in range (1 , len (conductor .spec .tasks ) + 1 ):
85
- task_name = 'task' + str (i )
86
- self .forward_task_statuses (conductor , task_name , [statuses .RUNNING , statuses .SUCCEEDED ])
87
-
88
- # Render workflow output and checkout workflow status and output.
89
- conductor .render_workflow_output ()
90
- self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
91
- self .assertDictEqual (conductor .get_workflow_output (), expected_output )
155
+ self ._assert_data_flow (inputs , expected_output )
92
156
93
157
def assert_unicode_data_flow (self , input_value ):
94
158
inputs = {u'a1' : unicode (input_value , 'utf8' ) if six .PY2 else input_value }
95
159
expected_output = {u'a5' : inputs ['a1' ], u'b5' : inputs ['a1' ]}
96
- conductor = self ._prep_conductor (inputs = inputs , status = statuses .RUNNING )
97
-
98
- for i in range (1 , len (conductor .spec .tasks ) + 1 ):
99
- task_name = 'task' + str (i )
100
- self .forward_task_statuses (conductor , task_name , [statuses .RUNNING , statuses .SUCCEEDED ])
101
160
102
- # Render workflow output and checkout workflow status and output.
103
- conductor .render_workflow_output ()
104
- self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
105
- self .assertDictEqual (conductor .get_workflow_output (), expected_output )
161
+ self ._assert_data_flow (inputs , expected_output )
106
162
107
163
def test_data_flow_string (self ):
108
164
self .assert_data_flow ('xyz' )
@@ -119,11 +175,20 @@ def test_data_flow_boolean(self):
119
175
self .assert_data_flow (True )
120
176
self .assert_data_flow (False )
121
177
178
+ def test_data_flow_none (self ):
179
+ self .assert_data_flow (None )
180
+
122
181
def test_data_flow_dict (self ):
123
- self .assert_data_flow ({'x' : 123 , 'y' : 'abc' })
182
+ mapping_typed_data = self ._get_combined_value ()
183
+
184
+ self .assertIsInstance (mapping_typed_data , yaql_utils .MappingType )
185
+ self .assert_data_flow (mapping_typed_data )
124
186
125
187
def test_data_flow_list (self ):
126
- self .assert_data_flow ([123 , 'abc' , True ])
188
+ sequence_typed_data = list (self ._get_combined_value ().values ())
189
+
190
+ self .assertIsInstance (sequence_typed_data , yaql_utils .SequenceType )
191
+ self .assert_data_flow (sequence_typed_data )
127
192
128
193
def test_data_flow_unicode (self ):
129
194
self .assert_unicode_data_flow ('光合作用' )
0 commit comments