wait for SkyDriver to say there are workers running before proceeding
ric-evans committed Feb 13, 2025
1 parent 54e1560 commit 8ecac63
Showing 3 changed files with 41 additions and 5 deletions.
8 changes: 3 additions & 5 deletions skymap_scanner/server/__init__.py
@@ -52,11 +52,9 @@ class EnvConfig:

# TIMEOUTS
#
-# seconds -- how long server waits before thinking all clients are dead
-# - set to duration of first reco + client launch (condor)
-# - important if clients launch *AFTER* server
-# - normal expiration scenario: all clients died (bad condor submit file), otherwise never (server knows when all recos are done)
-SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 3 * 24 * 60 * 60  # 3 days
+# seconds -- how long server waits before thinking *all* clients are dead
+# - set to duration of longest reco + on-client startup time
+SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 2 * 60 * 60  # 2 hours

# SKYDRIVER VARS
SKYSCAN_SKYDRIVER_ADDRESS: str = "" # SkyDriver REST interface address
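For context, the shortened timeout is only the default on the EnvConfig dataclass, so a deployment can still override it per scan via the environment. Below is a minimal, hypothetical sketch of that pattern -- it is not the project's actual config loader, and the helper name from_environment is an assumption for illustration.

# hypothetical sketch: an env-var override of the timeout default shown above
import dataclasses
import os


@dataclasses.dataclass(frozen=True)
class EnvConfig:
    # seconds -- how long the server waits before thinking *all* clients are dead
    SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 2 * 60 * 60  # 2 hours


def from_environment() -> EnvConfig:
    """Build the config, letting the environment override the 2-hour default."""
    return EnvConfig(
        SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS=int(
            os.environ.get("SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS", 2 * 60 * 60)
        )
    )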
8 changes: 8 additions & 0 deletions skymap_scanner/server/start_scan.py
@@ -34,6 +34,7 @@
fetch_event_contents_from_skydriver,
get_mqclient_connections,
kill_switch_check_from_skydriver,
wait_for_workers_to_start,
)
from .. import config as cfg, recos
from ..recos import RecoInterface, set_pointing_ra_dec
@@ -360,6 +361,13 @@ async def scan(
)
await reporter.precomputing_report()

# Before doing anything further, are the workers ready?
# NOTE: this is an optimization -- without this check, we'd have to rely on the
# mq's 'to_clients_queue' timeout. however, if the workforce is slow to start
# (i.e., condor is very busy), then this wait could be *long*, much longer than
# the normal 'to_clients_queue' timeout.
await wait_for_workers_to_start()

# Start the scan iteration loop
total_n_pixfin = await _serve_and_collect(
to_clients_queue,
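The comment above describes the gating pattern: poll a "workers running?" check before entering the serve loop, so the queue timeout only has to cover workers dying mid-scan, not a slow cluster start. A minimal, self-contained sketch of that pattern follows; workers_are_running and serve_pixels are hypothetical stand-ins, not functions from this repo.

# hypothetical sketch of the gate-then-serve pattern described in the comment above
import asyncio
from typing import Awaitable, Callable


async def gate_then_serve(
    workers_are_running: Callable[[], Awaitable[bool]],
    serve_pixels: Callable[[], Awaitable[int]],
    poll_seconds: float = 30.0,
) -> int:
    # wait (indefinitely) for the workforce to spin up...
    while not await workers_are_running():
        await asyncio.sleep(poll_seconds)
    # ...only then start serving, so the MQ timeout doesn't have to absorb
    # a potentially long cluster-startup delay
    return await serve_pixels()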
30 changes: 30 additions & 0 deletions skymap_scanner/server/utils.py
@@ -13,6 +13,7 @@
import cachetools.func
import mqclient as mq
from rest_tools.client import CalcRetryFromWaittimeMax, RestClient
from wipac_dev_tools.timing_tools import IntervalTimer

from . import ENV

@@ -101,6 +102,35 @@ async def kill_switch_check_from_skydriver() -> None:
os.kill(os.getpid(), signal.SIGINT) # NOTE - sys.exit only exits thread


async def wait_for_workers_to_start() -> None:
"""Wait until SkyDriver indicates there are workers currently running."""
if not ENV.SKYSCAN_SKYDRIVER_ADDRESS:
return

skydriver_rc = connect_to_skydriver(urgent=False)
timer = IntervalTimer(30, LOGGER) # fyi: skydriver (feb '25) updates every 60s
prev = {}

while True: # yes, we are going to wait forever
resp = await skydriver_rc.request(
"GET", f"/scan/{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/ewms/workforce"
)

if resp != prev:
LOGGER.info(f"workers: {resp}") # why not log this, but just when updated
prev = resp

if resp["n_running"]:
LOGGER.info("SkyDriver says there are workers running!")
return
else:
LOGGER.info(
    "SkyDriver says no workers are running (yet)"
    f" -- checking again in {timer.seconds}s..."
)
await timer.wait_until_interval()


########################################################################################


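One way to sanity-check the polling loop above is to drive it with a fake SkyDriver client that reports no running workers a couple of times and then a nonzero count. The sketch below assumes only that the REST response is a dict with an "n_running" field, as the new function does; FakeSkyDriver and the scan id are made up for illustration.

# hypothetical test sketch: the loop exits once the (fake) workforce report
# shows a nonzero "n_running"
import asyncio


class FakeSkyDriver:
    """Reports 'no workers' twice, then 'workers running'."""

    def __init__(self) -> None:
        self._calls = 0

    async def request(self, method: str, path: str) -> dict:
        self._calls += 1
        n_running = 4 if self._calls >= 3 else 0
        return {"n_running": n_running}


async def main() -> None:
    rc = FakeSkyDriver()
    while True:
        resp = await rc.request("GET", "/scan/abc123/ewms/workforce")
        if resp["n_running"]:
            print(f"workers running: {resp}")
            return
        await asyncio.sleep(0)  # the real loop waits a full IntervalTimer interval


asyncio.run(main())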