Skip to content

Commit 4697373

Browse files
committed
Should work on new slurm cluster at Geneva now
1 parent 0ca1863 commit 4697373

File tree

10 files changed

+90
-315
lines changed

10 files changed

+90
-315
lines changed

erna/automatic_processing/__main__.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def main(config, verbose):
2828
logging.getLogger('erna').setLevel(logging.DEBUG)
2929

3030
stream_handler = logging.StreamHandler()
31-
file_handler = logging.FileHandler(config['submitter']['logfile'])
31+
file_handler = logging.FileHandler(config['submitter'].pop('logfile'))
3232
formatter = logging.Formatter(
3333
'%(asctime)s|%(levelname)s|%(name)s|%(message)s'
3434
)
@@ -44,16 +44,7 @@ def main(config, verbose):
4444
database.close()
4545

4646
job_monitor = JobMonitor(port=config['submitter']['port'])
47-
job_submitter = JobSubmitter(
48-
interval=config['submitter']['interval'],
49-
max_queued_jobs=config['submitter']['max_queued_jobs'],
50-
data_directory=config['submitter']['data_directory'],
51-
host=config['submitter']['host'],
52-
port=config['submitter']['port'],
53-
group=config['submitter']['group'],
54-
mail_address=config['submitter']['mail_address'],
55-
mail_settings=config['submitter']['mail_settings'],
56-
)
47+
job_submitter = JobSubmitter(**config['submitter'])
5748

5849
log.info('Starting main loop')
5950
try:
@@ -75,6 +66,6 @@ def main(config, verbose):
7566
inserted = ProcessingState.get(description='inserted')
7667

7768
for job in Job.select().where((Job.status == running) | (Job.status == queued)):
78-
sp.run(['qdel', 'erna_{}'.format(job.id)])
69+
sp.run(['scancel', '--jobname=erna_{}'.format(job.id)])
7970
job.status = inserted
8071
job.save()

erna/automatic_processing/database.py

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
__all__ = [
1616
'RawDataFile', 'DrsFile',
17-
'Jar', 'XML', 'Job', 'Queue',
17+
'Jar', 'XML', 'Job',
1818
'ProcessingState',
1919
'database', 'setup_database',
2020
]
@@ -31,12 +31,6 @@
3131
'walltime_exceeded',
3232
]
3333

34-
WALLTIMES = {
35-
'fact_short': 60 * 60,
36-
'fact_medium': 6 * 60 * 60,
37-
'fact_long': 7 * 24 * 60 * 60,
38-
}
39-
4034

4135
class RetryMySQLDatabase(RetryOperationalError, MySQLDatabase):
4236
''' Automatically reconnect when connection went down'''
@@ -71,9 +65,6 @@ def setup_database(database, drop=False):
7165
for description in PROCESSING_STATES:
7266
ProcessingState.get_or_create(description=description)
7367

74-
for name, walltime in WALLTIMES.items():
75-
Queue.get_or_create(name=name, walltime=walltime)
76-
7768

7869
class File(Model):
7970
night = NightField()
@@ -85,9 +76,9 @@ class Meta:
8576
database = database
8677
indexes = ((('night', 'run_id'), True), ) # unique index
8778

88-
def get_path(self):
79+
def get_path(self, basepath='/fact/raw'):
8980
return os.path.join(
90-
'/fact/raw',
81+
basepath,
9182
str(self.night.year),
9283
'{:02d}'.format(self.night.month),
9384
'{:02d}'.format(self.night.day),
@@ -166,25 +157,16 @@ def __repr__(self):
166157
return '{}'.format(self.description)
167158

168159

169-
class Queue(Model):
170-
name = CharField(unique=True)
171-
walltime = IntegerField()
172-
173-
class Meta:
174-
database = database
175-
db_table = 'queues'
176-
177-
178160
class Job(Model):
179161
raw_data_file = ForeignKeyField(RawDataFile, related_name='raw_data_file')
180162
drs_file = ForeignKeyField(DrsFile, related_name='drs_file')
181163
jar = ForeignKeyField(Jar, related_name='jar')
182164
result_file = CharField(null=True)
183165
status = ForeignKeyField(ProcessingState, related_name='status')
184166
priority = IntegerField(default=5)
167+
walltime = IntegerField(default=60)
185168
xml = ForeignKeyField(XML)
186169
md5hash = FixedCharField(32, null=True)
187-
queue = ForeignKeyField(Queue, related_name='queue')
188170

189171
class Meta:
190172
database = database
@@ -193,7 +175,8 @@ class Meta:
193175
(('raw_data_file', 'jar', 'xml'), True), # unique constraint
194176
)
195177

196-
MODELS = [RawDataFile, DrsFile, Jar, XML, Job, ProcessingState, Queue]
178+
179+
MODELS = [RawDataFile, DrsFile, Jar, XML, Job, ProcessingState]
197180

198181

199182
@wrapt.decorator

