From e2fa43efc804a28cbddc241272562ceb2516a775 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Mon, 7 Nov 2022 16:40:44 -0500 Subject: [PATCH 1/3] feat: generate kinds concurrently --- src/taskgraph/generator.py | 78 ++++++++++++++++++++++++-------- src/taskgraph/transforms/base.py | 31 +++++++++++-- 2 files changed, 86 insertions(+), 23 deletions(-) diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index 4ed2a4152..d6135e8ee 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -2,6 +2,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +import asyncio import copy import logging import os @@ -44,7 +45,8 @@ def _get_loader(self): loader = "taskgraph.loader.default:loader" return find_object(loader) - def load_tasks(self, parameters, loaded_tasks, write_artifacts): + async def load_tasks(self, parameters, loaded_tasks, write_artifacts): + logger.debug(f"Loading tasks for kind {self.name}") loader = self._get_loader() config = copy.deepcopy(self.config) @@ -85,8 +87,9 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts): soft_dependencies=task_dict.get("soft-dependencies"), if_dependencies=task_dict.get("if-dependencies"), ) - for task_dict in transforms(trans_config, inputs) + async for task_dict in await transforms(trans_config, inputs) ] + logger.info(f"Generated {len(tasks)} tasks for kind {self.name}") return tasks @classmethod @@ -249,6 +252,57 @@ def _load_kinds(self, graph_config, target_kinds=None): except KindNotFound: continue + async def _load_tasks(self, kinds, kind_graph, parameters): + all_tasks = {} + futures_to_kind = {} + + def add_new_tasks(tasks): + for task in tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + + def create_futures(kinds, edges): + """Create the next batch of tasks for kinds without dependencies.""" + kinds_with_deps = {edge[0] for edge in edges} + ready_kinds = set(kinds) - kinds_with_deps + futures = set() + for name in ready_kinds: + task = asyncio.create_task( + kinds[name].load_tasks( + parameters, + list(all_tasks.values()), + self._write_artifacts, + ) + ) + futures.add(task) + futures_to_kind[task] = name + return futures + + edges = set(kind_graph.edges) + futures = create_futures(kinds, edges) + while len(kinds) > 0: + done, futures = await asyncio.wait( + futures, return_when=asyncio.FIRST_COMPLETED + ) + + for future in done: + add_new_tasks(future.result()) + name = futures_to_kind[future] + + # Update state for next batch of futures. + del kinds[name] + edges = {e for e in edges if e[1] != name} + + futures |= create_futures(kinds, edges) + + if futures: + done, _ = await asyncio.wait(futures, return_when=asyncio.ALL_COMPLETED) + for future in done: + add_new_tasks(future.result()) + + return all_tasks + def _run(self): logger.info("Loading graph configuration.") graph_config = load_graph_config(self.root_dir) @@ -303,24 +357,8 @@ def _run(self): ) logger.info("Generating full task set") - all_tasks = {} - for kind_name in kind_graph.visit_postorder(): - logger.debug(f"Loading tasks for kind {kind_name}") - kind = kinds[kind_name] - try: - new_tasks = kind.load_tasks( - parameters, - list(all_tasks.values()), - self._write_artifacts, - ) - except Exception: - logger.exception(f"Error loading tasks for kind {kind_name}:") - raise - for task in new_tasks: - if task.label in all_tasks: - raise Exception("duplicate tasks with label " + task.label) - all_tasks[task.label] = task - logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}") + all_tasks = asyncio.run(self._load_tasks(kinds, kind_graph, parameters)) + full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) diff --git a/src/taskgraph/transforms/base.py b/src/taskgraph/transforms/base.py index e6fcd2400..c633fd3f2 100644 --- a/src/taskgraph/transforms/base.py +++ b/src/taskgraph/transforms/base.py @@ -2,7 +2,8 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. - +import asyncio +import inspect import re from dataclasses import dataclass, field from typing import Dict, List, Union @@ -107,6 +108,12 @@ def repo_configs(self): return repo_configs +async def convert_async(it): + """Convert a synchronous iterator to an async one.""" + for i in it: + yield i + + @dataclass() class TransformSequence: """ @@ -121,11 +128,29 @@ class TransformSequence: _transforms: List = field(default_factory=list) - def __call__(self, config, items): + async def __call__(self, config, items): for xform in self._transforms: - items = xform(config, items) + if isinstance(xform, TransformSequence): + items = await xform(config, items) + elif inspect.isasyncgenfunction(xform): + # Async generator transforms require async generator inputs. + # This can happen if a synchronous transform ran immediately + # prior. + if not inspect.isasyncgen(items): + items = convert_async(items) + items = xform(config, items) + else: + # Creating a synchronous generator from an asynchronous context + # doesn't appear possible, so unfortunately we need to convert + # to a list. + if inspect.isasyncgen(items): + items = [i async for i in items] + items = xform(config, items) if items is None: raise Exception(f"Transform {xform} is not a generator") + + if not inspect.isasyncgen(items): + items = convert_async(items) return items def add(self, func): From 6da6cf9fca7f4a99db8bd5071bda122cac934238 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 29 Nov 2023 23:02:29 -0500 Subject: [PATCH 2/3] Convert all transforms to async --- src/taskgraph/transforms/cached_tasks.py | 4 +- src/taskgraph/transforms/chunking.py | 4 +- src/taskgraph/transforms/code_review.py | 4 +- src/taskgraph/transforms/docker_image.py | 6 +-- src/taskgraph/transforms/fetch.py | 8 ++-- src/taskgraph/transforms/from_deps.py | 4 +- src/taskgraph/transforms/job/__init__.py | 24 ++++++------ src/taskgraph/transforms/notify.py | 4 +- src/taskgraph/transforms/task.py | 50 ++++++++++++------------ src/taskgraph/transforms/task_context.py | 4 +- 10 files changed, 55 insertions(+), 57 deletions(-) diff --git a/src/taskgraph/transforms/cached_tasks.py b/src/taskgraph/transforms/cached_tasks.py index 57a55dffb..8f840c2dd 100644 --- a/src/taskgraph/transforms/cached_tasks.py +++ b/src/taskgraph/transforms/cached_tasks.py @@ -50,7 +50,7 @@ def format_task_digest(cached_task): @transforms.add -def cache_task(config, tasks): +async def cache_task(config, tasks): if taskgraph.fast: for task in tasks: yield task @@ -61,7 +61,7 @@ def cache_task(config, tasks): if "cached_task" in task.attributes: digests[task.label] = format_task_digest(task.attributes["cached_task"]) - for task in order_tasks(config, tasks): + for task in order_tasks(config, [t async for t in tasks]): cache = task.pop("cache", None) if cache is None: yield task diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py index 31d7eff82..b6bac16d7 100644 --- a/src/taskgraph/transforms/chunking.py +++ b/src/taskgraph/transforms/chunking.py @@ -50,8 +50,8 @@ @transforms.add -def chunk_tasks(config, tasks): - for task in tasks: +async def chunk_tasks(config, tasks): + async for task in tasks: chunk_config = task.pop("chunk", None) if not chunk_config: yield task diff --git a/src/taskgraph/transforms/code_review.py b/src/taskgraph/transforms/code_review.py index bdb655b97..a6bfa9538 100644 --- a/src/taskgraph/transforms/code_review.py +++ b/src/taskgraph/transforms/code_review.py @@ -12,8 +12,8 @@ @transforms.add -def add_dependencies(config, jobs): - for job in jobs: +async def add_dependencies(config, jobs): + async for job in jobs: job.setdefault("soft-dependencies", []) job["soft-dependencies"] += [ dep_task.label diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index d0c5b9c97..17ba3e7fb 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -65,7 +65,7 @@ @transforms.add -def fill_template(config, tasks): +async def fill_template(config, tasks): available_packages = set() for task in config.kind_dependencies_tasks.values(): if task.kind != "packages": @@ -75,13 +75,11 @@ def fill_template(config, tasks): context_hashes = {} - tasks = list(tasks) - if not taskgraph.fast and config.write_artifacts: if not os.path.isdir(CONTEXTS_DIR): os.makedirs(CONTEXTS_DIR) - for task in tasks: + async for task in tasks: image_name = task.pop("name") job_symbol = task.pop("symbol", None) args = task.pop("args", {}) diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py index bcb8ff38a..e24c0183f 100644 --- a/src/taskgraph/transforms/fetch.py +++ b/src/taskgraph/transforms/fetch.py @@ -78,9 +78,9 @@ def wrap(func): @transforms.add -def process_fetch_job(config, jobs): +async def process_fetch_job(config, jobs): # Converts fetch-url entries to the job schema. - for job in jobs: + async for job in jobs: typ = job["fetch"]["type"] name = job["name"] fetch = job.pop("fetch") @@ -103,7 +103,7 @@ def configure_fetch(config, typ, name, fetch): @transforms.add -def make_task(config, jobs): +async def make_task(config, jobs): # Fetch tasks are idempotent and immutable. Have them live for # essentially forever. if config.params["level"] == "3": @@ -111,7 +111,7 @@ def make_task(config, jobs): else: expires = "28 days" - for job in jobs: + async for job in jobs: name = job["name"] artifact_prefix = job.get("artifact-prefix", "public") env = job.get("env", {}) diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py index 337d68e4b..c476c1038 100644 --- a/src/taskgraph/transforms/from_deps.py +++ b/src/taskgraph/transforms/from_deps.py @@ -113,8 +113,8 @@ @transforms.add -def from_deps(config, tasks): - for task in tasks: +async def from_deps(config, tasks): + async for task in tasks: # Setup and error handling. from_deps = task.pop("from-deps") kind_deps = config.config.get("kind-dependencies", []) diff --git a/src/taskgraph/transforms/job/__init__.py b/src/taskgraph/transforms/job/__init__.py index d86eff3ef..5ad6def7d 100644 --- a/src/taskgraph/transforms/job/__init__.py +++ b/src/taskgraph/transforms/job/__init__.py @@ -112,8 +112,8 @@ @transforms.add -def rewrite_when_to_optimization(config, jobs): - for job in jobs: +async def rewrite_when_to_optimization(config, jobs): + async for job in jobs: when = job.pop("when", {}) if not when: yield job @@ -132,8 +132,8 @@ def rewrite_when_to_optimization(config, jobs): @transforms.add -def set_implementation(config, jobs): - for job in jobs: +async def set_implementation(config, jobs): + async for job in jobs: impl, os = worker_type_implementation(config.graph_config, job["worker-type"]) if os: job.setdefault("tags", {})["os"] = os @@ -148,8 +148,8 @@ def set_implementation(config, jobs): @transforms.add -def set_label(config, jobs): - for job in jobs: +async def set_label(config, jobs): + async for job in jobs: if "label" not in job: if "name" not in job: raise Exception("job has neither a name nor a label") @@ -160,8 +160,8 @@ def set_label(config, jobs): @transforms.add -def add_resource_monitor(config, jobs): - for job in jobs: +async def add_resource_monitor(config, jobs): + async for job in jobs: if job.get("attributes", {}).get("resource-monitor"): worker_implementation, worker_os = worker_type_implementation( config.graph_config, job["worker-type"] @@ -204,13 +204,13 @@ def get_attribute(dict, key, attributes, attribute_name): @transforms.add -def use_fetches(config, jobs): +async def use_fetches(config, jobs): artifact_names = {} aliases = {} extra_env = {} + jobs = [j async for j in jobs] if config.kind in ("toolchain", "fetch"): - jobs = list(jobs) for job in jobs: run = job.get("run", {}) label = job["label"] @@ -353,12 +353,12 @@ def cmp_artifacts(a): @transforms.add -def make_task_description(config, jobs): +async def make_task_description(config, jobs): """Given a build description, create a task description""" # import plugin modules first, before iterating over jobs import_sibling_modules(exceptions=("common.py",)) - for job in jobs: + async for job in jobs: # always-optimized tasks never execute, so have no workdir if job["worker"]["implementation"] in ("docker-worker", "generic-worker"): job["run"].setdefault("workdir", "/builds/worker") diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py index a61e7999c..8e9b4229a 100644 --- a/src/taskgraph/transforms/notify.py +++ b/src/taskgraph/transforms/notify.py @@ -140,8 +140,8 @@ def _convert_content(content): @transforms.add -def add_notifications(config, tasks): - for task in tasks: +async def add_notifications(config, tasks): + async for task in tasks: label = "{}-{}".format(config.kind, task["name"]) if "notifications" in task: notify = _convert_legacy(config, task.pop("notifications"), label) diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index f7916ad85..ea8258d36 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -835,11 +835,11 @@ def build_dummy_payload(config, task, task_def): @transforms.add -def set_implementation(config, tasks): +async def set_implementation(config, tasks): """ Set the worker implementation based on the worker-type alias. """ - for task in tasks: + async for task in tasks: worker = task.setdefault("worker", {}) if "implementation" in task["worker"]: yield task @@ -859,8 +859,8 @@ def set_implementation(config, tasks): @transforms.add -def set_defaults(config, tasks): - for task in tasks: +async def set_defaults(config, tasks): + async for task in tasks: task.setdefault("always-target", False) task.setdefault("optimization", None) task.setdefault("needs-sccache", False) @@ -903,8 +903,8 @@ def set_defaults(config, tasks): @transforms.add -def task_name_from_label(config, tasks): - for task in tasks: +async def task_name_from_label(config, tasks): + async for task in tasks: if "label" not in task: if "name" not in task: raise Exception("task has neither a name nor a label") @@ -915,8 +915,8 @@ def task_name_from_label(config, tasks): @transforms.add -def validate(config, tasks): - for task in tasks: +async def validate(config, tasks): + async for task in tasks: validate_schema( task_description_schema, task, @@ -953,8 +953,8 @@ def add_generic_index_routes(config, task): @transforms.add -def process_treeherder_metadata(config, tasks): - for task in tasks: +async def process_treeherder_metadata(config, tasks): + async for task in tasks: routes = task.get("routes", []) extra = task.get("extra", {}) task_th = task.get("treeherder") @@ -1025,8 +1025,8 @@ def process_treeherder_metadata(config, tasks): @transforms.add -def add_index_routes(config, tasks): - for task in tasks: +async def add_index_routes(config, tasks): + async for task in tasks: index = task.get("index", {}) # The default behavior is to rank tasks according to their tier @@ -1057,8 +1057,8 @@ def add_index_routes(config, tasks): @transforms.add -def build_task(config, tasks): - for task in tasks: +async def build_task(config, tasks): + async for task in tasks: level = str(config.params["level"]) provisioner_id, worker_type = get_worker_type( @@ -1219,24 +1219,24 @@ def build_task(config, tasks): @transforms.add -def add_github_checks(config, tasks): +async def add_github_checks(config, tasks): """ For git repositories, add checks route to all tasks. This will be replaced by a configurable option in the future. """ if config.params["repository_type"] != "git": - for task in tasks: + async for task in tasks: yield task - for task in tasks: + async for task in tasks: task["task"]["routes"].append("checks") yield task @transforms.add -def chain_of_trust(config, tasks): - for task in tasks: +async def chain_of_trust(config, tasks): + async for task in tasks: if task["task"].get("payload", {}).get("features", {}).get("chainOfTrust"): image = task.get("dependencies", {}).get("docker-image") if image: @@ -1250,12 +1250,12 @@ def chain_of_trust(config, tasks): @transforms.add -def check_task_identifiers(config, tasks): +async def check_task_identifiers(config, tasks): """Ensures that all tasks have well defined identifiers: ``^[a-zA-Z0-9_-]{1,38}$`` """ e = re.compile("^[a-zA-Z0-9_-]{1,38}$") - for task in tasks: + async for task in tasks: for attrib in ("workerType", "provisionerId"): if not e.match(task["task"][attrib]): raise Exception( @@ -1267,9 +1267,9 @@ def check_task_identifiers(config, tasks): @transforms.add -def check_task_dependencies(config, tasks): +async def check_task_dependencies(config, tasks): """Ensures that tasks don't have more than 100 dependencies.""" - for task in tasks: + async for task in tasks: number_of_dependencies = ( len(task["dependencies"]) + len(task["if-dependencies"]) @@ -1315,7 +1315,7 @@ def check_caches_are_volumes(task): @transforms.add -def check_run_task_caches(config, tasks): +async def check_run_task_caches(config, tasks): """Audit for caches requiring run-task. run-task manages caches in certain ways. If a cache managed by run-task @@ -1340,7 +1340,7 @@ def check_run_task_caches(config, tasks): suffix = _run_task_suffix() - for task in tasks: + async for task in tasks: payload = task["task"].get("payload", {}) command = payload.get("command") or [""] diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py index 5c7ed6af8..14db94dda 100644 --- a/src/taskgraph/transforms/task_context.py +++ b/src/taskgraph/transforms/task_context.py @@ -81,8 +81,8 @@ @transforms.add -def render_task(config, jobs): - for job in jobs: +async def render_task(config, jobs): + async for job in jobs: sub_config = job.pop("task-context") params_context = {} for var, path in sub_config.pop("from-parameters", {}).items(): From cbbafa49b744f0a849aea66259f4a2076cdf5726 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 Dec 2023 03:46:10 +0000 Subject: [PATCH 3/3] style: pre-commit.ci auto fixes [...] --- src/taskgraph/transforms/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/taskgraph/transforms/base.py b/src/taskgraph/transforms/base.py index c633fd3f2..a40213b89 100644 --- a/src/taskgraph/transforms/base.py +++ b/src/taskgraph/transforms/base.py @@ -2,7 +2,6 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -import asyncio import inspect import re from dataclasses import dataclass, field