Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc set: set success pathway outputs if none specified and no required outputs #6570

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6570.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Using `cylc set` without specifying `--out` on a task where success is optional now sets success pathway outputs instead of doing nothing.
3 changes: 2 additions & 1 deletion cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def filter_ids(
* If IDTokens.Cycle all CyclePoints with any matching tasks will
be returned.
warn:
Whether to log a warning if no matching tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.

TODO:
Consider using wcmatch which would add support for
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2184,7 +2184,8 @@ class Meta:
description = sstrip("""
Set task prerequisites or outputs.

By default, set all required outputs for target task(s).
By default, set all required outputs for target task(s) (including
`submitted`, `started` and `succeeded` even if they are optional).

Setting prerequisites contributes to the task's readiness to run.

Expand Down
59 changes: 34 additions & 25 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,31 @@
"""
from logging import INFO
from typing import (
TYPE_CHECKING, Dict, List, Tuple)
TYPE_CHECKING,
Dict,
List,
Optional,
Set,
Tuple,
)

from cylc.flow import LOG
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.run_modes import RunMode
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED
)
from cylc.flow.run_modes import RunMode


if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
from typing_extensions import Literal

from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_proxy import TaskProxy
from typing_extensions import Literal
from cylc.flow.taskdef import TaskDef


def submit_task_job(
Expand Down Expand Up @@ -79,13 +87,18 @@ def submit_task_job(
}
)
task_job_mgr.workflow_db_mgr.put_update_task_state(itask)
for output in process_outputs(itask, rtconfig):
for output in sorted(
process_outputs(itask, rtconfig),
key=itask.state.outputs.output_sort_key,
):
task_job_mgr.task_events_mgr.process_message(itask, INFO, output)

return True


def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
def process_outputs(
itask: 'TaskProxy', rtconfig: Optional[dict] = None
) -> Set[str]:
"""Process Skip Mode Outputs:

* By default, all required outputs will be generated plus succeeded
Expand All @@ -96,13 +109,13 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
succeeded or failed then succeeded will be produced.

Return:
A list of outputs to emit.
A set of outputs to emit.

"""
# Always produce `submitted` & `started` outputs first:
result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED]
result: Set[str] = {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}

conf_outputs = list(rtconfig['skip']['outputs'])
conf_outputs = list(rtconfig['skip']['outputs']) if rtconfig else []

# Send the rest of our outputs, unless they are succeeded or failed,
# which we hold back, to prevent warnings about pre-requisites being
Expand All @@ -117,26 +130,22 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
trigger = itask.state.outputs._message_to_trigger[message]
# Send message unless it be succeeded/failed.
if (
trigger not in {
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}
trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED}
and (not conf_outputs or trigger in conf_outputs)
):
result.append(message)
result.add(message)

# Add optional outputs specified in skip settings:
for message, trigger in itask.state.outputs._message_to_trigger.items():
if trigger in conf_outputs and trigger not in result:
result.append(message)
result.update(
message
for message, trigger in itask.state.outputs._message_to_trigger.items()
if trigger in conf_outputs
)

# Send succeeded/failed last.
if TASK_OUTPUT_FAILED in conf_outputs:
result.append(TASK_OUTPUT_FAILED)
elif TASK_OUTPUT_SUCCEEDED not in result:
result.append(TASK_OUTPUT_SUCCEEDED)
result.add(TASK_OUTPUT_FAILED)
else:
result.add(TASK_OUTPUT_SUCCEEDED)

return result

Expand Down
16 changes: 9 additions & 7 deletions cylc/flow/scripts/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

Command to manually set task prerequisites and outputs in running workflows.

By default, it sets all required outputs (note "succeeded" may be optional).
By default, it sets all required outputs (including "submitted", "started" and
"succeeded" even if they are optional).

Setting task prerequisites:
- contributes to the task's readiness to run, and
Expand All @@ -35,14 +36,15 @@
- contributes to a task's completion, and
- spawns downstream tasks that depend on those outputs

Note setting final outputs (succeeded, failed, expired) also sets task state.
Setting the started and submitted outputs spawns downstream tasks that depend
on them but does not affect task state, because there is no running job.
Note setting final outputs ("succeeded", "failed", "expired") also sets task
state. Setting the "started" and "submitted" outputs spawns downstream tasks
that depend on them but does not affect task state, because there is no
running job.

Implied outputs are set automatically:
- started implies submitted
- succeeded and failed imply started
- custom outputs and expired do not imply other outputs
- "started" implies "submitted"
- "succeeded" and "failed" imply "started"
- custom outputs and "expired" do not imply other outputs

