import time
from multiprocessing import Pool
import pickle
-#icetray.I3Logger.global_logger = icetray.I3NullLogger()
+import argparse
+import sys
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-config", "--config", type=str, required=True)
+args = parser.parse_args()
+
+

def Contains_RetroReco(frame):
    try:
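The new `--config` flag replaces the previously hard-coded `tmp/config/config.pkl` path; its value is treated as the directory that holds `config.pkl` (see the `Extract_Config` change below). A minimal sketch of how the flag parses, with the directory value assumed for illustration:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-config", "--config", type=str, required=True)
# Parsing an explicit argv list instead of sys.argv, purely for illustration;
# 'tmp/config' is an assumed directory containing config.pkl.
args = parser.parse_args(["--config", "tmp/config"])
print(args.config)  # -> tmp/config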
@@ -248,6 +255,7 @@ def WriteDicts(settings):
        if verbose > 0:
            print('Worker %s Reading %s' % (id, input_file.split('/')[-1]))
            print(input_file)
+            sys.stdout.flush()
        gcd_count += 1

        while i3_file.more():
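The added `sys.stdout.flush()` makes worker output show up promptly: stdout is block-buffered when redirected to a file or pipe, so prints from `Pool` workers can otherwise sit in the buffer for a long time. Python 3's built-in `print` offers the same effect in one call, noted here as an alternative rather than what this PR uses:

# flush=True flushes the stream in the same call, so no separate
# sys.stdout.flush() is needed.
print('Worker 0 reading file.i3', flush=True)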
@@ -281,6 +289,7 @@ def WriteDicts(settings):
                    if len(retros) > 0:
                        retro_big = retro_big.append(retro, ignore_index=True, sort=True)
                    if len(truth_big) >= max_dict_size:
+                        print('saving')
                        engine = sqlalchemy.create_engine('sqlite:///' + outdir + '/%s/tmp/worker-%s-%s.db' % (db_name, id, output_count))
                        truth_big.to_sql('truth', engine, index=False, if_exists='append')
                        if len(retro_big) > 0:
@@ -296,6 +305,7 @@ def WriteDicts(settings):
        if verbose > 0:
            print('Worker %s has finished %s/%s I3 files.' % (id, file_counter, len(input_files)))
    if (len(feature_big) > 0):
+        print('saving eof')
        engine = sqlalchemy.create_engine('sqlite:///' + outdir + '/%s/tmp/worker-%s-%s.db' % (db_name, id, output_count))
        truth_big.to_sql('truth', engine, index=False, if_exists='append')
        if len(retro_big) > 0:
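Both saving hunks follow the same chunking pattern: rows accumulate in in-memory DataFrames, and once a buffer reaches `max_dict_size` (or the input files are exhausted), it is appended to a per-worker SQLite database. A self-contained sketch of that pattern, with the database path, table name, and columns invented for illustration:

import pandas as pd
import sqlalchemy

buffer = pd.DataFrame({'event_no': [1, 2, 3], 'energy': [0.5, 1.2, 3.4]})
engine = sqlalchemy.create_engine('sqlite:///worker-0-0.db')
# if_exists='append' creates the table on the first write and appends on
# later writes, so repeated flushes accumulate rows in the same table.
buffer.to_sql('truth', engine, index=False, if_exists='append')
buffer = buffer.iloc[0:0]  # reset the in-memory buffer after flushing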
@@ -363,8 +373,18 @@ def WalkDirectory(dir, extensions):
            gcds_folder.append(gcd_folder)
        files_list.extend(i3files_folder)
        GCD_list.extend(gcds_folder)
+
+    files_list, GCD_list = PairWiseShuffle(files_list, GCD_list)
    return files_list, GCD_list

+def PairWiseShuffle(files_list, gcd_list):
+    df = pd.DataFrame({'i3': files_list, 'gcd': gcd_list})
+    df_shuffled = df.sample(frac=1)
+    i3_shuffled = df_shuffled['i3'].tolist()
+    gcd_shuffled = df_shuffled['gcd'].tolist()
+    return i3_shuffled, gcd_shuffled
+
+
def FindFiles(paths,outdir,db_name,gcd_rescue, extensions = None):
    print('Counting files in: \n%s\nThis might take a few minutes...' % paths)
    if extensions == None:
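`PairWiseShuffle` shuffles the i3 and GCD lists in lockstep: `df.sample(frac=1)` returns all rows of the DataFrame in random order, so each i3 file keeps the GCD file it was paired with. The same effect without the pandas dependency, as a minimal sketch:

import random

def pairwise_shuffle(files_list, gcd_list):
    # Shuffle (i3, gcd) pairs as units so the pairing survives the shuffle.
    pairs = list(zip(files_list, gcd_list))
    if not pairs:
        return [], []
    random.shuffle(pairs)
    i3_shuffled, gcd_shuffled = zip(*pairs)
    return list(i3_shuffled), list(gcd_shuffled)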
@@ -379,10 +399,10 @@ def FindFiles(paths,outdir,db_name,gcd_rescue, extensions = None):
        input_files.extend(input_files_mid)
        gcd_files.extend(gcd_files_mid)

-
+    input_files, gcd_files = PairWiseShuffle(input_files, gcd_files)
    Save_Filenames(input_files, outdir, db_name)
-
    return input_files, gcd_files
+
def Save_Filenames(input_files,outdir, db_name):
    input_files = pd.DataFrame(input_files)
    input_files.columns = ['filename']
@@ -405,8 +425,8 @@ def PickleCleaner(List):
            clean_list.append(str(element))
    return clean_list

-def Extract_Config():
-    with open('tmp/config/config.pkl', 'rb') as handle:
+def Extract_Config(config_path):
+    with open('%s/config.pkl' % config_path, 'rb') as handle:
        config = pickle.load(handle)
    paths = PickleCleaner(config['paths'])
@@ -429,10 +449,10 @@ def Extract_Config():
        custom_truth = None
    return paths, outdir, workers, pulse_keys, db_name, max_dictionary_size, custom_truth, gcd_rescue, verbose
def Transmit_Start_Time(start_time,config_path):
-    with open(config_path, 'rb') as handle:
+    with open('%s/config.pkl' % config_path, 'rb') as handle:
        config = pickle.load(handle)
    config['start_time'] = start_time
-    with open(config_path, 'wb') as handle:
+    with open('%s/config.pkl' % config_path, 'wb') as handle:
        pickle.dump(config, handle, protocol = 2)
    return

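`Extract_Config` and `Transmit_Start_Time` now both derive the pickle path from the config directory instead of taking a full file path. A sketch of writing a compatible `config.pkl`; the key names are inferred from the variables `Extract_Config` unpacks, and all values are placeholders:

import os
import pickle

# Assumed keys, matching the tuple Extract_Config returns; the values are
# placeholders, not defaults from this repository.
config = {
    'paths': ['/data/i3_files'],
    'outdir': '/data/out',
    'workers': 4,
    'pulse_keys': ['SRTInIcePulses'],
    'db_name': 'my_database',
    'max_dictionary_size': 10000,
    'custom_truth': None,
    'gcd_rescue': '/data/gcd_rescue.i3',
    'verbose': 1,
}
os.makedirs('tmp/config', exist_ok=True)
with open('tmp/config/config.pkl', 'wb') as handle:
    pickle.dump(config, handle, protocol=2)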
@@ -443,7 +463,9 @@ def PrintMessage(workers, input_files):
    print('----------------------')
def CreateTemporaryDatabases(paths, outdir, workers, pulse_keys,config_path, start_time,db_name,gcd_rescue,verbose,max_dictionary_size = 10000, custom_truth = None):
    if __name__ == "__main__":
-        start_time = time.time()
+        start_time = time.time()
+        if verbose == 0:
+            icetray.I3Logger.global_logger = icetray.I3NullLogger()
        directory_exists = CreateOutDirectory(outdir + '/%s/tmp' % db_name)
        input_files, gcd_files = FindFiles(paths, outdir,db_name,gcd_rescue)
        if workers > len(input_files):
@@ -463,13 +485,15 @@ def CreateTemporaryDatabases(paths, outdir, workers, pulse_keys,config_path, sta
        p.map_async(WriteDicts, settings)
        p.close()
        p.join()
+        print('transmitting..')
        Transmit_Start_Time(start_time, config_path)


start_time = time.time()

-paths, outdir, workers, pulse_keys, db_name, max_dictionary_size, custom_truth, gcd_rescue, verbose = Extract_Config()
-CreateTemporaryDatabases(paths, outdir, workers, pulse_keys,'tmp/config/config.pkl', start_time,db_name,gcd_rescue,verbose, max_dictionary_size, custom_truth)
+paths, outdir, workers, pulse_keys, db_name, max_dictionary_size, custom_truth, gcd_rescue, verbose = Extract_Config(args.config)
+print(args.config)
+CreateTemporaryDatabases(paths, outdir, workers, pulse_keys,args.config, start_time,db_name,gcd_rescue,verbose, max_dictionary_size, custom_truth)
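One caveat in the unchanged `Pool` block above: `map_async` discards worker exceptions unless the returned `AsyncResult` is kept, and `p.join()` only waits for the processes to finish. A sketch of the variant that surfaces worker errors (an observation, not a change made in this PR):

from multiprocessing import Pool

def work(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as p:
        result = p.map_async(work, range(8))
        p.close()
        p.join()
        print(result.get())  # re-raises here if any worker failed

With the new flag, the script itself would be launched along the lines of `python <script>.py --config tmp/config`, where the script name is not shown in this diff and `tmp/config` is the directory containing `config.pkl`.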