Stac validation (#9)
* Adjust stac output for validator

* change time extent to tuple

* Fix mypy grumbling, add TemporalExtent

* Fix flake8 grumbling
datadavev authored May 21, 2024
1 parent e3f3a30 · commit 997bf92
Showing 3 changed files with 64 additions and 9 deletions.
export_client_cli.py (3 changes: 2 additions & 1 deletion)
@@ -17,7 +17,8 @@
@click.option(
"-r",
"--refresh-dir",
help="If specified, will read the manifest.json out of an existing directory and re-execute the query to update results."
help=("If specified, will read the manifest.json out of an existing "
"directory and re-execute the query to update results.")
)
@click.option(
"-t",
isamples_export_client/duckdb_utilities.py (25 changes: 24 additions & 1 deletion)
@@ -1,3 +1,4 @@
import datetime
import json
from typing import Optional

@@ -14,15 +15,37 @@ def __repr__(self):
return f"GeoFeaturesResult geo_json={self.geo_json}, bbox={self.bbox}"


class TemporalExtent(tuple):

def __new__(self, t0: Optional[datetime.datetime], t1: Optional[datetime.datetime]):
return tuple.__new__(TemporalExtent, (t0, t1))

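# --- Illustrative sketch, not part of this commit ---------------------------
# Because TemporalExtent subclasses tuple, an instance is an immutable
# two-element sequence and serializes as a JSON array. (The first parameter of
# __new__ is conventionally named cls rather than self; the name does not
# change behavior here.)
example_extent = TemporalExtent(
    datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
    None,  # open-ended upper bound
)
start, end = example_extent       # unpacks like any 2-tuple
assert isinstance(example_extent, tuple)
# -----------------------------------------------------------------------------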

def read_geo_features_from_jsonl(filename: str) -> Optional[GeoFeaturesResult]:
con = duckdb.connect()
con.install_extension("spatial")
con.load_extension("spatial")
con.read_json(filename, format="newline_delimited")
location_prefix = "produced_by.sampling_site.sample_location."
q = f"select ST_Extent(envelope)as bb, ST_AsGEOJSON(envelope) as poly from (select ST_Envelope_Agg(ST_Point({location_prefix}longitude, {location_prefix}latitude)) as envelope from '{filename}' where {location_prefix}longitude is not null)"
q = ("select ST_Extent(envelope)as bb, ST_AsGEOJSON(envelope) as poly "
f"from (select ST_Envelope_Agg(ST_Point({location_prefix}longitude, {location_prefix}latitude)) as envelope "
f"from '{filename}' where {location_prefix}longitude is not null)")
spatial_results = con.sql(q).fetchone()
if spatial_results is not None:
return GeoFeaturesResult(spatial_results[0], spatial_results[1])
else:
return None

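# --- Illustrative sketch, not part of this commit ---------------------------
# The same spatial aggregation pattern as read_geo_features_from_jsonl, run
# against inline point values instead of a JSONL export. The coordinates are
# made up; ST_Extent returns a BOX_2D struct and ST_AsGeoJSON the envelope
# polygon.
def _spatial_extent_demo() -> None:
    con = duckdb.connect()
    con.install_extension("spatial")
    con.load_extension("spatial")
    row = con.sql(
        "SELECT ST_Extent(envelope) AS bb, ST_AsGeoJSON(envelope) AS poly "
        "FROM (SELECT ST_Envelope_Agg(ST_Point(lon, lat)) AS envelope "
        "FROM (VALUES (-122.4, 37.8), (-71.1, 42.4)) AS pts(lon, lat))"
    ).fetchone()
    print(row[0])  # e.g. {'min_x': -122.4, 'min_y': 37.8, 'max_x': -71.1, 'max_y': 42.4}
    print(row[1])  # GeoJSON polygon for the aggregated envelope
# -----------------------------------------------------------------------------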

def get_temporal_extent_from_jsonl(filename: str) -> TemporalExtent:
con = duckdb.connect()
con.read_json(filename, format="newline_delimited")
q = f"SET TimeZone='UTC'; CREATE TABLE samples AS SELECT * FROM read_json('{filename}', format='newline_delimited');"
con.sql(q)
q = ("SELECT min(produced_by.result_time::TIMESTAMPTZ) as min_t, "
"max(produced_by.result_time::TIMESTAMPTZ) as max_t "
"from samples where produced_by.result_time is not null")
result = con.sql(q).fetchone()
if result is not None:
return TemporalExtent(result[0], result[1])
return TemporalExtent(None, None)
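# --- Illustrative sketch, not part of this commit ---------------------------
# The temporal query follows the same min/max-over-TIMESTAMPTZ shape; here an
# in-memory table with made-up timestamps stands in for the JSONL export.
def _temporal_extent_demo() -> TemporalExtent:
    con = duckdb.connect()
    con.sql("SET TimeZone='UTC'")
    con.sql(
        "CREATE TABLE samples AS SELECT * FROM "
        "(VALUES ('2019-05-01T10:00:00Z'), ('2021-08-15T00:00:00Z')) AS t(result_time)"
    )
    row = con.sql(
        "SELECT min(result_time::TIMESTAMPTZ) AS min_t, "
        "max(result_time::TIMESTAMPTZ) AS max_t "
        "FROM samples WHERE result_time IS NOT NULL"
    ).fetchone()
    return TemporalExtent(row[0], row[1])  # (2019-05-01 10:00 UTC, 2021-08-15 00:00 UTC)
# -----------------------------------------------------------------------------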
isamples_export_client/export_client.py (45 changes: 38 additions & 7 deletions)
@@ -10,7 +10,12 @@
import requests
from requests import Session, Response

from isamples_export_client.duckdb_utilities import GeoFeaturesResult, read_geo_features_from_jsonl
from isamples_export_client.duckdb_utilities import (
GeoFeaturesResult,
TemporalExtent,
read_geo_features_from_jsonl,
get_temporal_extent_from_jsonl
)
from isamples_export_client.geoparquet_utilities import write_geoparquet_from_json_lines

GEOPARQUET = "geoparquet"
@@ -26,6 +31,7 @@

STAC_COLLECTION_TYPE = "Collection"
STAC_VERSION = "1.0.0"
STAC_DEFAULT_LICENSE = "CC-BY-4.0" # https://spdx.org/licenses/CC-BY-4.0.html
COLLECTION_ID = "isamples-stac-collection-"
COLLECTION_DESCRIPTION = """The Internet of Samples (iSamples) is a multi-disciplinary and multi-institutional
project funded by the National Science Foundation to design, develop, and promote service infrastructure to uniquely,
@@ -41,6 +47,13 @@ def datetime_to_solr_format(dt):
return dt.strftime(SOLR_TIME_FORMAT)


class JsonDateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime.datetime):
return o.isoformat(timespec="seconds")
return json.JSONEncoder.default(self, o)

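# --- Illustrative usage, not part of this commit -----------------------------
# JsonDateTimeEncoder lets json.dumps handle datetime values directly:
def _encoder_demo() -> str:
    return json.dumps(
        {"created": datetime.datetime(2024, 5, 21, 12, 0, tzinfo=datetime.timezone.utc)},
        cls=JsonDateTimeEncoder,
    )  # '{"created": "2024-05-21T12:00:00+00:00"}'
# ------------------------------------------------------------------------------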

class ExportJobStatus(Enum):
CREATED = "created"
STARTED = "started"
@@ -185,7 +198,15 @@ def write_manifest(self, query: str, uuid: str, tstarted: datetime.datetime, num
f.write(json.dumps(manifests, indent=4))
return manifest_path

def write_stac(self, uuid: str, tstarted: datetime.datetime, geo_result: GeoFeaturesResult, solr_query: str, json_file_path: str, parquet_file_path: str) -> str:
def write_stac(
self,
uuid: str,
tstarted: datetime.datetime,
geo_result: GeoFeaturesResult,
temporal_result: TemporalExtent,
solr_query: str,
json_file_path: str,
parquet_file_path: str) -> str:
assets_dict = {
}
description_string = (
@@ -210,8 +231,17 @@ def write_stac(self, uuid: str, tstarted: datetime.datetime, geo_result: GeoFeat
"type": STAC_COLLECTION_TYPE,
"id": f"iSamples Export Service result {uuid}",
"collection": f"{COLLECTION_TITLE} {uuid}",
"geometry": geo_result.geo_json_dict,
"bbox": geo_result.bbox,
"license": STAC_DEFAULT_LICENSE,
"extent": {
"spatial": {
"bbox": [geo_result.bbox,]
},
"temporal": {
"interval": [
temporal_result
]
}
},
"properties": {
"datetime": datetime_to_solr_format(tstarted)
},
@@ -305,8 +335,8 @@ def write_stac(self, uuid: str, tstarted: datetime.datetime, geo_result: GeoFeat
"assets": assets_dict
}
stac_path = ExportClient._stac_file_path(self._destination_directory)
with open(stac_path, "w") as f:
f.write(json.dumps(stac_item, indent=4))
with open(stac_path, "w", encoding="UTF-8") as f:
json.dump(stac_item, f, indent=4, ensure_ascii=False, cls=JsonDateTimeEncoder)
return stac_path
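# --- Illustrative sketch, not part of this commit ---------------------------
# How the new "license"/"extent" fields come out after serialization, with a
# made-up bbox and timestamps; assumes the TemporalExtent and
# JsonDateTimeEncoder shown above. The tuple becomes a two-element JSON array
# of ISO-8601 strings, which is the shape a STAC temporal interval expects.
def _extent_serialization_demo() -> str:
    bbox = [-122.4, 37.8, -71.1, 42.4]  # [min_lon, min_lat, max_lon, max_lat]
    temporal_result = TemporalExtent(
        datetime.datetime(2019, 5, 1, tzinfo=datetime.timezone.utc),
        datetime.datetime(2021, 8, 15, tzinfo=datetime.timezone.utc),
    )
    return json.dumps(
        {
            "license": STAC_DEFAULT_LICENSE,
            "extent": {
                "spatial": {"bbox": [bbox]},
                "temporal": {"interval": [temporal_result]},
            },
        },
        indent=4,
        cls=JsonDateTimeEncoder,
    )
# -----------------------------------------------------------------------------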

def perform_full_download(self):
@@ -334,13 +364,14 @@ def perform_full_download(self):
manifest_path = self.write_manifest(self._query, uuid, tstarted, num_results)
logging.info(f"Successfully wrote manifest file to {manifest_path}")
geo_result = read_geo_features_from_jsonl(filename)
temporal_result = get_temporal_extent_from_jsonl(filename)
parquet_filename = None
if self.is_geoparquet:
parquet_filename = write_geoparquet_from_json_lines(filename)
query_string = status_json.get("query").replace("'", "\"")
solr_query_dict = json.loads(query_string)
query = solr_query_dict.pop("q")
stac_path = self.write_stac(uuid, tstarted, geo_result, query, filename, parquet_filename)
stac_path = self.write_stac(uuid, tstarted, geo_result, temporal_result, query, filename, parquet_filename)
logging.info(f"Successfully wrote stac item to {stac_path}")
break
except Exception as e:
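One note on perform_full_download above: the status response's query field is turned into JSON by swapping single quotes for double quotes before json.loads. A minimal illustration of that transformation follows, with a hypothetical query string (the real service output may differ, and the swap would misbehave if the query itself contained quote characters):

    import json

    query_string = "{'q': 'source:OPENCONTEXT'}".replace("'", "\"")
    solr_query_dict = json.loads(query_string)   # {'q': 'source:OPENCONTEXT'}
    query = solr_query_dict.pop("q")             # 'source:OPENCONTEXT'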
