
Commit

Update to allow processing of a list of scenes as opposed to generating a list of scenes to process from within NBAR.
sixy6e committed Sep 14, 2016
1 parent a2a9982 commit ed8fce1
Showing 1 changed file with 61 additions and 41 deletions.
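
In short, scene discovery moves out of nbar.py: instead of scanning --l1t_path for '_OTH_' datasets and building the work list inside main(), the script now reads the scenes to process from a text file passed via a new required --l1t_list argument, and --l1t_path, --out_path and --work_path may carry "{year}" and "{month}" placeholders that are filled in per scene. A minimal sketch of how a caller might now build that scene list, mirroring the directory scan the old main() performed internally (the directory and file names are hypothetical):

    import os
    from os.path import join as pjoin

    l1t_dir = "/g/data/l1t/2016/08"            # hypothetical L1T directory
    scenes = sorted(pjoin(l1t_dir, f) for f in os.listdir(l1t_dir)
                    if '_OTH_' in f)           # same filter the old main() used

    with open("scenes.txt", "w") as out:       # file to pass as --l1t_list
        out.write("\n".join(scenes) + "\n")

The script would then be launched with --l1t_list scenes.txt alongside the existing --l1t_path, --out_path and --work_path options.
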
102 changes: 61 additions & 41 deletions workflow/nbar.py
@@ -14,6 +14,7 @@
 import os
 from os.path import join as pjoin, basename, dirname, exists
 import subprocess
+import string
 import logging
 import glob
 import shutil
@@ -1716,7 +1717,7 @@ def run(self):
         shutil.rmtree(self.work_path)
 
 
-def is_valid_directory(parser, arg):
+def is_valid_path(parser, arg):
     """Used by argparse"""
     if not exists(arg):
         parser.error("{path} does not exist".format(path=arg))
@@ -1734,27 +1735,18 @@ def scatter(iterable, P=1, p=1):
     return itertools.islice(iterable, p-1, None, P)
 
 
-def main(inpath, outpath, workpath, nnodes=1, nodenum=1):
-    l1t_files = sorted([pjoin(inpath, f) for f in os.listdir(inpath) if
-                        '_OTH_' in f])
-    filtered_l1t = []
-    for l1t in l1t_files:
-        acq = gaip.acquisitions(l1t)[0]
-        if ((87 <= acq.path <= 116) & (67 <= acq.row <= 91)):
-            completed = pjoin(workpath, (basename(l1t).replace('OTH', 'NBAR') +
-                                         '.completed'))
-            if not exists(completed):
-                filtered_l1t.append(l1t)
-            else:
-                msg = "Skipping {}".format(acq.dir_name)
-                logging.info(msg)
-
-    # create product output dirs
-    products = CONFIG.get('packaging', 'products').split(',')
-    for product in products:
-        product_dir = pjoin(outpath, product)
-        if not exists(product_dir):
-            os.makedirs(product_dir)
+def main(l1t_path, outpath, workpath, l1t_list, nnodes=1, nodenum=1):
+
+    # allow to use "{year}" and "{month}" in l1t_path, outpath, and workpath
+    # first determine "{year}" and "{month}" positions
+    year_pos = month_pos = -1
+    formatter = string.Formatter()
+    tmp = formatter.parse(l1t_path)
+    pos = 0
+    for lstr, fname, fs, cv in tmp:
+        pos += len(lstr)
+        if fname == "year": year_pos = pos; pos += 4
+        if fname == "month": month_pos = pos; pos += 2
 
     # Setup Software Versions for Packaging
     ptype.register_software_version(
@@ -1768,35 +1760,65 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1):
         repo_url='http://www.ontar.com/software/productdetails.aspx?item=modtran'
     )
 
-    l1t_files = [f for f in scatter(filtered_l1t, nnodes, nodenum)]
-    nbar_files = [pjoin(workpath, os.path.basename(f).replace('OTH', 'NBAR'))
-                  for f in l1t_files]
-    # tasks = [TerrainCorrection(l1t, nbar) for l1t, nbar in
-    # tasks = [WriteMetadata(l1t, nbar) for l1t, nbar in
-    tasks = [PackageTC(l1t, nbar, outpath) for l1t, nbar in
-             zip(l1t_files, nbar_files)]
+    tasks = []
+    products = CONFIG.get('packaging', 'products').split(',')
+    for l1t in open(l1t_list).readlines():
+        l1t = l1t.strip()
+        if l1t == '': continue
+        # get {year} and {month} values
+        year = month = 0
+        if year_pos > -1: year = int(l1t[year_pos:year_pos+4])
+        if month_pos > -1: month = int(l1t[month_pos:month_pos+2])
+        bf = basename(l1t)
+
+        acq = gaip.acquisitions(l1t)[0]
+        workdir = workpath.format(year = year, month = month)
+        if not exists(workdir): os.makedirs(workdir)
+        if ((87 <= acq.path <= 116) & (67 <= acq.row <= 91)):
+            completed = pjoin(workdir,
+                              (bf.replace('OTH', 'NBAR') + '.completed'))
+            if exists(completed):
+                msg = "Skipping {}".format(l1t)
+                logging.info(msg)
+                continue
+
+            # create product output dirs
+            outdir = outpath.format(year = year, month = month)
+            if not exists(outdir):
+                os.makedirs(outdir)
+                os.makedirs(pjoin(outdir, "nbar"))
+                os.makedirs(pjoin(outdir, "nbart"))
+            for product in products:
+                product_dir = pjoin(outdir, product)
+                if not exists(product_dir): os.makedirs(product_dir)
+
+            nbar = pjoin(workdir, bf.replace('OTH', 'NBAR'))
+            tasks.append(PackageTC(l1t, nbar, outdir))
+
+    tasks = [f for f in scatter(tasks, nnodes, nodenum)]
     ncpus = int(os.getenv('PBS_NCPUS', '1'))
     luigi.build(tasks, local_scheduler=True, workers=ncpus / nnodes)
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
     parser.add_argument("--l1t_path", help=("Path to directory containing L1T "
-                        "datasets"), required=True,
-                        type=lambda x: is_valid_directory(parser, x))
+                        "datasets"), required=True)
     parser.add_argument("--out_path", help=("Path to directory where NBAR "
-                        "dataset are to be written"), required=True,
-                        type=lambda x: is_valid_directory(parser, x))
+                        "dataset are to be written"), required=True)
     parser.add_argument('--cfg',
                         help='Path to a user defined configuration file.')
     parser.add_argument("--log_path", help=("Path to directory where where log"
                         " files will be written"), default='.',
-                        type=lambda x: is_valid_directory(parser, x))
+                        type=lambda x: is_valid_path(parser, x))
+    parser.add_argument("--l1t_list",
+                        help="A file listing full path of L1T datasets",
+                        required=True,
+                        type=lambda x: is_valid_path(parser, x))
     parser.add_argument("--debug", help=("Selects more detail logging (default"
                         " is INFO)"), default=False, action='store_true')
     parser.add_argument("--work_path", help=("Path to a directory where the "
-                        "intermediate files will be written."), required=False,
-                        type=lambda x: is_valid_directory(parser, x))
+                        "intermediate files will be written."), required=False)
 
     args = parser.parse_args()
 
@@ -1823,12 +1845,10 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1):
                         format=("%(asctime)s: [%(name)s] (%(levelname)s) "
                                 "%(message)s "), datefmt='%H:%M:%S')
 
-    # use the disk of the local node if we can
-    # working directly off the lustre drive seems to flaky
     if args.work_path is None:
        work_path = tempfile.mkdtemp()
-    else:
-        work_path = args.out_path
+    else:
+        work_path = args.work_path
 
     logging.info("nbar.py started")
     logging.info('l1t_path={path}'.format(path=args.l1t_path))
@@ -1837,4 +1857,4 @@ def main(inpath, outpath, workpath, nnodes=1, nodenum=1):
 
     size = int(os.getenv('PBS_NNODES', '1'))
     rank = int(os.getenv('PBS_VNODENUM', '1'))
-    main(args.l1t_path, args.out_path, work_path, size, rank)
+    main(args.l1t_path, args.out_path, work_path, args.l1t_list, size, rank)
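
The "{year}"/"{month}" handling in the new main() is terse, so here is a standalone sketch of the same string.Formatter technique: the placeholder offsets are measured once from the templated path, then the digits are sliced out of each listed scene path. The template and scene path below are hypothetical, not taken from the commit.

    import string

    l1t_path = "/g/data/l1t/{year}/{month}"                 # hypothetical template
    scene = "/g/data/l1t/2016/08/LS8_OLI_TIRS_OTH_example"  # hypothetical list entry

    year_pos = month_pos = -1
    pos = 0
    for literal, field, _, _ in string.Formatter().parse(l1t_path):
        pos += len(literal)
        if field == "year":
            year_pos = pos
            pos += 4        # "{year}" stands for a 4-digit year
        if field == "month":
            month_pos = pos
            pos += 2        # "{month}" stands for a 2-digit month

    year = int(scene[year_pos:year_pos + 4]) if year_pos > -1 else 0
    month = int(scene[month_pos:month_pos + 2]) if month_pos > -1 else 0
    print(year, month)      # 2016 8

Note that the offsets are computed from l1t_path but applied to each line of the --l1t_list file, so the listed paths need to share the template's directory layout.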

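The per-node split is unchanged: each PBS node keeps every nnodes-th task starting from its own rank (the script treats PBS_VNODENUM as 1-based, defaulting to 1), via the scatter() helper visible in the diff. An illustrative run with made-up task names:

    import itertools

    def scatter(iterable, P=1, p=1):
        # keep items p-1, p-1+P, p-1+2P, ... (p is 1-based, as the script assumes)
        return itertools.islice(iterable, p - 1, None, P)

    tasks = ["scene%02d" % i for i in range(7)]   # stand-ins for PackageTC tasks
    print(list(scatter(tasks, P=3, p=1)))         # ['scene00', 'scene03', 'scene06']
    print(list(scatter(tasks, P=3, p=2)))         # ['scene01', 'scene04']
    print(list(scatter(tasks, P=3, p=3)))         # ['scene02', 'scene05']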