Commit caa46a3 ("new previews")
satchelbaldwin committed Feb 1, 2024
Parent: ddc7a76
Showing 7 changed files with 64 additions and 19 deletions.
api/dataset/remote.py (+15, -2)

@@ -14,11 +14,17 @@ def open_remote_dataset(urls: List[str]) -> xarray.Dataset:
             concat_dim="time",
             combine="nested",
             parallel=True,
+            use_cftime=True,
         )
     except IOError as e:
         print(f"failed to open parallel: {e}")
         try:
-            ds = xarray.open_mfdataset(urls, concat_dim="time", combine="nested")
+            ds = xarray.open_mfdataset(
+                urls,
+                concat_dim="time",
+                combine="nested",
+                use_cftime=True,
+            )
         except IOError as e:
             print(f"failed to open sequentially, falling back to s3: {e}")
             return open_remote_dataset_s3(urls)
@@ -29,5 +35,12 @@ def open_remote_dataset_s3(urls: List[str]) -> xarray.Dataset:
     fs = s3fs.S3FileSystem(anon=True)
     urls = ["s3://esgf-world" + url[url.find("/CMIP6") :] for url in urls]
     print(urls, flush=True)
-    files = [xarray.open_dataset(fs.open(url), chunks={"time": 10}) for url in urls]
+    files = [
+        xarray.open_dataset(
+            fs.open(url),
+            chunks={"time": 10},
+            use_cftime=True,
+        )
+        for url in urls
+    ]
     return xarray.merge(files)
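Both open paths now pass use_cftime=True. Many CMIP6 files use non-standard calendars (noleap, 360-day) or dates outside the range numpy's datetime64[ns] can represent, so decoding times with cftime avoids failures at open time. A minimal sketch of the effect, against a hypothetical local file:

    import xarray

    # A minimal sketch, assuming "tas_day_noleap.nc" is a hypothetical CMIP6
    # file on a "noleap" calendar. Without use_cftime=True, xarray decodes
    # times to numpy datetime64[ns], which only covers roughly years
    # 1678-2262 and cannot represent non-standard calendars; with it, times
    # decode to cftime objects instead.
    ds = xarray.open_dataset("tas_day_noleap.nc", use_cftime=True)
    print(type(ds["time"].values[0]))  # e.g. <class 'cftime.DatetimeNoLeap'>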
api/dataset/terarium_hmi.py (+10, -3)

@@ -43,18 +43,25 @@ def construct_hmi_dataset(
 ) -> str:
     terarium_auth = (default_settings.terarium_user, default_settings.terarium_pass)
     dataset_name = dataset_id.split("|")[0]
+    try:
+        preview = render(ds)
+    except Exception as e:
+        preview = ""
+        print(e, flush=True)
     hmi_dataset = {
         "userId": "",
         "name": f"{dataset_name}-subset-{subset_uuid}",
         "description": generate_description(ds, dataset_id, opts),
         "dataSourceDate": ds.attrs.get("creation_date", "UNKNOWN"),
         "fileNames": [],
         "columns": [],
         "datasetUrl": ds.attrs.get("further_info_url", "UNKNOWN"),
         "format": "netcdf",
         "metadata": {
             "format": "netcdf",
             "parentDatasetId": parent_dataset_id,
             "variableId": ds.attrs.get("variable_id", ""),
-            "subsetDetails": opts,
-            "preview": render(ds),
+            "subsetDetails": repr(opts),
+            "preview": preview,
             "dataStructure": {
                 k: {
                     "attrs": {
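Two defensive changes here: preview rendering is wrapped in try/except so a failed plot no longer aborts dataset registration, and opts is serialized with repr() rather than embedded directly, presumably because the options object is not JSON-serializable. A sketch of that serialization problem, reusing the ThinningSubsetOptions name from filters.py with a made-up field:

    import json
    from dataclasses import dataclass

    # Hypothetical stand-in for the real options type; only the class name
    # comes from the codebase, the "factor" field is made up for illustration.
    @dataclass
    class ThinningSubsetOptions:
        factor: int

    opts = ThinningSubsetOptions(factor=2)
    # json.dumps(opts) raises TypeError: Object of type ThinningSubsetOptions
    # is not JSON serializable -- repr() turns it into a plain, loggable string.
    payload = {"subsetDetails": repr(opts)}
    print(json.dumps(payload))  # {"subsetDetails": "ThinningSubsetOptions(factor=2)"}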
api/preview/render.py (+4, -1)

@@ -55,7 +55,10 @@ def render(
     # we're plotting x, y, time - others need to be shortened to the first element
     other_axes = [axis for axis in axes if axis not in ["X", "Y", "T"]]
     for axis in other_axes:
-        ds = ds.sel({axes[axis]: ds[axes[axis]][0]})
+        try:
+            ds = ds.sel({axes[axis]: ds[axes[axis]][0]})
+        except Exception as e:
+            print(f"failed to trim non-relevant axis {axis}: {ds[axes[axis]]}: {e}")

     ds = ds[variable_index]
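render() plots X, Y, and time, so every other dimension is pinned to its first element; the new try/except keeps one malformed axis from killing the whole preview. A toy version of that trimming (all names below are made up):

    import numpy
    import xarray

    # A 4-D toy field; render() would pin the extra "member" axis to its
    # first element so only (time, lat, lon) remain for plotting.
    ds = xarray.Dataset(
        {"tas": (("time", "member", "lat", "lon"), numpy.zeros((3, 5, 4, 4)))},
        coords={"member": numpy.arange(5)},
    )
    ds = ds.sel({"member": ds["member"][0]})  # same pattern as the diff
    print(ds["tas"].dims)  # ('time', 'lat', 'lon')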
api/processing/filters.py (+5, -2)

@@ -7,6 +7,7 @@
     TemporalSubsetOptions,
     ThinningSubsetOptions,
 )
+import pandas


 def location_bbox(
@@ -19,10 +20,12 @@ def location_bbox(

 def timestamps(dataset: xarray.Dataset, timestamps: List[str], field="time"):
     ts = timestamps[:]
+    print(ts)
     if ts[0] == "start":
-        ts[0] = "0000-01-01T00:00:00"
+        ts[0] = "0001-01"
     if ts[1] == "end":
-        ts[1] = "9999-01-01T00:00:00"
+        ts[1] = "9999-01"
+    print(ts, flush=True)
     return dataset.sel({field: slice(ts[0], ts[1])})

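The "start"/"end" sentinels now map to partial date strings instead of full ISO stamps. Year 0000 does not exist in the calendars pandas and cftime use, so the old lower bound could fail to parse, while partial strings like "0001-01" slice both pandas and cftime time indexes by label. A sketch against a cftime-indexed dataset:

    import numpy
    import xarray

    # A sketch, assuming the dataset was opened with use_cftime=True so its
    # time axis is a CFTimeIndex; partial date strings select by label the
    # same way they do on a pandas DatetimeIndex.
    time = xarray.cftime_range("2000-01-01", periods=48, freq="MS", calendar="noleap")
    ds = xarray.Dataset({"tas": ("time", numpy.arange(48.0))}, coords={"time": time})
    subset = ds.sel(time=slice("0001-01", "9999-01"))  # the new open-ended bounds
    print(subset["tas"].size)  # 48 -- the full range, as "start"/"end" intend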
api/processing/providers/esgf.py (+2, -3)

@@ -17,11 +17,10 @@ def slice_esgf_dataset(


 def slice_and_store_dataset(
-    urls: List[str], dataset_id: str, params: Dict[str, Any], **kwargs
+    urls: List[str], parent_id: str, dataset_id: str, params: Dict[str, Any], **kwargs
 ):
     job_id = kwargs["job_id"]
     filename = f"cmip6-{job_id}.nc"
-    parent_dataset_id = params.get("parent_dataset_id", "")
     print(f"running job esgf subset job for: {job_id}", flush=True)
     ds = slice_esgf_dataset(urls, dataset_id, params)
     print(f"bytes: {ds.nbytes}", flush=True)
@@ -40,7 +39,7 @@ def slice_and_store_dataset(
     hmi_id = construct_hmi_dataset(
         ds,
         dataset_id,
-        parent_dataset_id,
+        parent_id,
         job_id,
         filters.options_from_url_parameters(params),
         "dataset-netcdf-testuser",
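Threading parent_id through as a positional argument, instead of pulling it from params with an empty-string default, means a missing ID now fails loudly at the call site rather than silently propagating "". A stubbed contrast (bodies are made up for illustration):

    # Stubbed contrast between the old and new ways of obtaining the parent id.
    def old_style(params: dict) -> str:
        return params.get("parent_dataset_id", "")  # missing key -> "" silently

    def new_style(parent_id: str) -> str:
        return parent_id  # omitting the argument raises TypeError at call time

    print(old_style({}))         # "" -- easy to miss downstream
    print(new_style("abc-123"))  # "abc-123" -- the caller must be explicit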
api/search/providers/esgf.py (+24, -6)

@@ -14,11 +14,11 @@
 import pickle

 NATURAL_LANGUAGE_PROCESSING_CONTEXT = """
-You are a tool to extract search terms by category from a given search request.
+You are a tool to extract keyword search terms by category from a given search request.

-The given fields are: frequency, nominal_resolution, lower_time_bound, upper_time_bound, and description.
+The given keyword fields are: frequency, nominal_resolution, lower_time_bound, upper_time_bound, and description.

-The definitions of the fields are as follows.
+The definitions of the keyword fields are as follows.

 frequency is a duration.
 Possible example values for frequency values are: 6 hours, 6hrs, 3hr, daily, day, yearly, 12 hours, 12 hr
@@ -42,7 +42,6 @@
 Input:
 100km before 2023 daily air temperature
 Output:
-
 {
     "frequency": "daily",
@@ -52,7 +51,6 @@
 }
 Input: 2x2 degree relative humidity between june 1997 and july 1999 6hr
 Output:
-
 {
     "frequency": "6hr",
@@ -61,6 +59,23 @@
     "upper_time_bound": "1999-07-00T00:00:00Z",
     "description": "relative humidity"
 }
+
+Input: ts
+Output: {
+    "description": "ts"
+}
+
+Input: Find me datasets with the variable relative humidity
+Output: {
+    "description": "relative humidity"
+}
+
+Input: datasets before june 1995 the variable surface temperature model BCC-ESM1
+Output: {
+    "upper_time_bound": "1995-06-00T00:00:00Z",
+    "description": "surface temperature BCC-ESM1"
+}
+Only return JSON.
 """

 # cosine matching threshold to greedily take term
@@ -176,7 +191,9 @@ def natural_language_search(
     try:
         search_terms = json.loads(search_terms_json)
     except ValueError as e:
-        print(f"openAI returned more than just json, retrying query... \n {e}")
+        print(
+            f"openAI returned more than just json, retrying query... \n {e} {search_terms_json}"
+        )
         if retries >= 3:
             print("openAI returned non-json in multiple retries, exiting")
             return []
@@ -211,6 +228,7 @@ def process_natural_language(self, search_query: str) -> str:
         temperature=0.7,
     )
     query = response.choices[0].message.content
+    print(query)
     query = query[query.find("{") :]
     return query

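The prompt now ends with "Only return JSON.", and the parsing side still defends against chatty completions: everything before the first "{" is discarded, and a ValueError triggers a retry (up to 3). A compact sketch of that extraction, with a stubbed completion in place of a real OpenAI call:

    import json

    def extract_json(completion: str) -> dict:
        # Same defensive pattern as the diff: drop any preamble before the
        # first "{" and let json.loads raise if the rest still is not JSON.
        return json.loads(completion[completion.find("{"):])

    # A stubbed chatty reply; the real code retries the query on failure.
    print(extract_json('Sure! Here are the terms: {"description": "ts"}'))
    # {'description': 'ts'}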
api/server.py (+4, -2)

@@ -34,12 +34,14 @@ async def esgf_fetch(dataset_id: str = ""):


 @app.get(path="/subset/esgf")
-async def esgf_subset(request: Request, redis=Depends(get_redis), dataset_id: str = ""):
+async def esgf_subset(
+    request: Request, parent_id, dataset_id, redis=Depends(get_redis)
+):
     params = params_to_dict(request)
     urls = esgf.get_access_urls_by_id(dataset_id)
     job = create_job(
         func=slice_and_store_dataset,
-        args=[urls, dataset_id, params],
+        args=[urls, parent_id, dataset_id, params],
         redis=redis,
         queue="subset",
     )
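parent_id and dataset_id are now required query parameters on /subset/esgf (no defaults), and any extra filter parameters still travel through params_to_dict(request). A hypothetical client call; the host, IDs, and the filter parameter below are placeholders:

    import requests

    # All values below are placeholders; the endpoint shape follows the diff.
    resp = requests.get(
        "http://localhost:8000/subset/esgf",
        params={
            "parent_id": "hmi-parent-dataset-uuid",
            "dataset_id": "CMIP6.CMIP.BCC.BCC-ESM1.historical.r1i1p1f1.Amon.ts",
            "timestamps": "start,end",  # hypothetical subset filter
        },
    )
    print(resp.json())  # e.g. the id of the queued subset job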
