From ed8fce1e5f3cb23c696bd39952b7dc0344546e58 Mon Sep 17 00:00:00 2001 From: Josh Sixsmith Date: Wed, 14 Sep 2016 17:21:48 +1000 Subject: [PATCH] Update to allow processing of a list of scenes as opposed to generating a list of scenes to process from within NBAR. --- workflow/nbar.py | 102 ++++++++++++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 41 deletions(-) diff --git a/workflow/nbar.py b/workflow/nbar.py index 5a56fc4..363194f 100755 --- a/workflow/nbar.py +++ b/workflow/nbar.py @@ -14,6 +14,7 @@ import os from os.path import join as pjoin, basename, dirname, exists import subprocess +import string import logging import glob import shutil @@ -1716,7 +1717,7 @@ def run(self): shutil.rmtree(self.work_path) -def is_valid_directory(parser, arg): +def is_valid_path(parser, arg): """Used by argparse""" if not exists(arg): parser.error("{path} does not exist".format(path=arg)) @@ -1734,27 +1735,18 @@ def scatter(iterable, P=1, p=1): return itertools.islice(iterable, p-1, None, P) -def main(inpath, outpath, workpath, nnodes=1, nodenum=1): - l1t_files = sorted([pjoin(inpath, f) for f in os.listdir(inpath) if - '_OTH_' in f]) - filtered_l1t = [] - for l1t in l1t_files: - acq = gaip.acquisitions(l1t)[0] - if ((87 <= acq.path <= 116) & (67 <= acq.row <= 91)): - completed = pjoin(workpath, (basename(l1t).replace('OTH', 'NBAR') + - '.completed')) - if not exists(completed): - filtered_l1t.append(l1t) - else: - msg = "Skipping {}".format(acq.dir_name) - logging.info(msg) - - # create product output dirs - products = CONFIG.get('packaging', 'products').split(',') - for product in products: - product_dir = pjoin(outpath, product) - if not exists(product_dir): - os.makedirs(product_dir) +def main(l1t_path, outpath, workpath, l1t_list, nnodes=1, nodenum=1): + + # allow to use "{year}" and "{month}" in l1t_path, outpath, and workpath + # first determine "{year}" and "{month}" positions + year_pos = month_pos = -1 + formatter = string.Formatter() + tmp = formatter.parse(l1t_path) + pos = 0 + for lstr, fname, fs, cv in tmp: + pos += len(lstr) + if fname == "year": year_pos = pos; pos += 4 + if fname == "month": month_pos = pos; pos += 2 # Setup Software Versions for Packaging ptype.register_software_version( @@ -1768,13 +1760,42 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1): repo_url='http://www.ontar.com/software/productdetails.aspx?item=modtran' ) - l1t_files = [f for f in scatter(filtered_l1t, nnodes, nodenum)] - nbar_files = [pjoin(workpath, os.path.basename(f).replace('OTH', 'NBAR')) - for f in l1t_files] - # tasks = [TerrainCorrection(l1t, nbar) for l1t, nbar in - # tasks = [WriteMetadata(l1t, nbar) for l1t, nbar in - tasks = [PackageTC(l1t, nbar, outpath) for l1t, nbar in - zip(l1t_files, nbar_files)] + tasks = [] + products = CONFIG.get('packaging', 'products').split(',') + for l1t in open(l1t_list).readlines(): + l1t = l1t.strip() + if l1t == '': continue + # get {year} and {month} values + year = month = 0 + if year_pos > -1: year = int(l1t[year_pos:year_pos+4]) + if month_pos > -1: month = int(l1t[month_pos:month_pos+2]) + bf = basename(l1t) + + acq = gaip.acquisitions(l1t)[0] + workdir = workpath.format(year = year, month = month) + if not exists(workdir): os.makedirs(workdir) + if ((87 <= acq.path <= 116) & (67 <= acq.row <= 91)): + completed = pjoin(workdir, + (bf.replace('OTH', 'NBAR') + '.completed')) + if exists(completed): + msg = "Skipping {}".format(l1t) + logging.info(msg) + continue + + # create product output dirs + outdir = outpath.format(year = year, month = month) + if not exists(outdir): + os.makedirs(outdir) + os.makedirs(pjoin(outdir, "nbar")) + os.makedirs(pjoin(outdir, "nbart")) + for product in products: + product_dir = pjoin(outdir, product) + if not exists(product_dir): os.makedirs(product_dir) + + nbar = pjoin(workdir, bf.replace('OTH', 'NBAR')) + tasks.append(PackageTC(l1t, nbar, outdir)) + + tasks = [f for f in scatter(tasks, nnodes, nodenum)] ncpus = int(os.getenv('PBS_NCPUS', '1')) luigi.build(tasks, local_scheduler=True, workers=ncpus / nnodes) @@ -1782,21 +1803,22 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1): if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("--l1t_path", help=("Path to directory containing L1T " - "datasets"), required=True, - type=lambda x: is_valid_directory(parser, x)) + "datasets"), required=True) parser.add_argument("--out_path", help=("Path to directory where NBAR " - "dataset are to be written"), required=True, - type=lambda x: is_valid_directory(parser, x)) + "dataset are to be written"), required=True) parser.add_argument('--cfg', help='Path to a user defined configuration file.') parser.add_argument("--log_path", help=("Path to directory where where log" " files will be written"), default='.', - type=lambda x: is_valid_directory(parser, x)) + type=lambda x: is_valid_path(parser, x)) + parser.add_argument("--l1t_list", + help="A file listing full path of L1T datasets", + required=True, + type=lambda x: is_valid_path(parser, x)) parser.add_argument("--debug", help=("Selects more detail logging (default" " is INFO)"), default=False, action='store_true') parser.add_argument("--work_path", help=("Path to a directory where the " - "intermediate files will be written."), required=False, - type=lambda x: is_valid_directory(parser, x)) + "intermediate files will be written."), required=False) args = parser.parse_args() @@ -1823,12 +1845,10 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1): format=("%(asctime)s: [%(name)s] (%(levelname)s) " "%(message)s "), datefmt='%H:%M:%S') - # use the disk of the local node if we can - # working directly off the lustre drive seems to flaky if args.work_path is None: - work_path = tempfile.mkdtemp() - else: work_path = args.out_path + else: + work_path = args.work_path logging.info("nbar.py started") logging.info('l1t_path={path}'.format(path=args.l1t_path)) @@ -1837,4 +1857,4 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1): size = int(os.getenv('PBS_NNODES', '1')) rank = int(os.getenv('PBS_VNODENUM', '1')) - main(args.l1t_path, args.out_path, work_path, size, rank) + main(args.l1t_path, args.out_path, work_path, args.l1t_list, size, rank)