Skip to content

Commit

Permalink
fix async generator
Browse files Browse the repository at this point in the history
  • Loading branch information
dsschult committed Jan 24, 2025
1 parent 5026ec5 commit a9335d4
Showing 1 changed file with 45 additions and 44 deletions.
89 changes: 45 additions & 44 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,50 +967,51 @@ async def check_iceprod(self):
)
await self.task_reset(job, reason='task missing from HTCondor queue')

@AsyncPromTimer(lambda self: self.prometheus.histogram('iceprod_grid_check_submit_dir', 'IceProd grid check calls'))
async def check_submit_dir(self):
@PromWrapper(lambda self: self.prometheus.histogram('iceprod_grid_check_submit_dir', 'IceProd grid check calls'))
async def check_submit_dir(self, prom_histogram):
"""
Return directory paths that should be cleaned up.
"""
# get time limits
queue_tasks = {j.task_id for j in self.jobs.values()}
queued_time = self.cfg['queue'].get('max_task_queued_time', 86400*2)
processing_time = self.cfg['queue'].get('max_task_processing_time', 86400*2)
suspend_time = self.cfg['queue'].get('suspend_submit_dir_time', 86400)
now = time.time()
job_clean_logs_time = now - suspend_time
job_old_time = now - (queued_time + processing_time)
dir_old_time = now - (queued_time + processing_time + suspend_time)
logger.debug('now: %r, job_clean_logs_time: %r, job_old_time: %r, dir_old_time: %r', now, job_clean_logs_time, job_old_time, dir_old_time)

for daydir in self.submit_dir.glob('[0-9][0-9][0-9][0-9]*'):
logger.debug('looking at daydir %s', daydir)
if daydir.is_dir():
empty = True
for path in daydir.iterdir():
job_active = path.name.split('_')[0] in queue_tasks
logger.debug('looking at path %s, active: %r', path, job_active)
st = path.lstat()
logger.debug('stat: %r', st)
if stat.S_ISDIR(st.st_mode):
empty = False
if not job_active:
if st.st_mtime < job_clean_logs_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
elif st.st_mtime < job_old_time:
yield path
if st.st_mtime < dir_old_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
if empty:
logger.info('cleaning up daydir %s', daydir)
for path in self.jels.copy():
if Path(path).parent == daydir:
logger.info('removing JEL')
self.jels[path].close()
del self.jels[path]
shutil.rmtree(daydir)
continue
# let other processing happen
await asyncio.sleep(0)
with prom_histogram.time():
# get time limits
queue_tasks = {j.task_id for j in self.jobs.values()}
queued_time = self.cfg['queue'].get('max_task_queued_time', 86400*2)
processing_time = self.cfg['queue'].get('max_task_processing_time', 86400*2)
suspend_time = self.cfg['queue'].get('suspend_submit_dir_time', 86400)
now = time.time()
job_clean_logs_time = now - suspend_time
job_old_time = now - (queued_time + processing_time)
dir_old_time = now - (queued_time + processing_time + suspend_time)
logger.debug('now: %r, job_clean_logs_time: %r, job_old_time: %r, dir_old_time: %r', now, job_clean_logs_time, job_old_time, dir_old_time)

for daydir in self.submit_dir.glob('[0-9][0-9][0-9][0-9]*'):
logger.debug('looking at daydir %s', daydir)
if daydir.is_dir():
empty = True
for path in daydir.iterdir():
job_active = path.name.split('_')[0] in queue_tasks
logger.debug('looking at path %s, active: %r', path, job_active)
st = path.lstat()
logger.debug('stat: %r', st)
if stat.S_ISDIR(st.st_mode):
empty = False
if not job_active:
if st.st_mtime < job_clean_logs_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
elif st.st_mtime < job_old_time:
yield path
if st.st_mtime < dir_old_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
if empty:
logger.info('cleaning up daydir %s', daydir)
for path in self.jels.copy():
if Path(path).parent == daydir:
logger.info('removing JEL')
self.jels[path].close()
del self.jels[path]
shutil.rmtree(daydir)
continue
# let other processing happen
await asyncio.sleep(0)

0 comments on commit a9335d4

Please sign in to comment.