24
24
from beeflow .common import cli_connection
25
25
26
26
27
# The configuration has to be loaded at import time, because the module-level
# setting below is read from it as soon as this module is imported.
bc.init()
# Upper bound on how many times a single component may be restarted
MAX_RESTARTS = bc.get('DEFAULT', 'max_restarts')
30
+
31
+
27
32
class ComponentManager :
28
33
"""Component manager class."""
29
34
@@ -40,6 +45,8 @@ def wrap(fn):
40
45
self .components [name ] = {
41
46
'fn' : fn ,
42
47
'deps' : deps ,
48
+ 'restart_count' : 0 ,
49
+ 'failed' : False ,
43
50
}
44
51
45
52
return wrap
@@ -87,12 +94,24 @@ def run(self, base_components):
87
94
self .procs [name ] = component ['fn' ]()
88
95
89
96
def poll(self):
    """Poll each started component and restart the ones that have exited.

    Every process in ``self.procs`` is polled. A return code other than
    ``None`` means the process has exited; this is reported as a failure
    together with the component's log file name. The component's start
    function is then called again, unless ``MAX_RESTARTS`` restarts have
    already been made, in which case the component is flagged ``'failed'``
    and skipped by all later calls.
    """
    # Iterate over the keys only: a restart rebinds self.procs[name] inside
    # the loop, which replaces a value without adding or removing keys.
    for name in self.procs:  # noqa
        component = self.components[name]
        if component['failed']:
            # Restart budget already used up on an earlier poll
            continue
        if self.procs[name].poll() is None:
            # Still running
            continue
        log = log_fname(name)
        print(f'Component "{name}" failed, check log "{log}"')
        if component['restart_count'] >= MAX_RESTARTS:
            print(f'Component "{name}" has been restarted {MAX_RESTARTS} '
                  'times, not restarting again')
            component['failed'] = True
            continue
        # Report attempts 1-based ("restart 1" is the first restart); the
        # stored counter is only advanced once the start function returned.
        attempt = component['restart_count'] + 1
        print(f'Attempting restart {attempt} of "{name}"...')
        self.procs[name] = component['fn']()
        component['restart_count'] = attempt
96
115
97
116
def status (self ):
98
117
"""Return the statuses for each process in a dict."""
@@ -140,6 +159,12 @@ def open_log(component):
140
159
return open (log , 'a' , encoding = 'utf-8' )
141
160
142
161
162
# slurmrestd is only needed when the workload scheduler is Slurm and the
# slurm::use_commands option is not set. The second lookup is only made when
# the first condition holds.
NEED_SLURMRESTD = (
    bc.get('DEFAULT', 'workload_scheduler') == 'Slurm'
    and not bc.get('slurm', 'use_commands')
)
166
+
167
+
143
168
@MGR .component ('wf_manager' , ('scheduler' ,))
144
169
def start_wfm ():
145
170
"""Start the WFM."""
@@ -149,7 +174,12 @@ def start_wfm():
149
174
sock_path , stdout = fp , stderr = fp )
150
175
151
176
152
- @MGR .component ('task_manager' , ('slurmrestd' ,))
177
# Components that must be started before the task manager: slurmrestd when it
# is in use, otherwise nothing.
TM_DEPS = ['slurmrestd'] if NEED_SLURMRESTD else []
180
+
181
+
182
+ @MGR .component ('task_manager' , TM_DEPS )
153
183
def start_task_manager ():
154
184
"""Start the TM."""
155
185
fp = open_log ('task_manager' )
@@ -168,21 +198,22 @@ def start_scheduler():
168
198
169
199
170
200
# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
171
- @MGR .component ('slurmrestd' )
172
- def start_slurm_restd ():
173
- """Start BEESlurmRestD. Returns a Popen process object."""
174
- bee_workdir = bc .get ('DEFAULT' , 'bee_workdir' )
175
- slurmrestd_log = '/' .join ([bee_workdir , 'logs' , 'restd.log' ])
176
- slurm_socket = bc .get ('slurmrestd' , 'slurm_socket' )
177
- slurm_args = bc .get ('slurmrestd' , 'slurm_args' )
178
- slurm_args = slurm_args if slurm_args is not None else ''
179
- subprocess .run (['rm' , '-f' , slurm_socket ], check = True )
180
- # log.info("Attempting to open socket: {}".format(slurm_socket))
181
- fp = open (slurmrestd_log , 'w' , encoding = 'utf-8' ) # noqa
182
- cmd = ['slurmrestd' ]
183
- cmd .extend (slurm_args .split ())
184
- cmd .append (f'unix:{ slurm_socket } ' )
185
- return subprocess .Popen (cmd , stdout = fp , stderr = fp )
201
if NEED_SLURMRESTD:
    @MGR.component('slurmrestd')
    def start_slurm_restd():
        """Start slurmrestd listening on a Unix socket.

        The socket path and the OpenAPI version are read from the ``slurm``
        section of the configuration. Any file already present at the socket
        path is removed first. Output of the process goes to
        ``<bee_workdir>/logs/restd.log``, which is truncated on every start.

        Returns:
            The ``subprocess.Popen`` object of the started process.

        Raises:
            subprocess.CalledProcessError: If removing the old socket fails.
        """
        bee_workdir = bc.get('DEFAULT', 'bee_workdir')
        slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
        slurm_socket = bc.get('slurm', 'slurmrestd_socket')
        openapi_version = bc.get('slurm', 'openapi_version')
        slurm_args = f'-s openapi/{openapi_version}'
        subprocess.run(['rm', '-f', slurm_socket], check=True)
        cmd = ['slurmrestd', *slurm_args.split(), f'unix:{slurm_socket}']
        # The child gets its own copy of the log descriptor, so the parent's
        # handle can be closed as soon as Popen returns. Leaving it open would
        # leak one descriptor per (re)start of this component.
        with open(slurmrestd_log, 'w', encoding='utf-8') as fp:
            return subprocess.Popen(cmd, stdout=fp, stderr=fp)
