Skip to content

Commit 35354c7

Browse files
authored
Merge pull request #138 from allenai/favyen/20250527-beaker-upgrad
Use Beaker queues instead of Google Cloud Pub/Sub for the worker system.
2 parents 0b18cec + 76c8b84 commit 35354c7

File tree

14 files changed

+262
-438
lines changed

14 files changed

+262
-438
lines changed

.github/workflows/build_test.yaml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,9 @@ jobs:
130130
echo "Executing integration tests in ${TEST_DIRS}"
131131
docker compose -f docker-compose.yaml run \
132132
-e CI="true" \
133-
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
134-
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
133+
-e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \
134+
-e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \
135+
-e BEAKER_TOKEN="${BEAKER_TOKEN}" \
135136
-v ${{env.GOOGLE_GHA_CREDS_PATH}}:/tmp/gcp-credentials.json:ro \
136137
-e GOOGLE_APPLICATION_CREDENTIALS=/tmp/gcp-credentials.json \
137138
-e RSLP_BUCKET=rslearn-eai \
@@ -143,8 +144,12 @@ jobs:
143144
-e TEST_PUBSUB_PROJECT=earthsystem-dev-c3po \
144145
-e TEST_PUBSUB_TOPIC=rslearn_projects_test_topic \
145146
-e TEST_PUBSUB_SUBSCRIPTION=rslearn_projects_test_subscription \
147+
-e TEST_QUEUE_NAME=favyen/rslearn-projects-test-queue \
146148
test pytest ${TEST_DIRS} -vv
147-
149+
env:
150+
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
151+
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
152+
BEAKER_TOKEN: ${{ secrets.BEAKER_TOKEN_2 }}
148153

149154
- name: Clean up
150155
if: always()

helios.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM pytorch/pytorch:2.5.0-cuda11.8-cudnn9-runtime@sha256:d15e9803095e462e351f097fb1f5e7cdaa4f5e855d7ff6d6f36ec4c2aa2938ea
1+
FROM pytorch/pytorch:2.7.0-cuda11.8-cudnn9-runtime
22

33
RUN apt update
44
RUN apt install -y libpq-dev ffmpeg libsm6 libxext6 git wget

