http smart fallbacks and cleanup
satchelbaldwin committed Feb 7, 2024
1 parent ff471c0 commit ce4492c
Showing 7 changed files with 121 additions and 31 deletions.
78 changes: 70 additions & 8 deletions api/dataset/remote.py
@@ -1,16 +1,39 @@
from concurrent.futures import ThreadPoolExecutor
import glob
import xarray
from typing import List
from api.search.provider import AccessURLs
from api.settings import default_settings
import os
import s3fs
import requests

# we have to operate on urls, paths / dataset_ids due to the fact that
# rq jobs can't pass the context of a loaded xarray dataset in memory (json serialization)

# list of ordered download priorities:
# all mirrors are checked in each method
# ------------------------------------
# opendap [parallel]
# opendap [sequential]
# s3 mirror - s3://esgf-world netcdf4 bucket
# plain http
# s3 mirror - zarr format


def open_dataset(paths: AccessURLs, job_id=None) -> xarray.Dataset:
if len(paths) == 0:
raise IOError(
"paths was provided an empty list - does the dataset exist? no URLs found."
)

def open_dataset(paths: List[List[str]]) -> xarray.Dataset:
for mirror in paths:
opendap_urls = mirror["opendap"]
if len(opendap_urls) == 0:
continue
try:
ds = xarray.open_mfdataset(
mirror,
opendap_urls,
chunks={"time": 10},
concat_dim="time",
combine="nested",
@@ -22,7 +45,7 @@ def open_dataset(paths: List[List[str]]) -> xarray.Dataset:
print(f"failed to open parallel: {e}")
try:
ds = xarray.open_mfdataset(
mirror,
opendap_urls,
concat_dim="time",
combine="nested",
use_cftime=True,
@@ -34,14 +57,21 @@ def open_dataset(paths: List[List[str]]) -> xarray.Dataset:
print("failed to find dataset in all mirrors.")
try:
# function handles stripping out url part, so any mirror will have the same result
ds = open_remote_dataset_s3(paths[0])
ds = open_remote_dataset_s3(paths[0]["opendap"])
return ds
except IOError as e:
print(f"file not found in s3 mirroring: {e}")

for mirror in paths:
http_urls = mirror["http"]
if len(http_urls) == 0:
continue
try:
ds = open_remote_dataset_http(mirror)
if job_id is None:
raise IOError(
"http downloads must have an associated job id for cleanup purposes"
)
ds = open_remote_dataset_http(http_urls, job_id)
return ds
except IOError as e:
print(f"failed to download via plain http: {e}")
@@ -66,7 +96,39 @@ def open_remote_dataset_s3(urls: List[str]) -> xarray.Dataset:
return xarray.merge(files)


def open_remote_dataset_http(urls: List[str]) -> xarray.Dataset:
raise IOError(
"failed to attempt http downloading: dataset requires authorization when in plain http download"
def download_file_http(url: str, dir: str):
rs = requests.get(url, stream=True)
if rs.status_code == 401:
rs = requests.get(url, stream=True, auth=default_settings.esgf_openid)
filename = url.split("/")[-1]
print("writing ", os.path.join(dir, filename))
with open(os.path.join(dir, filename), mode="wb") as file:
for chunk in rs.iter_content(chunk_size=10 * 1024):
file.write(chunk)


def open_remote_dataset_http(urls: List[str], job_id) -> xarray.Dataset:
temp_directory = os.path.join(".", str(job_id))
if not os.path.exists(temp_directory):
os.makedirs(temp_directory)
with ThreadPoolExecutor() as executor:
executor.map(lambda url: download_file_http(url, temp_directory), urls)
files = [os.path.join(temp_directory, f) for f in os.listdir(temp_directory)]
ds = xarray.open_mfdataset(
files,
parallel=True,
concat_dim="time",
combine="nested",
use_cftime=True,
chunks={"time": 10},
)
return ds


def cleanup_potential_artifacts(job_id):
temp_directory = os.path.join(".", str(job_id))
if os.path.exists(temp_directory):
print(f"cleaning http artifact: {temp_directory}")
for file in glob.glob(os.path.join(temp_directory, "*.nc")):
os.remove(file)
os.removedirs(temp_directory)
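
The new open_dataset entry point takes the per-mirror AccessURLs structure plus the rq job id so that HTTP artifacts written under ./<job_id>/ can be cleaned up afterwards. A minimal usage sketch, with hypothetical mirror URLs and job id (not taken from the commit):

```python
from api.dataset.remote import open_dataset, cleanup_potential_artifacts

# One dict per mirror, mapping access method -> URL list.
paths = [
    {
        "opendap": ["https://esgf-node.example/thredds/dodsC/cmip6/tas_day.nc"],
        "http": ["https://esgf-node.example/thredds/fileServer/cmip6/tas_day.nc"],
    }
]

job_id = "preview-1234"
try:
    # Tries OPENDAP (parallel, then sequential), the esgf-world S3 mirror,
    # and finally plain HTTP download into ./preview-1234/.
    ds = open_dataset(paths, job_id)
    print(ds)
finally:
    cleanup_potential_artifacts(job_id)  # removes ./<job_id>/*.nc left by the HTTP path
```
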
12 changes: 8 additions & 4 deletions api/preview/render.py
@@ -1,10 +1,11 @@
import io
import base64
from api.search.provider import AccessURLs
import cartopy.crs as ccrs
import xarray
from matplotlib import pyplot as plt
from typing import List
from api.dataset.remote import open_dataset
from api.dataset.remote import cleanup_potential_artifacts, open_dataset


def buffer_to_b64_png(buffer: io.BytesIO) -> str:
@@ -16,15 +17,18 @@ def buffer_to_b64_png(buffer: io.BytesIO) -> str:

# handles loading as to not share xarray over rq-worker boundaries
def render_preview_for_dataset(
urls: List[List[str]],
urls: AccessURLs,
variable_index: str = "",
time_index: str = "",
timestamps: str = "",
**kwargs,
):
job_id = kwargs["job_id"]
try:
ds = open_dataset(urls)
return {"png": render(ds, variable_index, time_index, timestamps)}
ds = open_dataset(urls, job_id)
png = render(ds, variable_index, time_index, timestamps)
cleanup_potential_artifacts(job_id)
return {"png": png}
except IOError as e:
return {"error": f"upstream hosting is likely having a problem. {e}"}

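
render_preview_for_dataset reads the job id from its keyword arguments, hands it to open_dataset, and removes any downloaded files before returning the PNG. A sketch of a direct call, with hypothetical argument values:

```python
from api.preview.render import render_preview_for_dataset

urls = [{"opendap": ["https://esgf-node.example/dodsC/tas_day.nc"], "http": []}]  # hypothetical
result = render_preview_for_dataset(
    urls,
    variable_index="tas",    # hypothetical variable name
    job_id="preview-1234",   # read from kwargs; also used by cleanup_potential_artifacts
)
# result is {"png": "<base64 PNG>"} on success, or {"error": "..."} if upstream hosting fails
```
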
9 changes: 6 additions & 3 deletions api/processing/providers/esgf.py
@@ -1,12 +1,13 @@
from api.search.provider import AccessURLs
from .. import filters
import xarray
from typing import Any, Dict, List
from api.dataset.terarium_hmi import construct_hmi_dataset
from api.dataset.remote import open_dataset
from api.dataset.remote import cleanup_potential_artifacts, open_dataset


def slice_esgf_dataset(
urls: List[List[str]], dataset_id: str, params: Dict[str, Any]
urls: AccessURLs, dataset_id: str, params: Dict[str, Any]
) -> xarray.Dataset:
ds = open_dataset(urls)
options = filters.options_from_url_parameters(params)
@@ -15,7 +16,7 @@ def slice_esgf_dataset(


def slice_and_store_dataset(
urls: List[List[str]],
urls: AccessURLs,
parent_id: str,
dataset_id: str,
params: Dict[str, Any],
@@ -56,3 +57,5 @@ def slice_and_store_dataset(
return {"status": "ok", "dataset_id": hmi_id}
except Exception as e:
return {"status": "failed", "error": str(e), "dataset_id": ""}
finally:
cleanup_potential_artifacts(job_id)
5 changes: 3 additions & 2 deletions api/search/provider.py
@@ -14,12 +14,13 @@ def __init__(self, metadata):


DatasetSearchResults = List[Dataset]
AccessURLs = List[Dict[str, List[str]]] # mirrors : [ method -> urls ]


class BaseSearchProvider:
def search(self, query: str, page: int) -> DatasetSearchResults:
return []

# [mirrors... [dataset urls...]]
def get_access_paths(self, dataset: Dataset) -> List[List[str]]:
return []
def get_access_paths(self, dataset: Dataset) -> AccessURLs:
return {}
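
AccessURLs groups URLs per mirror, keyed by access method, which is what the fallback loops in open_dataset iterate; a method a mirror does not offer simply maps to an empty list and is skipped. A hypothetical value:

```python
from api.search.provider import AccessURLs

example: AccessURLs = [
    # mirror 1: offers both OPENDAP and plain HTTP
    {
        "opendap": ["https://mirror-a.example/dodsC/tas_day.nc"],
        "http": ["https://mirror-a.example/fileServer/tas_day.nc"],
    },
    # mirror 2: plain HTTP only, so the OPENDAP pass continues past it
    {
        "opendap": [],
        "http": ["https://mirror-b.example/fileServer/tas_day.nc"],
    },
]
```
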
33 changes: 22 additions & 11 deletions api/search/providers/esgf.py
@@ -1,5 +1,10 @@
from api.settings import default_settings
from api.search.provider import BaseSearchProvider, DatasetSearchResults, Dataset
from api.search.provider import (
AccessURLs,
BaseSearchProvider,
DatasetSearchResults,
Dataset,
)
import requests
from urllib.parse import urlencode
from typing import List, Dict
@@ -142,7 +147,7 @@ def search(
self.initialize_embeddings(force_refresh_cache)
return self.natural_language_search(query, page)

def get_all_access_paths_by_id(self, dataset_id: str) -> List[List[str]]:
def get_all_access_paths_by_id(self, dataset_id: str) -> AccessURLs:
return [
self.get_access_paths_by_id(id)
for id in self.get_mirrors_for_dataset(dataset_id)
@@ -155,7 +160,7 @@ def get_mirrors_for_dataset(self, dataset_id: str) -> List[str]:
full_ids = [d.metadata["id"] for d in response]
return full_ids

def get_access_paths_by_id(self, dataset_id: str) -> List[str]:
def get_access_paths_by_id(self, dataset_id: str) -> Dict[str, List[str]]:
"""
returns a list of OPENDAP URLs for use in processing given a dataset.
"""
@@ -182,18 +187,24 @@ def get_access_paths_by_id(self, dataset_id: str) -> List[str]:
raise ConnectionError(
f"Failed to extract files from dataset: empty list {full_url}"
)

# file url responses are lists of strings with their protocols separated by |
# e.x. https://esgf-node.example|mimetype|OPENDAP
opendap_urls = [
url.split("|")[0]
for url in itertools.chain.from_iterable([f["url"] for f in files])
if "OPENDAP" in url
]
def select(files, selector):
return [
url.split("|")[0]
for url in itertools.chain.from_iterable([f["url"] for f in files])
if selector in url
]

http_urls = select(files, "HTTP")
# sometimes the opendap request form is returned. we strip the trailing suffix if needed
opendap_urls = select(files, "OPENDAP")
opendap_urls = [u[:-5] if u.endswith(".nc.html") else u for u in opendap_urls]
return opendap_urls

def get_access_paths(self, dataset: Dataset) -> List[List[str]]:
return {"opendap": opendap_urls, "http": http_urls}

def get_access_paths(self, dataset: Dataset) -> AccessURLs:
return self.get_all_access_paths_by_id(dataset.metadata["id"])

def natural_language_search(
@@ -243,7 +254,7 @@ def process_natural_language(self, search_query: str) -> str:
],
temperature=0.7,
)
query = response.choices[0].message.content
query = response.choices[0].message.content or ""
print(query)
query = query[query.find("{") :]
return query
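
get_access_paths_by_id now collects both HTTP and OPENDAP endpoints from the pipe-delimited ESGF file URLs ("<url>|<mimetype>|<protocol>") and trims the ".nc.html" request-form suffix that some nodes return. A standalone sketch of that parsing over hypothetical records:

```python
import itertools

# Hypothetical ESGF file entries, in the "<url>|<mimetype>|<protocol>" format.
files = [
    {
        "url": [
            "https://esgf.example/dodsC/tas_day.nc.html|application/opendap|OPENDAP",
            "https://esgf.example/fileServer/tas_day.nc|application/netcdf|HTTPServer",
        ]
    }
]

def select(files, selector):
    return [
        url.split("|")[0]
        for url in itertools.chain.from_iterable(f["url"] for f in files)
        if selector in url
    ]

http_urls = select(files, "HTTP")
opendap_urls = [u[:-5] if u.endswith(".nc.html") else u for u in select(files, "OPENDAP")]
# http_urls    -> ['https://esgf.example/fileServer/tas_day.nc']
# opendap_urls -> ['https://esgf.example/dodsC/tas_day.nc']  (request-form suffix stripped)
```
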
13 changes: 10 additions & 3 deletions api/settings.py
@@ -1,11 +1,16 @@
from typing import Dict, Any
from typing import Tuple
from pydantic import Field
from pydantic_settings import BaseSettings
import os


class Settings(BaseSettings):
esgf_url: str = Field(os.environ.get("ESGF_URL", "https://esgf-node.llnl.gov/esg-search"))
esgf_url: str = Field(
os.environ.get("ESGF_URL", "https://esgf-node.llnl.gov/esg-search")
)
esgf_openid: Tuple[str, str] = Field(
(os.environ.get("ESGF_OPENID_USER", ""), os.environ.get("ESGF_OPENID_PASS", ""))
)
default_facets: str = Field("project,experiment_family")
entries_per_page: int = Field(20)

@@ -15,7 +20,9 @@ class Settings(BaseSettings):
minio_url: str = Field(os.environ.get("MINIO_URL", "http://minio:9000"))
minio_user: str = Field(os.environ.get("MINIO_USER", "miniouser"))
minio_pass: str = Field(os.environ.get("MINIO_PASS", "miniopass"))
bucket_name: str = Field(os.environ.get("MINIO_BUCKET_NAME", "climate-data-test-bucket"))
bucket_name: str = Field(
os.environ.get("MINIO_BUCKET_NAME", "climate-data-test-bucket")
)

terarium_url: str = Field(
os.environ.get("TERARIUM_URL", "https://server.staging.terarium.ai")
2 changes: 2 additions & 0 deletions env.example
@@ -4,6 +4,8 @@ TERARIUM_USER={TERARIUM_USER}
TERARIUM_PASS={TERARIUM_PASS}

ESGF_URL="https://esgf-node.llnl.gov/esg-search"
ESGF_OPENID_USER={ESGF_OPENID_USER}
ESGF_OPENID_PASS={ESGF_OPENID_PASS}

REDIS_HOST="redis-climate-data"
REDIS_PORT=6379
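
The two new entries feed Settings.esgf_openid, a (user, password) tuple that download_file_http hands to requests as HTTP basic auth when a mirror answers 401. A minimal sketch of that flow, with a hypothetical file URL:

```python
import requests
from api.settings import default_settings

url = "https://esgf-node.example/fileServer/cmip6/tas_day.nc"  # hypothetical
rs = requests.get(url, stream=True)
if rs.status_code == 401:
    # retry with the ESGF OpenID credentials taken from the environment
    rs = requests.get(url, stream=True, auth=default_settings.esgf_openid)
```
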
