88from bioblend .galaxy import GalaxyInstance , dataset_collections
99from lib import INVOCATIONS_DIR , METRICS_DIR , Keys
1010from lib .common import (Context , _get_dataset_data , _make_dataset_element ,
11- connect , print_json )
11+ connect , print_json , try_for )
1212from lib .history import wait_for
1313
1414log = logging .getLogger ('abm' )
1515
1616
1717def run_cli (context : Context , args : list ):
1818 """
19- Runs a single workflow defined by *args[0]*
19+ Command line handler to run a single benchmark.
2020
21- :param args: a list that contains:
22- args[0] - the path to the benchmark configuration file
23- args[1] - the prefix to use when creating the new history in Galaxy
24- args[2] - the name of the experiment, if part of one. This is used to
25- generate output folder names.
21+ :param context: a context object that defines how to connect to the Galaxy server.
22+ :param args: parameters from the command line
2623
2724 :return: True if the workflows completed successfully. False otherwise.
2825 """
@@ -43,11 +40,15 @@ def run_cli(context: Context, args: list):
4340
4441
4542def run (context : Context , workflow_path , history_prefix : str , experiment : str ):
46- # if len(args) > 1:
47- # history_prefix = args[1]
48- # if len(args) > 2:
49- # experiment = args[2].replace(' ', '_').lower()
43+ """
44+ Does the actual work of running a benchmark.
5045
46+ :param context: a context object that defines how to connect to the Galaxy server.
47+ :param workflow_path: path to the ABM workflow file. (benchmark really). NOTE this is NOT the Galaxy .ga file.
48+ :param history_prefix: a prefix value used when generating new history names.
49+ :param experiment: the name of the experiment (arbitrary string). Used to generate new history names.
50+ :return: True if the workflow run completed successfully. False otherwise.
51+ """
5152 if os .path .exists (INVOCATIONS_DIR ):
5253 if not os .path .isdir (INVOCATIONS_DIR ):
5354 print ('ERROR: Can not save invocation status, directory name in use.' )
@@ -76,7 +77,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
7677 workflows = parse_workflow (workflow_path )
7778 if not workflows :
7879 print (f"Unable to load any workflow definitions from { workflow_path } " )
79- return
80+ return False
8081
8182 print (f"Found { len (workflows )} workflow definitions" )
8283 for workflow in workflows :
@@ -144,11 +145,13 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
144145 dsid = find_collection_id (gi , dsname )
145146 dsdata = _get_dataset_data (gi , dsid )
146147 if dsdata is None :
147- raise Exception (
148- f"ERROR: unable to resolve { dsname } to a dataset."
149- )
150- dsid = dsdata ['id' ]
151- dssize = dsdata ['size' ]
148+ # raise Exception(
149+ # f"ERROR: unable to resolve {dsname} to a dataset."
150+ # )
151+ dssize = 0
152+ else :
153+ dsid = dsdata ['id' ]
154+ dssize = dsdata ['size' ]
152155 input_data_size .append (dssize )
153156 print (f"Input collection ID: { dsname } [{ dsid } ] { dssize } " )
154157 inputs [input [0 ]] = {'id' : dsid , 'src' : 'hdca' , 'size' : dssize }
@@ -173,7 +176,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
173176 histories = gi .histories .get_histories (name = spec ['history' ])
174177 if len (histories ) == 0 :
175178 print (f"ERROR: History { spec ['history' ]} not foune" )
176- return
179+ return False
177180 hid = histories [0 ]['id' ]
178181 pairs = 0
179182 paired_list = spec ['paired' ]
@@ -183,7 +186,13 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
183186 for key in item .keys ():
184187 # print(f"Getting dataset for {key} = {item[key]}")
185188 value = _get_dataset_data (gi , item [key ])
186- size += value ['size' ]
189+ if value is None :
190+ print (
191+ f"ERROR: Unable to find dataset { item [key ]} "
192+ )
193+ return
194+ if size in value :
195+ size += value ['size' ]
187196 elements .append (
188197 _make_dataset_element (key , value ['id' ])
189198 )
@@ -224,16 +233,20 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
224233 else :
225234 raise Exception (f'Invalid input value' )
226235 print (f"Running workflow { wfid } in history { new_history_name } " )
227- invocation = gi .workflows .invoke_workflow (
236+ f = lambda : gi .workflows .invoke_workflow (
228237 wfid , inputs = inputs , history_name = new_history_name
229238 )
239+ invocation = try_for (f , 3 )
230240 id = invocation ['id' ]
231241 # invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False)
242+ f = lambda : gi .invocations .wait_for_invocation (id , 86400 , 10 , False )
232243 try :
233- invocations = gi .invocations .wait_for_invocation (id , 86400 , 10 , False )
234- except :
244+ invocations = try_for (f , 2 )
245+ except Exception as e :
246+ print (f"Exception waiting for invocations" )
235247 pprint (invocation )
236248 sys .exc_info ()
249+ raise e
237250 print ("Waiting for jobs" )
238251 if history_prefix is not None :
239252 parts = history_prefix .split ()
@@ -265,6 +278,14 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
265278
266279
267280def translate (context : Context , args : list ):
281+ """
282+ Translates the human readable names of datasets and workflows into the Galaxy
283+ IDs that are unique to each server.
284+
285+ :param context: the context object used to connect to the Galaxy server
286+ :param args: [0] the path to the benchmarking YAML file to translate
287+ :return: Nothing. Prints the translated workflow file to stdout.
288+ """
268289 if len (args ) == 0 :
269290 print ('ERROR: no workflow configuration specified' )
270291 return
@@ -307,6 +328,14 @@ def translate(context: Context, args: list):
307328
308329
309330def validate (context : Context , args : list ):
331+ """
332+ Checks to see if the workflow and all datasets defined in the benchmark can
333+ be found on the server.
334+
335+ :param context: the context object used to connect to the Galaxy instance
336+ :param args: [0] the benchmark YAML file to be validated.
337+ :return:
338+ """
310339 if len (args ) == 0 :
311340 print ('ERROR: no workflow configuration specified' )
312341 return
@@ -412,10 +441,10 @@ def validate(context: Context, args: list):
412441
413442
414443def wait_for_jobs (context , gi : GalaxyInstance , invocations : dict ):
415- """Blocks until all jobs defined in the *invocations* to complete.
444+ """Blocks until all jobs defined in *invocations* are complete (in a terminal state) .
416445
417446 :param gi: The *GalaxyInstance* running the jobs
418- :param invocations:
447+ :param invocations: a dictionary containing information about the jobs invoked
419448 :return:
420449 """
421450 wfid = invocations ['workflow_id' ]
@@ -429,6 +458,7 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):
429458 jobs = gi .jobs .get_jobs (history_id = hid )
430459 for job in jobs :
431460 data = gi .jobs .show_job (job ['id' ], full_details = True )
461+ data ['job_metrics' ] = gi .jobs .get_job_metrics (job ['id' ])
432462 metrics = {
433463 'run' : run ,
434464 'cloud' : cloud ,
@@ -485,6 +515,11 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):
485515
486516
487517def parse_workflow (workflow_path : str ):
518+ """
519+ Loads the benchmark YAML file.
520+ :param workflow_path: the path to the file to be loaded.
521+ :return: a dictionary containing the benchmark.
522+ """
488523 if not os .path .exists (workflow_path ):
489524 print (f'ERROR: could not find workflow file { workflow_path } ' )
490525 return None
@@ -503,6 +538,14 @@ def parse_workflow(workflow_path: str):
503538
504539
505540def find_workflow_id (gi , name_or_id ):
541+ """
542+ Resolves the human-readable name for a workflow into the unique ID on the
543+ Galaxy instance.
544+
545+ :param gi: the connection object to the Galaxy instance
546+ :param name_or_id: the name of the workflow
547+ :return: The Galaxy workflow ID or None if the workflow could not be located
548+ """
506549 try :
507550 wf = gi .workflows .show_workflow (name_or_id )
508551 return wf ['id' ]
@@ -519,7 +562,14 @@ def find_workflow_id(gi, name_or_id):
519562
520563
521564def find_dataset_id (gi , name_or_id ):
522- # print(f"Finding dataset {name_or_id}")
565+ """
566+ Resolves the human-readable name of a dataset into the unique ID on the
567+ Galaxy instance
568+
569+ :param gi: the connection object to the Galaxy instance
570+ :param name_or_id: the name of the dataset.
571+ :return: the Galaxy dataset ID or None if the dataset could not be located.
572+ """
523573 try :
524574 ds = gi .datasets .show_dataset (name_or_id )
525575 return ds ['id' ]
@@ -544,6 +594,14 @@ def find_dataset_id(gi, name_or_id):
544594
545595
546596def find_collection_id (gi , name ):
597+ """
598+ Resolves a human-readable collection name into the unique Galaxy ID.
599+
600+ :param gi: the connection object to the Galaxy instance
601+ :param name: the name of the collection to resolve
602+ :return: The unique Galaxy ID of the collection or None if the collection
603+ can not be located.
604+ """
547605 kwargs = {'limit' : 10000 , 'offset' : 0 }
548606 datasets = gi .datasets .get_datasets (** kwargs )
549607 if len (datasets ) == 0 :
@@ -565,7 +623,22 @@ def find_collection_id(gi, name):
565623
566624
567625def test (context : Context , args : list ):
568- id = 'c90fffcf98b31cd3'
626+ """
627+ Allows running testing code from the command line.
628+
629+ :param context: a connection object to a Galaxy instance
630+ :param args: varies
631+ :return: varies, typically None.
632+ """
633+ # id = 'c90fffcf98b31cd3'
634+ # gi = connect(context)
635+ # inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input')
636+ # pprint(inputs)
637+
569638 gi = connect (context )
570- inputs = gi .workflows .get_workflow_inputs (id , 'PE fastq input' )
571- pprint (inputs )
639+ print ("Calling find_collection_id" )
640+ dsid = find_collection_id (gi , args [0 ])
641+ print (f"Collection ID: { dsid } " )
642+ print ("Calling _get_dataset_data" )
643+ dsdata = _get_dataset_data (gi , dsid )
644+ pprint (dsdata )
0 commit comments