erna/automatic_processing/database_utils.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ def find_drs_file(raw_data_file, closest=True):
9999
'''
100100
query = DrsFile.select()
101101
query = query.where(DrsFile.night == raw_data_file.night)
102-
query = query.where(DrsFile.available)
103102

104103
if raw_data_file.roi == 300:
105104
query = query.where((DrsFile.drs_step == 2) & (DrsFile.roi == 300))
@@ -129,7 +128,7 @@ def insert_new_job(
129128
raw_data_file,
130129
jar,
131130
xml,
132-
queue,
131+
walltime,
133132
priority=5,
134133
closest_drs_file=True,
135134
):
@@ -144,8 +143,8 @@ def insert_new_job(
144143
the fact-tools jar to use
145144
xml: XML
146145
the xml to use
147-
queue: Queue
148-
the queue to use
146+
walltime: walltime
147+
the walltime to use
149148
priority: int
150149
Priority for the Job. Lower numbers mean more important.
151150
closest_drs_file: bool
@@ -169,7 +168,7 @@ def insert_new_job(
169168
raw_data_file=raw_data_file,
170169
drs_file=drs_file,
171170
jar=jar,
172-
queue=queue,
171+
walltime=walltime,
173172
status=ProcessingState.get(description='inserted'),
174173
priority=priority,
175174
xml=xml,
@@ -179,7 +178,7 @@ def insert_new_job(
179178

180179

181180
@requires_database_connection
182-
def insert_new_jobs(raw_data_files, jar, xml, queue, progress=True, **kwargs):
181+
def insert_new_jobs(raw_data_files, jar, xml, walltime, progress=True, **kwargs):
183182

184183
if isinstance(raw_data_files, list):
185184
total = len(raw_data_files)
@@ -189,7 +188,7 @@ def insert_new_jobs(raw_data_files, jar, xml, queue, progress=True, **kwargs):
189188
failed_files = []
190189
for f in tqdm(raw_data_files, total=total, disable=not progress):
191190
try:
192-
insert_new_job(f, jar=jar, xml=xml, queue=queue, **kwargs)
191+
insert_new_job(f, jar=jar, xml=xml, walltime=walltime, **kwargs)
193192
except peewee.IntegrityError:
194193
log.warning('Job already submitted: {}_{:03d}'.format(f.night, f.run_id))
195194
except ValueError as e:
@@ -281,17 +280,12 @@ def build_output_base_name(job):
281280

282281

283282
@requires_database_connection
284-
def resubmit_walltime_exceeded(old_queue, new_queue):
283+
def resubmit_walltime_exceeded(factor=1.5):
285284
'''
286285
Resubmit jobs where walltime was exceeded.
287-
Change queue from old_queue to new_queue
288286
'''
289-
if old_queue.walltime >= new_queue.walltime:
290-
raise ValueError('New queue must have longer walltime for this to make sense')
291-
292287
return (
293288
Job
294-
.update(queue=new_queue, status=ProcessingState.get(description='inserted'))
289+
.update(walltime=factor * Job.walltime, status=ProcessingState.get(description='inserted'))
295290
.where(Job.status == ProcessingState.get(description='walltime_exceeded'))
296-
.where(Job.queue == old_queue)
297291
).execute()

erna/automatic_processing/job_submitter.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from .database import ProcessingState, requires_database_connection
77
from .database_utils import count_jobs, get_pending_jobs
8-
from .qsub import submit_job, get_current_jobs
8+
from .slurm import submit_job, get_current_jobs
99

1010
log = logging.getLogger(__name__)
1111

@@ -16,7 +16,10 @@ def __init__(
1616
self,
1717
interval,
1818
max_queued_jobs,
19-
data_directory,
19+
raw_dir,
20+
aux_dir,
21+
erna_dir,
22+
script,
2023
host,
2124
port,
2225
group,
@@ -32,7 +35,7 @@ def __init__(
3235
Maximum number of jobs in the queue of the grid engine
3336
No new jobs are submitted if the number of jobs in the queue is
3437
higher than this value
35-
data_directory: str
38+
erna_directory: str
3639
patch to the basic structure for erna. Logfiles, jars, xmls and
3740
analysis output are stored in subdirectories to this directory.
3841
host: str
@@ -48,12 +51,15 @@ def __init__(
4851
self.event = Event()
4952
self.interval = interval
5053
self.max_queued_jobs = max_queued_jobs
51-
self.data_directory = data_directory
54+
self.erna_dir = erna_dir
55+
self.aux_dir = aux_dir
56+
self.raw_dir = raw_dir
5257
self.host = host
5358
self.port = port
5459
self.group = group
5560
self.mail_settings = mail_settings
5661
self.mail_address = mail_address
62+
self.script = script
5763

5864
def run(self):
5965
while not self.event.is_set():
@@ -62,7 +68,7 @@ def run(self):
6268
except peewee.OperationalError:
6369
log.warning('Lost database connection')
6470
except Exception as e:
65-
log.exception('Error during submission')
71+
log.exception('Error during submission: {}'.format(e))
6672
self.event.wait(self.interval)
6773

6874
def terminate(self):
@@ -92,8 +98,11 @@ def process_pending_jobs(self):
9298
try:
9399
submit_job(
94100
job,
95-
output_base_dir=os.path.join(self.data_directory, 'fact-tools'),
96-
data_dir=self.data_directory,
101+
script=self.script,
102+
output_base_dir=os.path.join(self.erna_dir, 'fact-tools'),
103+
raw_dir=self.raw_dir,
104+
aux_dir=self.aux_dir,
105+
erna_dir=self.erna_dir,
97106
mail_address=self.mail_address,
98107
mail_settings=self.mail_settings,
99108
submitter_host=self.host,
@@ -103,5 +112,5 @@ def process_pending_jobs(self):
103112
log.info('New job with id {} queued'.format(job.id))
104113
except:
105114
log.exception('Could not submit job')
106-
job.status = ProcessingState.get(description='error')
115+
job.status = ProcessingState.get(description='failed')
107116
job.save()

0 commit comments

Comments
 (0)