Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy & add type annotations #4213

Merged
merged 8 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ jobs:

- name: style
run: |
pycodestyle
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

flake8
etc/bin/shellchecker

Expand Down
94 changes: 53 additions & 41 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import os
import re
import traceback
from typing import Set
from typing import (
Any, Callable, Dict, List, Mapping, Optional, Set, TYPE_CHECKING, Tuple
)

from metomi.isodatetime.data import Calendar
from metomi.isodatetime.parsers import DurationParser
Expand Down Expand Up @@ -88,6 +90,10 @@
get_current_time_string, set_utc_mode, get_utc_mode)
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from optparse import Values
from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase


RE_CLOCK_OFFSET = re.compile(r'(' + TaskID.NAME_RE + r')(?:\(\s*(.+)\s*\))?')
RE_EXT_TRIGGER = re.compile(r'(.*)\s*\(\s*(.+)\s*\)\s*')
Expand Down Expand Up @@ -140,23 +146,23 @@ class WorkflowConfig:

def __init__(
self,
workflow,
fpath,
options=None,
template_vars=None,
is_reload=False,
output_fname=None,
xtrigger_mgr=None,
mem_log_func=None,
run_dir=None,
log_dir=None,
work_dir=None,
share_dir=None,
):
workflow: str,
fpath: str,
options: Optional['Values'] = None,
template_vars: Optional[Mapping[str, Any]] = None,
is_reload: bool = False,
output_fname: Optional[str] = None,
xtrigger_mgr: Optional[XtriggerManager] = None,
mem_log_func: Optional[Callable[[str], None]] = None,
run_dir: Optional[str] = None,
log_dir: Optional[str] = None,
work_dir: Optional[str] = None,
share_dir: Optional[str] = None
) -> None:

self.mem_log = mem_log_func
if mem_log_func is None:
self.mem_log = lambda *a: False
if self.mem_log is None:
self.mem_log = lambda x: None
self.mem_log("config.py:config.py: start init config")
self.workflow = workflow # workflow name
self.fpath = fpath # workflow definition
Expand All @@ -167,30 +173,32 @@ def __init__(
self.work_dir = work_dir or get_workflow_run_work_dir(self.workflow)
self.options = options
self.implicit_tasks: Set[str] = set()
self.edges = {}
self.taskdefs = {}
self.initial_point = None
self.start_point = None
self.final_point = None
self.edges: Dict[
'SequenceBase', Set[Tuple[str, str, bool, bool]]
] = {}
self.taskdefs: Dict[str, TaskDef] = {}
self.initial_point: Optional['PointBase'] = None
self.start_point: Optional['PointBase'] = None
self.final_point: Optional['PointBase'] = None
self.first_graph = True
self.clock_offsets = {}
self.expiration_offsets = {}
self.ext_triggers = {} # Old external triggers (client/server)
self.xtrigger_mgr = xtrigger_mgr
self.workflow_polling_tasks = {}
self._last_graph_raw_id = None
self._last_graph_raw_edges = []
self.workflow_polling_tasks = {} # type: ignore # TODO figure out type
self._last_graph_raw_id: Optional[tuple] = None
self._last_graph_raw_edges = [] # type: ignore # TODO figure out type

self.sequences = []
self.actual_first_point = None
self._start_point_for_actual_first_point = None
self.sequences: List['SequenceBase'] = []
self.actual_first_point: Optional['PointBase'] = None
self._start_point_for_actual_first_point: Optional['PointBase'] = None

self.task_param_vars = {}
self.custom_runahead_limit = None
self.task_param_vars = {} # type: ignore # TODO figure out type
self.custom_runahead_limit: Optional['IntervalBase'] = None
self.max_num_active_cycle_points = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime = {
self.runtime: Dict[str, dict] = { # TODO figure out type
# lists of parent namespaces
'parents': {},
# lists of C3-linearized ancestor namespaces
Expand All @@ -206,9 +214,9 @@ def __init__(
'first-parent descendants': {},
}
# tasks
self.leaves = []
self.leaves = [] # TODO figure out type
# one up from root
self.feet = []
self.feet = [] # type: ignore # TODO figure out type

# Export local environmental workflow context before config parsing.
self.process_workflow_env()
Expand Down Expand Up @@ -484,7 +492,7 @@ def __init__(

ngs = self.cfg['visualization']['node groups']
# If a node group member is a family, include its descendants too.
replace = {}
replace = {} # type: ignore # TODO figure out type
for ng, mems in ngs.items():
replace[ng] = []
for mem in mems:
Expand Down Expand Up @@ -521,14 +529,14 @@ def __init__(
LOG.warning(err_msg)

# 2. node attributes must refer to node groups or namespaces
bad = []
bad_nas = []
for na in self.cfg['visualization']['node attributes']:
if na not in ngs and na not in nspaces:
bad.append(na)
if bad:
bad_nas.append(na)
if bad_nas:
err_msg = "undefined node attribute targets"
for na in bad:
err_msg += "\n+ " + str(na)
for na in bad_nas:
err_msg += f"\n+ {na}"
LOG.warning(err_msg)

# 3. node attributes must be lists of quoted "key=value" pairs.
Expand Down Expand Up @@ -595,7 +603,9 @@ def __init__(
except IsodatetimeError:
vfcp = get_point(
self.cfg['visualization']['final cycle point']
).standardise()
)
if vfcp is not None:
vfcp = vfcp.standardise()
else:
vfcp = None

Expand Down Expand Up @@ -2123,7 +2133,9 @@ def find_taskdefs(self, name):
ret.append(self.taskdefs[member])
return ret

def get_taskdef(self, name, orig_expr=None):
def get_taskdef(
self, name: str, orig_expr: Optional[str] = None
) -> TaskDef:
"""Return an instance of TaskDef for task name."""
if name not in self.taskdefs:
try:
Expand All @@ -2134,7 +2146,7 @@ def get_taskdef(self, name, orig_expr=None):
raise WorkflowConfigError(str(exc))
return self.taskdefs[name]

def _get_taskdef(self, name):
def _get_taskdef(self, name: str) -> TaskDef:
"""Get the dense task runtime."""
# (TaskDefError caught above)

Expand Down
Loading