For custom outputs, use the output names not the associated task messages:
[runtime]
Expand Down
22 changes: 17 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2010,14 +2010,25 @@ def _set_outputs_itask(
itask: 'TaskProxy',
outputs: Iterable[str],
) -> None:
"""Set requested outputs on a task proxy and spawn children."""
"""Set requested outputs on a task proxy and spawn children.

If no outputs were specified and the task has no required outputs to
set, set the "success pathway" outputs in the same way that skip mode
does.
"""
outputs = set(outputs)
if not outputs:
outputs = itask.state.outputs.iter_required_messages()
outputs = set(
# Set required outputs by default
itask.state.outputs.iter_required_messages()
) or (
# Set success pathway outputs
get_skip_mode_outputs(itask)
)
else:
# --out=skip is a shortcut to setting all the outputs that
# skip mode would.
outputs = set(outputs)
skips = []
skips: Set[str] = set()
if RunMode.SKIP.value in outputs:
# Check for broadcasts to task:
outputs.remove(RunMode.SKIP.value)
Expand Down Expand Up @@ -2386,7 +2397,8 @@ def filter_task_proxies(
ids:
ID strings.
warn_no_active:
Whether to log a warning if no matching active tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.
inactive:
If True, unmatched IDs will be checked against taskdefs
and cycle, and any matches will be returned in the second
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ addopts = --verbose
# disable pytest-tornasync because it conflicts with pytest-asyncio's auto mode
-p no:tornado
-m "not linkcheck"
verbosity_assertions = 2
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This makes pytest show full diffs without all the extra junk that comes from running with -vv. I find the truncated diffs that pytest shows by default to be difficult to understand.)

testpaths =
cylc/flow/
tests/unit/
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/run_modes/test_skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ async def test_skip_mode_outputs(
Skip mode proposal point 2
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
"""
graph = """
graph = r"""
# By default, all required outputs will be generated
# plus succeeded if success is optional:
foo? & foo:required_out => success_if_optional & required_outs

# The outputs submitted and started are always produced
# and do not need to be defined in outputs:
# and do not need to be defined in [runtime][X][skip]outputs:
foo:submitted => submitted_always
foo:started => started_always

Expand Down
101 changes: 100 additions & 1 deletion tests/integration/scripts/test_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,36 @@
Note: see also functional tests
"""

from secrets import token_hex

from cylc.flow.commands import (
run_cmd,
set_prereqs_and_outputs,
)
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.data_messages_pb2 import PbTaskProxy
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.flow_mgr import FLOW_ALL
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING
from cylc.flow.task_outputs import (
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import (
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
)


def outputs_section(*names: str) -> dict:
    """Create an outputs section with random messages for the given names.

    Args:
        names: The output names to define.

    Returns:
        A dict of the form ``{'outputs': {name: message, ...}}`` in which
        each message is a random hex string from ``secrets.token_hex()``.
    """
    return {
        'outputs': {
            name: token_hex() for name in names
        }
    }


async def test_set_parentless_spawning(
Expand Down Expand Up @@ -164,3 +189,77 @@ async def test_pre_all(flow, scheduler, run):
schd.pool.set_prereqs_and_outputs(['1/z'], [], ['all'], ['all'])
warn_or_higher = [i for i in log.records if i.levelno > 30]
assert warn_or_higher == []


async def test_no_outputs_given(flow, scheduler, start):
    """Test `cylc set` without providing any outputs.

    It should set the "success pathway" outputs.
    """
    # Workflow config is kept in a named variable, as in the other tests in
    # this module that build their config up front.
    conf = {
        'scheduling': {
            'graph': {
                'R1': r"""
                    foo? => alpha
                    foo:submitted? => bravo
                    foo:started? => charlie
                    foo:x => xray
                    # Optional custom outputs not emitted:
                    foo:y? => yankee
                    # Non-success-pathway outputs not emitted:
                    foo:submit-failed? => delta
                """,
            },
        },
        'runtime': {
            'foo': outputs_section('x', 'y'),
        },
    }
    schd: Scheduler = scheduler(flow(conf))
    async with start(schd):
        foo = schd.pool.get_tasks()[0]
        # No outputs are passed to the command.
        await run_cmd(
            set_prereqs_and_outputs(schd, [foo.identity], [FLOW_ALL])
        )
        # Only the success pathway outputs plus the required custom output
        # "x" should have been completed...
        assert set(foo.state.outputs.get_completed_outputs()) == {
            TASK_OUTPUT_SUBMITTED,
            TASK_OUTPUT_STARTED,
            TASK_OUTPUT_SUCCEEDED,
            'x',
        }
        # ...and only the tasks downstream of those outputs should now be
        # in the pool.
        assert schd.pool.get_task_ids() == {
            '1/alpha',
            '1/bravo',
            '1/charlie',
            '1/xray',
        }


async def test_completion_expr(flow, scheduler, start):
    """Test `cylc set` with no outputs on a task with a completion expression.

    The task "foo" has a custom completion expression involving the optional
    custom output "x". With no outputs given, the completed outputs should
    be exactly submitted, started and succeeded ("x" is not set).
    """
    conf = {
        'scheduling': {
            'graph': {
                'R1': 'foo? | foo:x? => bar'
            },
        },
        'runtime': {
            'foo': {
                **outputs_section('x'),
                'completion': '(succeeded or x) or failed'
            },
        },
    }
    schd: Scheduler = scheduler(flow(conf))
    async with start(schd):
        foo = schd.pool.get_tasks()[0]
        # No outputs are passed to the command.
        await run_cmd(
            set_prereqs_and_outputs(schd, [foo.identity], [FLOW_ALL])
        )
        assert set(foo.state.outputs.get_completed_outputs()) == {
            TASK_OUTPUT_SUBMITTED,
            TASK_OUTPUT_STARTED,
            TASK_OUTPUT_SUCCEEDED,
        }
Loading
Loading