
Commit 37b5d19

Merge pull request #317 from nasa/issue_302
Issue 302: Temporal start date when revision_date query
2 parents ac5d334 + 6f7d467 commit 37b5d19

31 files changed (+90 / -192 lines)

cluster_provisioning/modules/common/main.tf

Lines changed: 0 additions & 5 deletions
@@ -2395,7 +2395,6 @@ resource "aws_lambda_permission" "event-misfire_lambda" {
 # "JOB_QUEUE": "${var.project}-job_worker-hls_data_download",
 # "JOB_TYPE": local.hls_download_job_type,
 # "JOB_RELEASE": var.pcm_branch,
-# "ISL_BUCKET_NAME": local.isl_bucket,
 # "ENDPOINT": "OPS",
 # "SMOKE_RUN": "true",
 # "DRY_RUN": "true"
@@ -2451,7 +2450,6 @@ resource "aws_lambda_function" "hlsl30_query_timer" {
 "JOB_QUEUE": "opera-job_worker-hls_data_query",
 "JOB_TYPE": local.hlsl30_query_job_type,
 "JOB_RELEASE": var.pcm_branch,
-"ISL_BUCKET_NAME": local.isl_bucket,
 "MINUTES": var.hlsl30_query_timer_trigger_frequency,
 "PROVIDER": var.hls_provider,
 "ENDPOINT": "OPS",
@@ -2487,7 +2485,6 @@ resource "aws_lambda_function" "hlss30_query_timer" {
 "JOB_QUEUE": "opera-job_worker-hls_data_query",
 "JOB_TYPE": local.hlss30_query_job_type,
 "JOB_RELEASE": var.pcm_branch,
-"ISL_BUCKET_NAME": local.isl_bucket,
 "PROVIDER": var.hls_provider,
 "ENDPOINT": "OPS",
 "MINUTES": var.hlss30_query_timer_trigger_frequency,
@@ -2569,7 +2566,6 @@ resource "aws_lambda_permission" "hlss30_query_timer" {
 # "JOB_QUEUE": "${var.project}-job_worker-slc_data_download",
 # "JOB_TYPE": local.slc_download_job_type,
 # "JOB_RELEASE": var.pcm_branch,
-# "ISL_BUCKET_NAME": local.isl_bucket,
 # "ENDPOINT": "OPS",
 # "SMOKE_RUN": "true",
 # "DRY_RUN": "true"
@@ -2625,7 +2621,6 @@ resource "aws_lambda_function" "slcs1a_query_timer" {
 "JOB_QUEUE": "opera-job_worker-slc_data_query",
 "JOB_TYPE": local.slcs1a_query_job_type,
 "JOB_RELEASE": var.pcm_branch,
-"ISL_BUCKET_NAME": local.isl_bucket,
 "MINUTES": var.slcs1a_query_timer_trigger_frequency,
 "PROVIDER": var.slc_provider,
 "ENDPOINT": "OPS",

data_subscriber/daac_data_subscriber.py

mode changed from 100755 to 100644
Lines changed: 50 additions & 49 deletions
@@ -57,13 +57,13 @@ def rebuild_auth(self, prepared_request, response):
         headers = prepared_request.headers
         url = prepared_request.url
 
-        if 'Authorization' in headers:
+        if "Authorization" in headers:
             original_parsed = requests.utils.urlparse(response.request.url)
             redirect_parsed = requests.utils.urlparse(url)
             if (original_parsed.hostname != redirect_parsed.hostname) and \
                     redirect_parsed.hostname != self.auth_host and \
                     original_parsed.hostname != self.auth_host:
-                del headers['Authorization']
+                del headers["Authorization"]
 
 
 async def run(argv: list[str]):
@@ -75,8 +75,8 @@ async def run(argv: list[str]):
         raise v
 
     settings = SettingsConf().cfg
-    edl = settings['DAAC_ENVIRONMENTS'][args.endpoint]['EARTHDATA_LOGIN']
-    cmr = settings['DAAC_ENVIRONMENTS'][args.endpoint]['BASE_URL']
+    edl = settings["DAAC_ENVIRONMENTS"][args.endpoint]["EARTHDATA_LOGIN"]
+    cmr = settings["DAAC_ENVIRONMENTS"][args.endpoint]["BASE_URL"]
     netloc = urlparse(f"https://{edl}").netloc
     provider_esconn_map = {"LPCLOUD": get_hls_catalog_connection(logging.getLogger(__name__)),
                            "ASF": get_slc_catalog_connection(logging.getLogger(__name__))}
@@ -87,7 +87,7 @@ async def run(argv: list[str]):
             update_url_index(es_conn, f.readlines(), None, None, None)
         exit(0)
 
-    loglevel = 'DEBUG' if args.verbose else 'INFO'
+    loglevel = "DEBUG" if args.verbose else "INFO"
     logging.basicConfig(level=loglevel)
     logging.info("Log level set to " + loglevel)
 
@@ -146,7 +146,7 @@ def create_parser():
     provider = {"positionals": ["-p", "--provider"],
                 "kwargs": {"dest": "provider",
                            "choices": ["LPCLOUD", "ASF"],
-                           "default": 'LPCLOUD',
+                           "default": "LPCLOUD",
                            "help": "Specify a provider for collection search. Default is LPCLOUD."}}
 
     collection = {"positionals": ["-c", "--collection-shortname"],
@@ -183,11 +183,6 @@ def create_parser():
                           "script as a cron, this value should be equal to or greater than how often your "
                           "cron runs (default: 60 minutes)."}}
 
-    isl_bucket = {"positionals": ["-i", "--isl-bucket"],
-                  "kwargs": {"dest": "isl_bucket",
-                             "required": True,
-                             "help": "The incoming storage location s3 bucket where data products will be downloaded."}}
-
     transfer_protocol = {"positionals": ["-x", "--transfer-protocol"],
                          "kwargs": {"dest": "transfer_protocol",
                                     "choices": ["s3", "https"],
@@ -233,6 +228,12 @@ def create_parser():
                                 "action": "store_true",
                                 "help": "Toggle for using temporal range rather than revision date (range) in the query."}}
 
+    temporal_start_date = {"positionals": ["--temporal-start-date"],
+                           "kwargs": {"dest": "temporal_start_date",
+                                      "default": None,
+                                      "help": "The ISO date time after which data should be retrieved. Only valid when --use-temporal is false/omitted. For Example, "
+                                              "--temporal-start-date 2021-01-14T00:00:00Z"}}
+
     native_id = {"positionals": ["--native-id"],
                  "kwargs": {"dest": "native_id",
                             "help": "The native ID of a single product granule to be queried, overriding other query arguments if present."}}
@@ -250,20 +251,20 @@ def create_parser():
                             "help": "The native ID of a single product granule to be queried, overriding other query arguments if present."}}
 
     full_parser = subparsers.add_parser("full")
-    full_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes, isl_bucket,
+    full_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes,
                             transfer_protocol, dry_run, smoke_run, no_schedule_download, release_version, job_queue,
-                            chunk_size, batch_ids, use_temporal, native_id]
+                            chunk_size, batch_ids, use_temporal, temporal_start_date, native_id]
     _add_arguments(full_parser, full_parser_arg_list)
 
     query_parser = subparsers.add_parser("query")
-    query_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes, isl_bucket,
+    query_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes,
                              dry_run, smoke_run, no_schedule_download, release_version, job_queue, chunk_size,
-                             native_id, use_temporal]
+                             native_id, use_temporal, temporal_start_date]
     _add_arguments(query_parser, query_parser_arg_list)
 
     download_parser = subparsers.add_parser("download")
-    download_parser_arg_list = [verbose, file, endpoint, provider, isl_bucket, transfer_protocol, dry_run, smoke_run,
-                                batch_ids, start_date, end_date, use_temporal]
+    download_parser_arg_list = [verbose, file, endpoint, provider, transfer_protocol, dry_run, smoke_run,
+                                batch_ids, start_date, end_date, use_temporal, temporal_start_date]
     _add_arguments(download_parser, download_parser_arg_list)
 
     return parser
@@ -289,7 +290,7 @@ def validate(args):
 
 
 def _validate_bounds(bbox):
-    bounds = bbox.split(',')
+    bounds = bbox.split(",")
     value_error = ValueError(
         f"Error parsing bounds: {bbox}. Format is <W Longitude>,<S Latitude>,<E Longitude>,<N Latitude> without spaces")
 
@@ -303,9 +304,9 @@ def _validate_bounds(bbox):
         raise value_error
 
 
-def _validate_date(date, prefix='start'):
+def _validate_date(date, prefix="start"):
     try:
-        datetime.strptime(date, '%Y-%m-%dT%H:%M:%SZ')
+        datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
     except ValueError:
         raise ValueError(
             f"Error parsing {prefix} date: {date}. Format must be like 2021-01-14T00:00:00Z")
@@ -364,10 +365,10 @@ def _get_tokens(edl: str, username: str, password: str) -> list[dict]:
 def _revoke_expired_tokens(token_list: list[dict], edl: str, username: str, password: str) -> None:
     for token_dict in token_list:
         now = datetime.utcnow().date()
-        expiration_date = datetime.strptime(token_dict['expiration_date'], "%m/%d/%Y").date()
+        expiration_date = datetime.strptime(token_dict["expiration_date"], "%m/%d/%Y").date()
 
         if expiration_date <= now:
-            _delete_token(edl, username, password, token_dict['access_token'])
+            _delete_token(edl, username, password, token_dict["access_token"])
             del token_dict
 
 
@@ -380,7 +381,7 @@ def _create_token(edl: str, username: str, password: str) -> str:
     response_content = create_response.json()
 
     if "error" in response_content.keys():
-        raise Exception(response_content['error'])
+        raise Exception(response_content["error"])
 
     token = response_content["access_token"]
 
@@ -391,7 +392,7 @@ def _delete_token(edl: str, username: str, password: str, token: str) -> None:
     url = f"https://{edl}/api/users/revoke_token"
     try:
         resp = requests.post(url, auth=HTTPBasicAuth(username, password),
-                             params={'token': token})
+                             params={"token": token})
         resp.raise_for_status()
     except Exception as e:
         logging.warning(f"Error deleting the token: {e}")
@@ -472,11 +473,6 @@ async def run_query(args, token, es_conn, cmr, job_id, settings):
                 release_version=args.release_version,
                 provider=args.provider,
                 params=[
-                    {
-                        "name": "isl_bucket_name",
-                        "value": f"--isl-bucket={args.isl_bucket}",
-                        "from": "value"
-                    },
                     {
                         "name": "batch_ids",
                         "value": "--batch-ids " + " ".join(chunk_batch_ids) if chunk_batch_ids else "",
@@ -560,43 +556,48 @@ def query_cmr(args, token, cmr, settings, timerange: DateTimeRange, now: datetime
 
     request_url = f"https://{cmr}/search/granules.umm_json"
     params = {
-        'page_size': PAGE_SIZE,
-        'sort_key': "-start_date",
-        'provider': args.provider,
-        'ShortName': args.collection,
-        'token': token,
-        'bounding_box': args.bbox,
+        "page_size": PAGE_SIZE,
+        "sort_key": "-start_date",
+        "provider": args.provider,
+        "ShortName": args.collection,
+        "token": token,
+        "bounding_box": args.bbox,
     }
 
     if args.native_id:
-        params['native-id'] = args.native_id
+        params["native-id"] = args.native_id
 
     # derive and apply param "temporal"
     now_date = now.strftime("%Y-%m-%dT%H:%M:%SZ")
     temporal_range = _get_temporal_range(timerange.start_date, timerange.end_date, now_date)
     logging.info("Temporal Range: " + temporal_range)
 
     if args.use_temporal:
-        params['temporal'] = temporal_range
+        params["temporal"] = temporal_range
     else:
         params["revision_date"] = temporal_range
 
+    # if a temporal start-date is provided, set temporal
+    if args.temporal_start_date:
+        logging.info(f"{args.temporal_start_date=}")
+        params["temporal"] = dateutil.parser.isoparse(args.temporal_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
+
     logging.info(f"{request_url=} {params=}")
     product_granules, search_after = _request_search(args, request_url, params)
 
     while search_after:
         granules, search_after = _request_search(args, request_url, params, search_after=search_after)
         product_granules.extend(granules)
 
-    if args.collection in settings['SHORTNAME_FILTERS']:
+    if args.collection in settings["SHORTNAME_FILTERS"]:
         product_granules = [granule
                             for granule in product_granules
                             if _match_identifier(settings, args, granule)]
 
     logging.info(f"Found {str(len(product_granules))} total granules")
 
     for granule in product_granules:
-        granule['filtered_urls'] = _filter_granules(granule, args)
+        granule["filtered_urls"] = _filter_granules(granule, args)
 
     return product_granules
 
@@ -616,17 +617,17 @@ def _get_temporal_range(start: str, end: str, now: str):
 
 
 def _request_search(args, request_url, params, search_after=None):
-    response = requests.get(request_url, params=params, headers={'CMR-Search-After': search_after}) \
+    response = requests.get(request_url, params=params, headers={"CMR-Search-After": search_after}) \
         if search_after else requests.get(request_url, params=params)
 
     results = response.json()
-    items = results.get('items')
-    next_search_after = response.headers.get('CMR-Search-After')
+    items = results.get("items")
+    next_search_after = response.headers.get("CMR-Search-After")
 
     collection_identifier_map = {"HLSL30": "LANDSAT_PRODUCT_ID",
                                  "HLSS30": "PRODUCT_URI"}
 
-    if items and 'umm' in items[0]:
+    if items and "umm" in items[0]:
         return [{"granule_id": item.get("umm").get("GranuleUR"),
                  "provider": item.get("meta").get("provider-id"),
                  "production_datetime": item.get("umm").get("DataGranule").get("ProductionDateTime"),
@@ -668,8 +669,8 @@ def _filter_granules(granule, args):
 
 
 def _match_identifier(settings, args, granule) -> bool:
-    for filter in settings['SHORTNAME_FILTERS'][args.collection]:
-        if re.match(filter, granule['identifier']):
+    for filter in settings["SHORTNAME_FILTERS"][args.collection]:
+        if re.match(filter, granule["identifier"]):
             return True
 
     return False
@@ -861,7 +862,7 @@ def download_from_asf(
         logging.info("downloading associated orbit file")
         dataset_dir = extract_one_to_one(product, settings_cfg, working_dir=Path.cwd())
         stage_orbit_file_args = stage_orbit_file.get_parser().parse_args([
-            f'--output-directory={str(dataset_dir)}',
+            f"--output-directory={str(dataset_dir)}",
             str(product_filepath)
         ])
         stage_orbit_file.main(stage_orbit_file_args)
@@ -1088,7 +1089,7 @@ def _https_transfer(url, bucket_name, token, staging_area=""):
         upload_end_time = datetime.utcnow()
         upload_duration = upload_end_time - upload_start_time
         upload_stats = {"file_name": file_name,
-                        "file_size (in bytes)": r.headers.get('Content-Length'),
+                        "file_size (in bytes)": r.headers.get("Content-Length"),
                         "upload_duration (in seconds)": upload_duration.total_seconds(),
                         "upload_start_time": _convert_datetime(upload_start_time),
                         "upload_end_time": _convert_datetime(upload_end_time)}
@@ -1147,7 +1148,7 @@ def _s3_download(url, s3, tmp_dir, staging_area=""):
     file_name = PurePath(url).name
     target_key = str(Path(staging_area, file_name))
 
-    source = url[len("s3://"):].partition('/')
+    source = url[len("s3://"):].partition("/")
     source_bucket = source[0]
     source_key = source[2]
 
@@ -1167,5 +1168,5 @@ def _s3_upload(url, bucket_name, tmp_dir, staging_area=""):
     return target_key
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     asyncio.run(run(sys.argv))
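
For reviewers, here is a minimal sketch (not the project's code) of the time-parameter selection that query_cmr performs after this change. It assumes python-dateutil is installed and that args carries use_temporal and temporal_start_date exactly as the new CLI options define them; build_time_params is a hypothetical helper used only for illustration.

import dateutil.parser

def build_time_params(use_temporal, temporal_range, temporal_start_date=None):
    # Mirrors the query_cmr() hunk above: the query window goes to either
    # "temporal" or "revision_date", and an explicit start date then sets a
    # "temporal" lower bound on top of a revision_date window.
    params = {}
    if use_temporal:
        params["temporal"] = temporal_range
    else:
        params["revision_date"] = temporal_range
    if temporal_start_date:
        params["temporal"] = dateutil.parser.isoparse(temporal_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
    return params

# Example: query by revision_date, with an additional temporal start bound.
print(build_time_params(False, "2022-01-01T00:00:00Z,2022-01-02T00:00:00Z", "2021-01-14T00:00:00Z"))

Per the PR title, the goal appears to be letting a revision_date query also constrain the data's own temporal coverage via this start date.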

docker/hysds-io.json.hls_download

Lines changed: 0 additions & 5 deletions
@@ -10,11 +10,6 @@
       "type": "text",
       "default": "--endpoint=OPS"
     },
-    {
-      "name": "isl_bucket_name",
-      "from": "submitter",
-      "placeholder": "e.g. --isl-bucket=<isl_bucket>"
-    },
     {
       "name": "batch_ids",
       "from": "submitter",

docker/hysds-io.json.hlsl30_query

Lines changed: 6 additions & 5 deletions
@@ -44,11 +44,6 @@
       "type": "text",
       "default": "--job-queue=opera-job_worker-hls_data_download"
     },
-    {
-      "name": "isl_bucket_name",
-      "from": "submitter",
-      "placeholder": "e.g. --isl-bucket=<isl_bucket>"
-    },
     {
       "name": "chunk_size",
       "from": "submitter",
@@ -74,6 +69,12 @@
       "from": "submitter",
       "placeholder": "e.g. --use-temporal",
       "optional": true
+    },
+    {
+      "name": "temporal_start_datetime",
+      "from": "submitter",
+      "placeholder": "e.g. --temporal-start-date=1970-01-01T00:00:00Z",
+      "optional": true
     }
   ]
 }
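
A note on the new temporal_start_datetime field: it simply forwards a --temporal-start-date value to the subscriber, which (per the daac_data_subscriber.py diff above) parses it with dateutil.parser.isoparse. A short illustrative check of a submitted value, assuming python-dateutil is available; the value shown is just the placeholder from the spec above.

import dateutil.parser

# Illustrative only: the placeholder value from the hysds-io spec above.
value = "--temporal-start-date=1970-01-01T00:00:00Z"
date_str = value.split("=", 1)[1]
parsed = dateutil.parser.isoparse(date_str)   # raises ValueError if the date is malformed
print(parsed.strftime("%Y-%m-%dT%H:%M:%SZ"))  # the form query_cmr() sends to CMR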

docker/hysds-io.json.hlsl30_query_minutes

Lines changed: 0 additions & 5 deletions
@@ -42,11 +42,6 @@
       "type": "text",
       "default": "--job-queue=opera-job_worker-hls_data_download"
     },
-    {
-      "name": "isl_bucket_name",
-      "from": "submitter",
-      "placeholder": "e.g. --isl-bucket=<isl_bucket>"
-    },
     {
       "name": "chunk_size",
       "from": "submitter",

docker/hysds-io.json.hlsl30_query_native_id

Lines changed: 0 additions & 5 deletions
@@ -41,11 +41,6 @@
       "type": "text",
       "default": "--job-queue=opera-job_worker-hls_data_download"
     },
-    {
-      "name": "isl_bucket_name",
-      "from": "submitter",
-      "placeholder": "e.g. --isl-bucket=<isl_bucket>"
-    },
     {
       "name": "chunk_size",
       "from": "submitter",
