Skip to content

Commit 6c09040

Browse files
authored
Patch wait_for_workers (#1681)
1 parent 1e9d697 commit 6c09040

File tree

6 files changed

+24
-9
lines changed

6 files changed

+24
-9
lines changed

.github/workflows/tests.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ on:
44
# Runs at 00:00 on Sunday, Tuesday, and Thursday. (see https://crontab.guru)
55
- cron: "0 0 * * 0,2,4"
66
workflow_dispatch:
7-
87
concurrency:
98
# Include `github.event_name` to avoid pushes to `main` and
109
# scheduled jobs canceling one another

detect_regressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def detect_regressions(database_file, is_pr=False):
8787
df_test[metric].iloc[-3] * units_norm,
8888
)
8989
reg = (
90-
f"{runtime = }, {name = }, {category = }, "
90+
f"{runtime=}, {name=}, {category=}, "
9191
f"last_three_{metric} {u} = "
9292
f"{last_three}, "
9393
f"{metric}_threshold {u} = {metric_threshold * units_norm} \n"

tests/benchmarks/test_parquet.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ def parquet_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
4040

4141

4242
@pytest.fixture
43-
def parquet_client(parquet_cluster, cluster_kwargs, benchmark_all):
43+
def parquet_client(parquet_cluster, cluster_kwargs, benchmark_all, wait_for_workers):
4444
n_workers = cluster_kwargs["parquet_cluster"]["n_workers"]
4545
with distributed.Client(parquet_cluster) as client:
4646
parquet_cluster.scale(n_workers)
47-
client.wait_for_workers(n_workers, timeout=600)
47+
wait_for_workers(client, n_workers, timeout=600)
4848
client.restart()
4949
with benchmark_all(client):
5050
yield client

tests/benchmarks/test_spill.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ def spill_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
3535

3636

3737
@pytest.fixture
38-
def spill_client(spill_cluster, cluster_kwargs, benchmark_all):
38+
def spill_client(spill_cluster, cluster_kwargs, benchmark_all, wait_for_workers):
3939
n_workers = cluster_kwargs["spill_cluster"]["n_workers"]
4040
with Client(spill_cluster) as client:
4141
spill_cluster.scale(n_workers)
42-
client.wait_for_workers(n_workers, timeout=600)
42+
wait_for_workers(client, n_workers, timeout=600)
4343
client.restart()
4444
with benchmark_all(client):
4545
yield client

tests/benchmarks/test_xarray.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ def group_reduction_cluster(dask_env_variables, cluster_kwargs, github_cluster_t
3636

3737

3838
@pytest.fixture
39-
def group_reduction_client(group_reduction_cluster, cluster_kwargs, benchmark_all):
39+
def group_reduction_client(
40+
group_reduction_cluster, cluster_kwargs, benchmark_all, wait_for_workers
41+
):
4042
n_workers = cluster_kwargs["group_reduction_cluster"]["n_workers"]
4143
with Client(group_reduction_cluster) as client:
4244
group_reduction_cluster.scale(n_workers)
43-
client.wait_for_workers(n_workers, timeout=600)
45+
wait_for_workers(client, n_workers, timeout=600)
4446
client.restart()
4547
with benchmark_all(client):
4648
yield client

tests/conftest.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from functools import lru_cache
1616

1717
import adlfs
18+
import coiled
1819
import dask
1920
import dask.array as da
2021
import distributed
@@ -565,14 +566,15 @@ def small_client(
565566
small_cluster,
566567
cluster_kwargs,
567568
benchmark_all,
569+
wait_for_workers,
568570
):
569571
n_workers = cluster_kwargs["small_cluster"]["n_workers"]
570572
test_label = f"{request.node.name}, session_id={testrun_uid}"
571573
with Client(small_cluster) as client:
572574
log_on_scheduler(client, "Starting client setup of %s", test_label)
573575
client.restart()
574576
small_cluster.scale(n_workers)
575-
client.wait_for_workers(n_workers, timeout=600)
577+
wait_for_workers(client, n_workers, timeout=600)
576578

577579
log_on_scheduler(client, "Finished client setup of %s", test_label)
578580

@@ -895,3 +897,15 @@ def _performance_report():
895897
test_run_benchmark.performance_report_url = filename
896898

897899
yield _performance_report
900+
901+
902+
@pytest.fixture
903+
def wait_for_workers():
904+
def _(client, n_workers, timeout):
905+
# https://github.com/coiled/platform/pull/8226
906+
if Version(coiled.__version__) <= Version("1.87"):
907+
client.sync(client._wait_for_workers, n_workers, timeout=timeout)
908+
else:
909+
client.wait_for_workers(n_workers, timeout=timeout)
910+
911+
return _

0 commit comments

Comments
 (0)