one_off_projects/convert_satlas_webmercator_to_rslearn/lib/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def convert_window(
109109

110110
layer_name = "label"
111111
layer_dir = window.get_layer_dir(layer_name)
112-
GeojsonVectorFormat().encode_vector(layer_dir, dst_projection, features)
112+
GeojsonVectorFormat().encode_vector(layer_dir, features)
113113
window.mark_layer_completed(layer_name)
114114

115115
# (3) Write mask corresponding to old window projected onto new window.

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
beaker-py>=1.32,<2.0
1+
beaker-py>=2.0
22
fastapi>=0.115
33
google-cloud-bigtable>=2.18
44
google-cloud-pubsub>=2.18

rslp/common/beaker_launcher.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import uuid
44
from datetime import datetime
55

6-
from beaker import Beaker, EnvVar, ExperimentSpec, ImageSource
7-
from beaker.exceptions import ImageNotFound
6+
from beaker import Beaker, BeakerEnvVar, BeakerExperimentSpec, BeakerImageSource
7+
from beaker.exceptions import BeakerImageNotFound
88

99
from rslp.log_utils import get_logger
1010
from rslp.utils.beaker import (
@@ -45,7 +45,7 @@ def launch_job(
4545
gpu_count: int = 0,
4646
shared_memory: str | None = None,
4747
priority: str = DEFAULT_PRIORITY,
48-
task_specific_env_vars: list[EnvVar] = [],
48+
task_specific_env_vars: list[BeakerEnvVar] = [],
4949
budget: str = DEFAULT_BUDGET,
5050
workspace: str = DEFAULT_WORKSPACE,
5151
preemptible: bool = True,
@@ -80,8 +80,7 @@ def launch_job(
8080

8181
logger.info("Starting Beaker client...")
8282
logger.info(f"Workspace: {workspace}")
83-
beaker = Beaker.from_env(default_workspace=workspace)
84-
with beaker.session():
83+
with Beaker.from_env(default_workspace=workspace) as beaker:
8584
logger.info("Getting base env vars...")
8685
base_env_vars = get_base_env_vars()
8786
logger.info("Generating task name...")
@@ -92,8 +91,8 @@ def launch_job(
9291
try:
9392
beaker.image.get(image)
9493
logger.info(f"Image already exists: {image}")
95-
image_source = ImageSource(beaker=image)
96-
except ImageNotFound:
94+
image_source = BeakerImageSource(beaker=image)
95+
except BeakerImageNotFound:
9796
logger.info(f"Uploading image: {image}")
9897
# Handle image upload
9998
image_source = upload_image(image, workspace, beaker)
@@ -102,7 +101,7 @@ def launch_job(
102101
logger.info("Creating experiment spec...")
103102
datasets = [create_gcp_credentials_mount()]
104103
datasets += [weka_mount.to_data_mount() for weka_mount in weka_mounts]
105-
experiment_spec = ExperimentSpec.new(
104+
experiment_spec = BeakerExperimentSpec.new(
106105
budget=budget,
107106
task_name=unique_task_name,
108107
beaker_image=image_source.beaker,

rslp/common/beaker_train.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
import shutil
55
import uuid
66

7-
from beaker import Beaker, Constraints, EnvVar, ExperimentSpec, Priority, TaskResources
7+
from beaker import (
8+
Beaker,
9+
BeakerConstraints,
10+
BeakerEnvVar,
11+
BeakerExperimentSpec,
12+
BeakerTaskResources,
13+
)
814

915
from rslp import launcher_lib
1016
from rslp.utils.beaker import (
@@ -31,6 +37,7 @@ def beaker_train(
3137
project_id: str | None = None,
3238
experiment_id: str | None = None,
3339
extra_args: list[str] = [],
40+
priority: str = "high",
3441
) -> None:
3542
"""Launch training for the specified config on Beaker.
3643
@@ -51,6 +58,7 @@ def beaker_train(
5158
project_id: override the project ID.
5259
experiment_id: override the experiment ID.
5360
extra_args: extra arguments to pass in the Beaker job.
61+
priority: the priority to assign to the Beaker job.
5462
"""
5563
hparams_configs_dir = None
5664

@@ -80,41 +88,39 @@ def beaker_train(
8088
if hparams_configs_dir is not None:
8189
shutil.rmtree(hparams_configs_dir)
8290

83-
beaker = Beaker.from_env(default_workspace=workspace)
84-
85-
for run_id, config_path in config_paths.items():
86-
with beaker.session():
91+
with Beaker.from_env(default_workspace=workspace) as beaker:
92+
for run_id, config_path in config_paths.items():
8793
env_vars = get_base_env_vars()
8894
env_vars.extend(
8995
[
90-
EnvVar(
96+
BeakerEnvVar(
9197
name="RSLP_PROJECT", # nosec
9298
value=project_id,
9399
),
94-
EnvVar(
100+
BeakerEnvVar(
95101
name="RSLP_EXPERIMENT",
96102
value=experiment_id,
97103
),
98-
EnvVar(
104+
BeakerEnvVar(
99105
name="RSLP_RUN_ID",
100106
value=run_id,
101107
),
102108
]
103109
)
104110
if username:
105111
env_vars.append(
106-
EnvVar(
112+
BeakerEnvVar(
107113
name="WANDB_USERNAME",
108114
value=username,
109115
)
110116
)
111117
datasets = [create_gcp_credentials_mount()]
112118
datasets += [weka_mount.to_data_mount() for weka_mount in weka_mounts]
113-
spec = ExperimentSpec.new(
119+
spec = BeakerExperimentSpec.new(
114120
budget=DEFAULT_BUDGET,
115121
description=f"{project_id}/{experiment_id}/{run_id}",
116122
beaker_image=image_name,
117-
priority=Priority.high,
123+
priority=priority,
118124
command=["python", "-m", "rslp.docker_entrypoint"],
119125
arguments=[
120126
"model",
@@ -130,13 +136,17 @@ def beaker_train(
130136
project_id,
131137
]
132138
+ extra_args,
133-
constraints=Constraints(
139+
constraints=BeakerConstraints(
134140
cluster=cluster,
135141
),
136142
preemptible=True,
137143
datasets=datasets,
138144
env_vars=env_vars,
139-
resources=TaskResources(gpu_count=gpus, shared_memory=shared_memory),
145+
resources=BeakerTaskResources(
146+
gpu_count=gpus, shared_memory=shared_memory
147+
),
140148
)
141149
unique_id = str(uuid.uuid4())[0:8]
142-
beaker.experiment.create(f"{project_id}_{experiment_id}_{unique_id}", spec)
150+
beaker.experiment.create(
151+
name=f"{project_id}_{experiment_id}_{unique_id}", spec=spec
152+
)

0 commit comments

Comments
 (0)