@@ -132,7 +132,15 @@ def run_job(self, gpu_slot: GPUSlot, job: Job, job_index: int) -> int:
132
132
return return_code
133
133
134
134
def run_jobs (self , jobs : List [Job ]) -> List [int ]:
135
- """Run multiple jobs across available GPU slots."""
135
+ """Run multiple jobs across available GPU slots.
136
+
137
+ Args:
138
+ jobs: List of Job objects to execute
139
+
140
+ Returns:
141
+ List[int]: Status code for each job
142
+ (0 for success, non-zero for failure)
143
+ """
136
144
total_slots = len (self .gpu_slots )
137
145
self .logger .info (
138
146
f"Starting { len (jobs )} jobs across { self .n_gpus } "
@@ -147,6 +155,7 @@ def run_jobs(self, jobs: List[Job]) -> List[int]:
147
155
for slot in self .gpu_slots :
148
156
slot_queue .put (slot )
149
157
158
+ # Initialize results list with None values
150
159
results = [None ] * len (jobs )
151
160
active_jobs = set ()
152
161
job_lock = threading .Lock ()
@@ -184,12 +193,22 @@ def worker():
184
193
) as executor :
185
194
futures = []
186
195
196
+ # Queue up all jobs
187
197
for i , job in enumerate (jobs ):
188
198
job_queue .put ((i , job ))
189
199
200
+ # Start workers
190
201
for _ in range (total_slots ):
191
202
futures .append (executor .submit (worker ))
192
203
204
+ # Wait for all workers to complete
193
205
concurrent .futures .wait (futures )
194
206
195
207
self .logger .info ("\n All jobs completed!" )
208
+
209
+ # Ensure all jobs have a result
210
+ assert all (
211
+ result is not None for result in results
212
+ ), "Some jobs did not complete"
213
+
214
+ return results
0 commit comments