Skip to content

Commit 08c41ed

Browse files
authored
Merge pull request #260 from nzlosh/fix_259
Set concurrency to 1 if 0 supplied. Fix #259
2 parents a225905 + 5d7bda1 commit 08c41ed

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

orquesta/conducting.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,9 @@ def _evaluate_task_actions(self, task):
628628
active_items = list(filter(lambda x: x[1]["status"] in statuses.ACTIVE_STATUSES, all_items))
629629

630630
if task["concurrency"] is not None:
631+
# Concurrency below 1 prevents scheduling of tasks.
632+
if task["concurrency"] <= 0:
633+
task["concurrency"] = 1
631634
availability = task["concurrency"] - len(active_items)
632635
candidates = list(zip(*notrun_items[:availability]))
633636
task["actions"] = list(candidates[0]) if candidates and availability > 0 else []

orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,77 @@ def test_basic_items_list_with_concurrency(self):
391391
# Assert the workflow succeeded.
392392
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
393393

394+
def test_basic_items_list_with_zero_concurrency(self):
395+
wf_def = """
396+
version: 1.0
397+
398+
vars:
399+
- concurrency: 0
400+
- xs:
401+
- fee
402+
- fi
403+
- fo
404+
- fum
405+
406+
tasks:
407+
task1:
408+
with:
409+
items: <% ctx(xs) %>
410+
concurrency: <% ctx(concurrency) %>
411+
action: core.echo message=<% item() %>
412+
next:
413+
- publish:
414+
- items: <% result() %>
415+
416+
output:
417+
- items: <% ctx(items) %>
418+
"""
419+
420+
# Set the concurrency to 1 since concurrency 0 is expected to be
421+
# overridden in the Orquesta concurrency scheduling code.
422+
concurrency = 1
423+
424+
spec = native_specs.WorkflowSpec(wf_def)
425+
self.assertDictEqual(spec.inspect(), {})
426+
427+
conductor = conducting.WorkflowConductor(spec)
428+
conductor.request_workflow_status(statuses.RUNNING)
429+
430+
# Mock the action execution for each item and assert expected task statuses.
431+
task_route = 0
432+
task_name = "task1"
433+
task_ctx = {"xs": ["fee", "fi", "fo", "fum"], "concurrency": 0}
434+
435+
task_action_specs = [
436+
{"action": "core.echo", "input": {"message": "fee"}, "item_id": 0},
437+
{"action": "core.echo", "input": {"message": "fi"}, "item_id": 1},
438+
{"action": "core.echo", "input": {"message": "fo"}, "item_id": 2},
439+
{"action": "core.echo", "input": {"message": "fum"}, "item_id": 3},
440+
]
441+
442+
mock_ac_ex_statuses = [statuses.SUCCEEDED] * 4
443+
expected_task_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED]
444+
expected_workflow_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED]
445+
446+
self.assert_task_items(
447+
conductor,
448+
task_name,
449+
task_route,
450+
task_ctx,
451+
task_ctx["xs"],
452+
task_action_specs,
453+
mock_ac_ex_statuses,
454+
expected_task_statuses,
455+
expected_workflow_statuses,
456+
concurrency=concurrency,
457+
)
458+
459+
# Assert the task is removed from staging.
460+
self.assertIsNone(conductor.workflow_state.get_staged_task(task_name, task_route))
461+
462+
# Assert the workflow succeeded.
463+
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
464+
394465
def test_multiple_items_list(self):
395466
wf_def = """
396467
version: 1.0

0 commit comments

Comments
 (0)