186
217
187
218
188
219
def handle_terminate (signum , stack ): # noqa
@@ -192,7 +223,7 @@ def handle_terminate(signum, stack): # noqa
192
223
sys .exit (1 )
193
224
194
225
195
- MIN_CHARLIECLOUD_VERSION = (0 , 27 )
226
+ MIN_CHARLIECLOUD_VERSION = (0 , 32 )
196
227
197
228
198
229
def version_str (version ):
@@ -288,13 +319,21 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
288
319
beeflow_log = log_fname ('beeflow' )
289
320
check_dependencies ()
290
321
sock_path = bc .get ('DEFAULT' , 'beeflow_socket' )
322
+ if bc .get ('DEFAULT' , 'workload_scheduler' ) == 'Slurm' and not NEED_SLURMRESTD :
323
+ warn ('Not using slurmrestd. Command-line interface will be used.' )
291
324
# Note: there is a possible race condition here, however unlikely
292
325
if os .path .exists (sock_path ):
293
326
# Try to contact for a status
294
- resp = cli_connection .send (sock_path , {'type' : 'status' })
327
+ try :
328
+ resp = cli_connection .send (sock_path , {'type' : 'status' })
329
+ except (ConnectionResetError , ConnectionRefusedError ):
330
+ resp = None
295
331
if resp is None :
296
332
# Must be dead, so remove the socket path
297
- os .remove (sock_path )
333
+ try :
334
+ os .remove (sock_path )
335
+ except FileNotFoundError :
336
+ pass
298
337
else :
299
338
# It's already running, so print an error and exit
300
339
warn (f'Beeflow appears to be running. Check the beeflow log: "{ beeflow_log } "' )
@@ -352,6 +391,14 @@ def stop():
352
391
print (f'Beeflow has stopped. Check the log at "{ beeflow_log } ".' )
353
392
354
393
394
@app.command()
def restart(foreground: bool = typer.Option(False, '--foreground', '-F',
                                            help='run in the foreground')):
    """Stop the beeflow daemon, then start it again.

    The ``foreground`` flag is passed on unchanged to ``start``.
    """
    stop()
    start(foreground)
400
+
401
+
355
402
@app .callback (invoke_without_command = True )
356
403
def version_callback (version : bool = False ):
357
404
"""Beeflow."""
@@ -364,7 +411,6 @@ def version_callback(version: bool = False):
364
411
365
412
def main():
    """Run the beeflow command-line application."""
    app()
369
415
370
416
0 commit comments