from snipe.api.snipe_sig import SnipeSig
from snipe.api.reference_QC import ReferenceQC

+import concurrent.futures
+import signal

def process_sample(sample_path: str, ref_path: str, amplicon_path: Optional[str],
                   advanced: bool, roi: bool, debug: bool,
@@ -426,7 +428,7 @@ def qc(ref: str, sample: List[str], samples_from_file: Optional[str],
    if samples_from_file:
        logger.debug(f"Reading samples from file: {samples_from_file}")
        try:
-            with open(samples_from_file, encoding='utf-8') as f:
+            with open(samples_from_file, 'r', encoding='utf-8') as f:
                file_samples = {line.strip() for line in f if line.strip()}
                samples_set.update(file_samples)
                logger.debug(f"Collected {len(file_samples)} samples from file.")
@@ -492,32 +494,58 @@ def qc(ref: str, sample: List[str], samples_from_file: Optional[str],
            "ychr": ychr,
            "vars_paths": vars_paths
        })
+
+    results = []

+    # Define a handler for graceful shutdown
+    def shutdown(signum, frame):
+        logger.warning("Shutdown signal received. Terminating all worker processes...")
+        executor.shutdown(wait=False, cancel_futures=True)
+        sys.exit(1)
+
+    # Register signal handlers
+    signal.signal(signal.SIGINT, shutdown)
+    signal.signal(signal.SIGTERM, shutdown)

-    # Process samples in parallel with progress bar
-    results = []
-    with concurrent.futures.ProcessPoolExecutor(max_workers=cores) as executor:
-        # Submit all tasks
-        futures = {
-            executor.submit(process_sample, **args): args for args in dict_process_args
-        }
-        # Iterate over completed futures with a progress bar
-        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing samples"):
-            sample = futures[future]
-            try:
-                result = future.result()
-                results.append(result)
-            except Exception as exc:
-                logger.error(f"Sample {sample} generated an exception: {exc}")
-                results.append({
-                    "sample": os.path.splitext(os.path.basename(sample))[0],
-                    "file_path": sample,
-                    "QC_Error": str(exc)
-                })
-
-    # Create pandas DataFrame
-    logger.info("Aggregating results into DataFrame.")
-    df = pd.DataFrame(results)
+    try:
+        with concurrent.futures.ProcessPoolExecutor(max_workers=cores) as executor:
+            futures = {
+                executor.submit(process_sample, **args): args for args in dict_process_args
+            }
+
+            for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing samples"):
+                sample = futures[future]
+                try:
+                    result = future.result()
+                    results.append(result)
+                except Exception as exc:
+                    logger.error(f"Sample {sample['sample_path']} generated an exception: {exc}")
+                    results.append({
+                        "sample": os.path.splitext(os.path.basename(sample['sample_path']))[0],
+                        "file_path": sample['sample_path'],
+                        "QC_Error": str(exc)
+                    })
+    except KeyboardInterrupt:
+        logger.warning("KeyboardInterrupt received. Shutting down...")
+        sys.exit(1)
+    except Exception as e:
+        logger.error(f"An unexpected error occurred: {e}")
+        sys.exit(1)
+
+    # Separate successful and failed results
+    succeeded = [res for res in results if "QC_Error" not in res]
+    failed = [res for res in results if "QC_Error" in res]
+
+    # Handle complete failure
+    if len(succeeded) == 0:
+        logger.error("All samples failed during QC processing. Output TSV will not be generated.")
+        sys.exit(1)
+
+    # Prepare the command-line invocation for comments
+    command_invocation = ' '.join(sys.argv)
+
+    # Create pandas DataFrame for succeeded samples
+    df = pd.DataFrame(succeeded)

    # Reorder columns to have 'sample' and 'file_path' first, if they exist
    cols = list(df.columns)
@@ -529,14 +557,25 @@ def qc(ref: str, sample: List[str], samples_from_file: Optional[str],
    reordered_cols += cols
    df = df[reordered_cols]

-    # Export to TSV
+    # Export to TSV with comments
    try:
-        df.to_csv(output, sep='\t', index=False)
+        with open(output, 'w', encoding='utf-8') as f:
+            # Write comment with command invocation
+            f.write(f"# Command: {command_invocation}\n")
+            # Write the DataFrame to the file
+            df.to_csv(f, sep='\t', index=False)
        logger.info(f"QC results successfully exported to {output}")
    except Exception as e:
        logger.error(f"Failed to export QC results to {output}: {e}")
        sys.exit(1)

+    # Report failed samples if any
+    if failed:
+        failed_samples = [res['sample'] for res in failed]
+        logger.warning(f"The following {len(failed_samples)} sample(s) failed during QC processing:")
+        for sample in failed_samples:
+            logger.warning(f"- {sample}")
+
    end_time = time.time()
    elapsed_time = end_time - start_time
    logger.info(f"QC process completed in {elapsed_time:.2f} seconds.")
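
For context, the pattern this patch introduces (a ProcessPoolExecutor driven through as_completed with a tqdm progress bar, per-sample exception capture, and SIGINT/SIGTERM handlers that cancel pending work) can be exercised in isolation with the minimal sketch below. The work function, the tasks list, and the worker count are hypothetical placeholders, not part of snipe.

# Illustrative sketch only (not part of the patch): the ProcessPoolExecutor /
# signal-handling pattern introduced above, with hypothetical task names.
import concurrent.futures
import signal
import sys

from tqdm import tqdm


def work(x: int) -> int:
    # Stand-in for process_sample(); fails on one input to show error capture.
    if x == 3:
        raise ValueError("simulated QC failure")
    return x * x


def main() -> None:
    tasks = list(range(8))
    results, failures = [], []

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Registered after the executor exists so the handler can reference it;
        # cancel_futures requires Python 3.9+.
        def shutdown(signum, frame):
            executor.shutdown(wait=False, cancel_futures=True)
            sys.exit(1)

        signal.signal(signal.SIGINT, shutdown)
        signal.signal(signal.SIGTERM, shutdown)

        futures = {executor.submit(work, t): t for t in tasks}
        for future in tqdm(concurrent.futures.as_completed(futures),
                           total=len(futures), desc="Processing"):
            task = futures[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # Record the failure instead of aborting the whole run,
                # mirroring the per-sample QC_Error records in the patch.
                failures.append({"task": task, "error": str(exc)})

    print(f"{len(results)} succeeded, {len(failures)} failed")


if __name__ == "__main__":
    main()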