Skip to content

Commit

Permalink
Merge pull request noaa-oar-arl#125 from zmoon/aeronet-parallel-dates
Browse files Browse the repository at this point in the history
AERONET dates when `parallel` option used
  • Loading branch information
zmoon authored Sep 25, 2023
2 parents 4769bca + 1c1a3da commit ce5d939
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
21 changes: 13 additions & 8 deletions monetio/obs/aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,21 @@ def add_data(
interp_to_aod_values=interp_to_aod_values,
)

requested_parallel = n_procs > 1 or n_procs == -1
if has_joblib and requested_parallel:
# Split up by day
requested_parallel = n_procs != 1

# Split up by day
dates = pd.to_datetime(dates)
if dates is not None:
min_date = dates.min()
max_date = dates.max()
days = pd.date_range(start=min_date, end=max_date, freq="D") # TODO: subtract 1?
days1 = days + pd.Timedelta(days=1)
time_bounds = pd.date_range(start=min_date, end=max_date, freq="D")
if max_date not in time_bounds:
time_bounds = time_bounds.append(pd.DatetimeIndex([max_date]))

if has_joblib and requested_parallel and dates is not None and len(time_bounds) > 2:
dfs = Parallel(n_jobs=n_procs, verbose=verbose)(
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([d1, d2]), **kwargs, freq=None)
for d1, d2 in zip(days, days1)
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([t1, t2]), **kwargs, freq=None)
for t1, t2 in zip(time_bounds[:-1], time_bounds[1:])
)
df = pd.concat(dfs, ignore_index=True).drop_duplicates()
if freq is not None:
Expand Down Expand Up @@ -462,7 +467,7 @@ def add_data(
now = datetime.utcnow()
self.dates = pd.date_range(start=now.date(), end=now, freq="H")
else:
self.dates = dates
self.dates = pd.DatetimeIndex(dates)
if product is not None:
self.prod = product.upper()
else:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,32 @@ def test_interp_daily_with_pytspack():
df = aeronet.add_data(dates, daily=True, n_procs=1, interp_to_aod_values=standard_wavelengths)

assert {f"aod_{int(wl)}nm" for wl in standard_wavelengths}.issubset(df.columns)


@pytest.mark.parametrize(
    "dates",
    [
        pytest.param(pd.to_datetime(["2019-09-01", "2019-09-02"]), id="one day"),
        pytest.param(pd.to_datetime(["2019-09-01", "2019-09-03"]), id="two days"),
        pytest.param(pd.to_datetime(["2019-09-01", "2019-09-01 12:00"]), id="half day"),
    ],
)
def test_issue100(dates, request):
    """Check that ``add_data`` agrees between ``n_procs=1`` and ``n_procs=2``.

    Each case supplies a two-element date range. The results of the two calls
    must have the same length and the same content, and the returned times
    must fall strictly inside the requested range.
    """
    serial = aeronet.add_data(dates, n_procs=1)
    parallel = aeronet.add_data(dates, n_procs=2)
    assert len(serial) == len(parallel)

    if request.node.callspec.id != "two days":
        assert serial.equals(parallel)
    else:
        # The row order seems to be site-then-time rather than time-then-site,
        # so the two frames are put into a common order before comparing.
        # (`serial.compare(parallel)` is handy when debugging a mismatch.)
        sort_keys = ["time", "siteid"]
        serial_sorted = serial.sort_values(sort_keys).reset_index(drop=True)
        parallel_sorted = parallel.sort_values(sort_keys).reset_index(drop=True)
        assert serial_sorted.equals(parallel_sorted)

    # Strict bounds: no returned time equals either end of the requested range.
    assert dates[0] < serial.time.min() < serial.time.max() < dates[-1]
2 changes: 1 addition & 1 deletion tests/test_aqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@pytest.mark.xfail(
not ssl_version < (2,), strict=True, reason="Doesn't work with newer OpenSSL", raises=SSLError
not ssl_version < (2,), strict=False, reason="Doesn't work with newer OpenSSL", raises=SSLError
)
def test_aqs():
# For MM data proc example
Expand Down

0 comments on commit ce5d939

Please sign in to comment.