error resilience
satchelbaldwin committed Feb 7, 2024
1 parent caa46a3 commit b58521a
Showing 11 changed files with 185 additions and 43 deletions.
13 changes: 13 additions & 0 deletions README.md
@@ -60,6 +60,19 @@ Each dataset contains a `metadata` field.

The `metadata` field contains an `id` field used for subsequent processing and lookups; it holds the full dataset ID with revision and node information, such as: `CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu`
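
For reference, the node-qualified ID splits on the `|` separator into the bare dataset ID and the hosting data node (a minimal sketch using the example ID above):

```python
full_id = "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu"
dataset_id, data_node = full_id.split("|")
# dataset_id -> "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514"
# data_node  -> "esgf-data.ucar.edu"
```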

#### Preview

`/preview/esgf`

Required Parameters:
* `dataset_id`: the full-format dataset ID as returned by search.

Optional Parameters:
* `variable_id`: override the variable to render in the preview.
* `timestamps`: plot over a list of times (much slower; work in progress).
* `time_index`: override the time index to use.
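
An example request for this endpoint (a minimal sketch; the host and port are assumptions, and the endpoint queues a preview job rather than returning the image inline):

```python
import requests

# assumed local deployment of the API
resp = requests.get(
    "http://localhost:8000/preview/esgf",
    params={
        "dataset_id": "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu",
        "variable_id": "ua",  # optional override
    },
)
print(resp.json())  # details of the queued preview job
```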


#### Subset

`/subset/esgf`
2 changes: 0 additions & 2 deletions api/dataset/job_queue.py
@@ -1,6 +1,4 @@
import time
import uuid
from typing import Any, Dict, Optional
from fastapi import Response, status
from redis import Redis
from rq import Queue
60 changes: 43 additions & 17 deletions api/dataset/remote.py
@@ -2,33 +2,53 @@
from typing import List
import s3fs

# we have to operate on urls / dataset_ids due to the fact that
# we have to operate on urls, paths / dataset_ids due to the fact that
# rq jobs can't pass the context of a loaded xarray dataset in memory (json serialization)


def open_remote_dataset(urls: List[str]) -> xarray.Dataset:
try:
ds = xarray.open_mfdataset(
urls,
chunks={"time": 10},
concat_dim="time",
combine="nested",
parallel=True,
use_cftime=True,
)
except IOError as e:
print(f"failed to open parallel: {e}")
def open_dataset(paths: List[List[str]]) -> xarray.Dataset:
for mirror in paths:
try:
ds = xarray.open_mfdataset(
urls,
mirror,
chunks={"time": 10},
concat_dim="time",
combine="nested",
parallel=True,
use_cftime=True,
)
return ds
except IOError as e:
print(f"failed to open sequentially, falling back to s3: {e}")
return open_remote_dataset_s3(urls)
return ds
print(f"failed to open parallel: {e}")
try:
ds = xarray.open_mfdataset(
mirror,
concat_dim="time",
combine="nested",
use_cftime=True,
)
return ds
except IOError as e:
print(f"failed to open sequentially {e}")

print("failed to find dataset in all mirrors.")
try:
# function handles stripping out url part, so any mirror will have the same result
ds = open_remote_dataset_s3(paths[0])
return ds
except IOError as e:
print(f"file not found in s3 mirroring: {e}")

for mirror in paths:
try:
ds = open_remote_dataset_http(mirror)
return ds
except IOError as e:
print(f"failed to download via plain http: {e}")

raise IOError(
f"Failed to download dataset via parallel dap, sequential dap, s3 mirror, and http: {paths}"
)


def open_remote_dataset_s3(urls: List[str]) -> xarray.Dataset:
@@ -44,3 +44,9 @@ def open_remote_dataset_s3(urls: List[str]) -> xarray.Dataset:
for url in urls
]
return xarray.merge(files)


def open_remote_dataset_http(urls: List[str]) -> xarray.Dataset:
raise IOError(
"failed to attempt http downloading: dataset requires authorization when in plain http download"
)
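
As the comment at the top of this file notes, rq workers only receive JSON-serializable arguments, so the nested access paths, not a loaded `xarray.Dataset`, are what cross the queue boundary. A minimal sketch of that pattern, assuming a local Redis and the `preview` queue name used in `api/server.py`:

```python
from redis import Redis
from rq import Queue

from api.preview.render import render_preview_for_dataset

# one inner list of OPeNDAP URLs per mirror (hosts and filenames are made up)
paths = [
    ["https://mirror-a.example/thredds/dodsC/ua_day_2000.nc"],
    ["https://mirror-b.example/thredds/dodsC/ua_day_2000.nc"],
]

# only the JSON-friendly list of URL lists is serialized onto the queue;
# the worker opens the dataset itself via open_dataset(paths)
queue = Queue("preview", connection=Redis())
job = queue.enqueue(render_preview_for_dataset, paths)
```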
2 changes: 0 additions & 2 deletions api/dataset/terarium_hmi.py
@@ -1,6 +1,4 @@
from uuid import uuid4
import xarray
from datetime import datetime, timezone
from api.dataset.models import DatasetSubsetOptions
from api.settings import default_settings
import requests
16 changes: 11 additions & 5 deletions api/preview/render.py
@@ -4,7 +4,7 @@
import xarray
from matplotlib import pyplot as plt
from typing import List
from api.dataset.remote import open_remote_dataset
from api.dataset.remote import open_dataset


def buffer_to_b64_png(buffer: io.BytesIO) -> str:
@@ -16,14 +16,17 @@ def buffer_to_b64_png(buffer: io.BytesIO) -> str:

# handles loading as to not share xarray over rq-worker boundaries
def render_preview_for_dataset(
urls: List[str],
urls: List[List[str]],
variable_index: str = "",
time_index: str = "",
timestamps: str = "",
**kwargs,
):
ds = open_remote_dataset(urls)
return {"png": render(ds, variable_index, time_index, timestamps)}
try:
ds = open_dataset(urls)
return {"png": render(ds, variable_index, time_index, timestamps)}
except IOError as e:
return {"error": f"upstream hosting is likely having a problem. {e}"}


def render(
@@ -53,12 +56,15 @@ def render(
ds = ds.sel({time_index: slice(timestamps.split(","))})

# we're plotting x, y, time - others need to be shortened to the first element
print(axes, flush=True)
other_axes = [axis for axis in axes if axis not in ["X", "Y", "T"]]
for axis in other_axes:
try:
ds = ds.sel({axes[axis]: ds[axes[axis]][0]})
except Exception as e:
print(f"failed to trim non-relevant axis {axis}: {ds[axes[axis]]}")
print(
f"failed to trim non-relevant axis {axis}: {ds[axes[axis]]}: {e}: (this can be safely ignored if expected)"
)

ds = ds[variable_index]

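
The loop above pins every axis other than X/Y/T to its first value so that only an x/y map over time remains to plot. A standalone sketch of that xarray pattern, with made-up dimension names and a CF-style `axes` mapping like the one indexed above:

```python
import numpy as np
import xarray as xr

# toy dataset with a vertical level axis that can't appear in an x/y/time plot
ds = xr.Dataset(
    {"ua": (("time", "plev", "lat", "lon"), np.zeros((2, 3, 4, 5)))},
    coords={
        "time": np.arange(2),
        "plev": [1000.0, 850.0, 500.0],
        "lat": np.linspace(-90.0, 90.0, 4),
        "lon": np.linspace(0.0, 270.0, 5),
    },
)

# axis letters mapped to dimension names
axes = {"T": "time", "Z": "plev", "Y": "lat", "X": "lon"}

for axis in [a for a in axes if a not in ["X", "Y", "T"]]:
    # pin the non-plot axis to its first element
    ds = ds.sel({axes[axis]: ds[axes[axis]][0]})

print(ds["ua"].dims)  # ('time', 'lat', 'lon')
```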
1 change: 0 additions & 1 deletion api/processing/filters.py
@@ -7,7 +7,6 @@
TemporalSubsetOptions,
ThinningSubsetOptions,
)
import pandas


def location_bbox(
22 changes: 15 additions & 7 deletions api/processing/providers/esgf.py
@@ -1,28 +1,36 @@
import base64
from .. import filters
from api.settings import default_settings
import xarray
from typing import Any, Dict, List
from api.dataset.terarium_hmi import construct_hmi_dataset
from api.dataset.remote import open_remote_dataset
from api.dataset.remote import open_dataset


def slice_esgf_dataset(
urls: List[str], dataset_id: str, params: Dict[str, Any]
urls: List[List[str]], dataset_id: str, params: Dict[str, Any]
) -> xarray.Dataset:
ds = open_remote_dataset(urls)
ds = open_dataset(urls)
options = filters.options_from_url_parameters(params)
print(f"original size: {ds.nbytes}\nslicing with options {options}", flush=True)
return filters.subset_with_options(ds, options)


def slice_and_store_dataset(
urls: List[str], parent_id: str, dataset_id: str, params: Dict[str, Any], **kwargs
urls: List[List[str]],
parent_id: str,
dataset_id: str,
params: Dict[str, Any],
**kwargs,
):
job_id = kwargs["job_id"]
filename = f"cmip6-{job_id}.nc"
print(f"running job esgf subset job for: {job_id}", flush=True)
ds = slice_esgf_dataset(urls, dataset_id, params)
try:
ds = slice_esgf_dataset(urls, dataset_id, params)
except IOError as e:
return {
"status": "failed",
"error": f"upstream is likely having a problem. {e}",
}
print(f"bytes: {ds.nbytes}", flush=True)
try:
print("pulling sliced dataset from remote", flush=True)
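
With this change, an upstream failure surfaces as a structured job result rather than an unhandled exception in the worker. A minimal sketch of how a consumer of the job result might branch on that payload (the polling side is not part of this diff and is an assumption):

```python
def summarize_subset_result(result) -> str:
    # failure shape returned by slice_and_store_dataset on IOError
    if isinstance(result, dict) and result.get("status") == "failed":
        return f"subset failed: {result.get('error')}"
    return "subset completed"
```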
6 changes: 5 additions & 1 deletion api/search/provider.py
@@ -2,6 +2,9 @@
from dask.delayed import Delayed
import dask

# consistent interface for handling search results and paths
# across multiple sources.


class Dataset:
metadata: Dict[str, Any] # json
@@ -17,5 +20,6 @@ class BaseSearchProvider:
def search(self, query: str, page: int) -> DatasetSearchResults:
return []

def get_access_urls(self, dataset: Dataset) -> List[str]:
# [mirrors... [dataset urls...]]
def get_access_paths(self, dataset: Dataset) -> List[List[str]]:
return []
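
In practice the nested return value has one inner list per mirror, each holding that mirror's URLs for the dataset's files (hosts and filenames below are made up):

```python
# List[List[str]]: outer list = mirrors, inner list = that mirror's file URLs
access_paths = [
    [  # mirror 1
        "https://esgf-node-a.example/thredds/dodsC/ua_day_1850-1899.nc",
        "https://esgf-node-a.example/thredds/dodsC/ua_day_1900-1949.nc",
    ],
    [  # mirror 2: the same files served from a different host
        "https://esgf-node-b.example/thredds/dodsC/ua_day_1850-1899.nc",
        "https://esgf-node-b.example/thredds/dodsC/ua_day_1900-1949.nc",
    ],
]
```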
74 changes: 74 additions & 0 deletions api/search/providers/era5.py
@@ -0,0 +1,74 @@
from api.search.provider import BaseSearchProvider, DatasetSearchResults, Dataset
from api.settings import default_settings
import ast
from openai import OpenAI
from typing import List


class ERA5ApiCallNodeVisitor(ast.NodeVisitor):
"""
walks the AST of a GPT-4-generated ERA5 API call to pull out the important details
for storage, rather than keeping a code chunk to eval - this makes it easier to
reconstitute the call and apply changes from frontend subsetting arguments.
"""

def visit_Call(self, node):
if isinstance(node.func.value, ast.Name) and node.func.attr == "retrieve":
args = [arg for arg in node.args]
expected = [ast.Constant, ast.Dict, ast.Constant]
type_match = [isinstance(args[i], expected[i]) for i in range(len(args))]
if not all(type_match):
raise IOError(
f"malformed API call: type match failed for args: {type_match}; expected {expected}; got {args}"
)
dataset_name = args[0].value
api_arguments = eval(
compile(ast.Expression(args[1]), "<ast dictionary>", "eval")
)
filename = args[2].value
return (dataset_name, api_arguments, filename)
self.generic_visit(node)


class ERA5(BaseSearchProvider):
def __init__(self, openai_client):
print("initializing ERA5 search provider")
self.client: OpenAI = openai_client

def search(self, query: str, *_) -> DatasetSearchResults:
"""
unpaginated - searches ERA5 datasets. the ERA5 API exposes a single call that downloads
straight to disk, rather than separate search / subset / fetch operations.
this generates the *API call* for preview and extracts its details - subsetting
reconstructs the call from that data. not ideal, but ERA5 is awkward to work with.
"""
code_output = self.natural_language_search(query)
retrieve_call = code_output[code_output.find("c.retrieve(") :]
visitor = ERA5ApiCallNodeVisitor()
(name, args, filename) = visitor.visit(ast.parse(retrieve_call))
return [Dataset(dict(dataset_name=name, arguments=args, filename=filename))]

def generate_natural_language_context(self, search: str):
return f"""
construct a python api call for the CDS ERA5 climate data store to retrieve {search} in netcdf format.
return only the python code with no additional explanation.
"""

def natural_language_search(self, query: str) -> str:
context = self.generate_natural_language_context(query)
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": context},
],
temperature=0.7,
)
return response.choices[0].message.content or ""

def get_access_paths(self, dataset: Dataset) -> List[str]:
"""
ERA5 metadata is less useful than that of sources with a separate search / fetch workflow.
additionally, loading the dataset for subsetting / rendering has to happen in a single step.
"""
return []
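
A usage sketch for the AST visitor above, fed a hand-written retrieval call of the kind GPT-4 is prompted to produce (the dataset name, arguments, and filename are made up; `visit_Call` is invoked directly on the extracted `Call` node):

```python
import ast

from api.search.providers.era5 import ERA5ApiCallNodeVisitor

sample = '''
c.retrieve(
    "reanalysis-era5-single-levels",
    {"variable": "2m_temperature", "year": "2020", "format": "netcdf"},
    "download.nc",
)
'''

call_node = ast.parse(sample).body[0].value  # the ast.Call node for c.retrieve(...)
dataset_name, api_arguments, filename = ERA5ApiCallNodeVisitor().visit_Call(call_node)

print(dataset_name)   # reanalysis-era5-single-levels
print(api_arguments)  # {'variable': '2m_temperature', 'year': '2020', 'format': 'netcdf'}
print(filename)       # download.nc
```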
24 changes: 20 additions & 4 deletions api/search/providers/esgf.py
@@ -142,16 +142,32 @@ def search(
self.initialize_embeddings(force_refresh_cache)
return self.natural_language_search(query, page)

def get_access_urls_by_id(self, dataset_id: str) -> List[str]:
def get_all_access_paths_by_id(self, dataset_id: str) -> List[List[str]]:
return [
self.get_access_paths_by_id(id)
for id in self.get_mirrors_for_dataset(dataset_id)
]

def get_mirrors_for_dataset(self, dataset_id: str) -> List[str]:
# strip vert bar if provided with example mirror attached
dataset_id = dataset_id.split("|")[0]
response = self.run_esgf_query(f"id:{dataset_id}*", 1, {})
full_ids = [d.metadata["id"] for d in response]
return full_ids

def get_access_paths_by_id(self, dataset_id: str) -> List[str]:
"""
returns a list of OPENDAP URLs for use in processing given a dataset.
"""
if dataset_id == "":
return []
self.get_mirrors_for_dataset(dataset_id)
params = urlencode(
{
"type": "File",
"format": "application/solr+json",
"dataset_id": dataset_id,
"limit": 50,
"limit": 200,
}
)
full_url = f"{default_settings.esgf_url}/search?{params}"
@@ -177,8 +193,8 @@ def get_access_urls_by_id(self, dataset_id: str) -> List[str]:
opendap_urls = [u[:-5] if u.endswith(".nc.html") else u for u in opendap_urls]
return opendap_urls

def get_access_urls(self, dataset: Dataset) -> List[str]:
return self.get_access_urls_by_id(dataset.metadata["id"])
def get_access_paths(self, dataset: Dataset) -> List[List[str]]:
return self.get_all_access_paths_by_id(dataset.metadata["id"])

def natural_language_search(
self, search_query: str, page: int, retries=0
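
Combined with the loader changes in `api/dataset/remote.py`, the mirror-aware lookup can be sketched end to end (the `esgf` provider instance and the dataset ID are assumptions):

```python
from api.dataset.remote import open_dataset

dataset_id = (
    "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514"
    "|esgf-data.ucar.edu"
)

# one List[str] of OPeNDAP URLs per mirror hosting the dataset
paths = esgf.get_all_access_paths_by_id(dataset_id)

# tries parallel DAP per mirror, then sequential DAP, then the s3 mirror, then plain HTTP
ds = open_dataset(paths)
print(ds.nbytes)
```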
8 changes: 4 additions & 4 deletions api/server.py
@@ -28,8 +28,8 @@ async def esgf_search(query: str = "", page: int = 1, refresh_cache=False):


@app.get("/fetch/esgf")
async def esgf_fetch(dataset_id: str = ""):
urls = esgf.get_access_urls_by_id(dataset_id)
async def esgf_fetch(dataset_id):
urls = esgf.get_all_access_paths_by_id(dataset_id)
return {"dataset": dataset_id, "urls": urls}


Expand All @@ -38,7 +38,7 @@ async def esgf_subset(
request: Request, parent_id, dataset_id, redis=Depends(get_redis)
):
params = params_to_dict(request)
urls = esgf.get_access_urls_by_id(dataset_id)
urls = esgf.get_all_access_paths_by_id(dataset_id)
job = create_job(
func=slice_and_store_dataset,
args=[urls, parent_id, dataset_id, params],
@@ -50,7 +50,7 @@

@app.get(path="/preview/esgf")
async def esgf_preview(dataset_id: str, redis=Depends(get_redis)):
urls = esgf.get_access_urls_by_id(dataset_id)
urls = esgf.get_all_access_paths_by_id(dataset_id)
job = create_job(
func=render_preview_for_dataset, args=[urls], redis=redis, queue="preview"
)
