Skip to content

Commit 104f38a

Browse files
authored
Merge pull request #6 from ElaadNL/audit-predictions
Audit predictions
2 parents 8c8b52b + 705b77a commit 104f38a

File tree

6 files changed

+81
-19
lines changed

6 files changed

+81
-19
lines changed

src/application/generate_events.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from src.models.predicted_load import PredictedGridAssetLoad
1717

1818

19-
class PredictionActionsBase[ReadOnlySession](ABC):
19+
class PredictionActionsBase[ReadOnlySession, WriteSession](ABC):
2020
"""Abstract class which contains methods used by this workflow.
2121
2222
These methods are implemented on a higher level and provided to the functions of this
@@ -27,6 +27,10 @@ class PredictionActionsBase[ReadOnlySession](ABC):
2727
def get_query_api(self) -> ReadOnlySession:
2828
"""Retrieve a read-only session to the database."""
2929

30+
@abstractmethod
31+
def get_write_api(self) -> WriteSession:
32+
"""Retrieve a write session to the database."""
33+
3034
@abstractmethod
3135
async def get_predicted_grid_asset_load(
3236
self, query_api: ReadOnlySession, from_date: datetime, to_date: datetime
@@ -42,6 +46,19 @@ async def get_predicted_grid_asset_load(
4246
list[PredictedGridAssetLoad]: The list of predicted grid asset loads.
4347
"""
4448

49+
@abstractmethod
50+
async def audit_predicted_grid_asset_loads(
51+
self,
52+
write_api: WriteSession,
53+
predicted_grid_asset_loads: list[PredictedGridAssetLoad],
54+
) -> None:
55+
"""Audit predicted grid asset loads by storing them in the database.
56+
57+
Args:
58+
write_api (WriteSession): The write connection to the database.
59+
predicted_grid_asset_loads (list[PredictedGridAssetLoad]): The list of predicted grid asset loads to audit.
60+
"""
61+
4562

4663
def _generate_capacity_limitation_intervals(
4764
interval_id: int,
@@ -123,7 +140,7 @@ async def get_capacity_limitation_event(
123140
Args:
124141
actions (PredictionActionsBase): The actions to use.
125142
from_date (datetime): The start time (inclusive) from which to fetch OpenADR events.
126-
to_date (datetime): The end time (exclusive) from which to fetch OpenADR events.
143+
to_date (datetime): The end time (inclusive) from which to fetch OpenADR events.
127144
128145
Returns:
129146
Event | None: The OpenADR3 capacity limitation event. None if no data to base the event on could be retrieved.
@@ -140,4 +157,9 @@ async def get_capacity_limitation_event(
140157
)
141158
return None
142159

160+
write_api = actions.get_write_api()
161+
await actions.audit_predicted_grid_asset_loads(
162+
write_api, predicted_grid_asset_loads
163+
)
164+
143165
return _generate_capacity_limitation_event(predicted_grid_asset_loads, MAX_CAPACITY)

src/config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
STANDARD_PROFILES_BUCKET_NAME = config(
2929
"STANDARD_PROFILES_BUCKET_NAME", default="ditm_standard_profiles"
3030
)
31-
DALIDATA_BUCKET_NAME = config("DALIDATA_BUCKET_NAME", default="dalidata")
31+
DALIDATA_BUCKET_NAME = config(
32+
"DALIDATA_BUCKET_NAME", default="ditm-dali-data-processed"
33+
)
3234

3335
# External services URLs
3436
WEATHER_FORECAST_API_URL = config("WEATHER_FORECAST_API_URL")

src/infrastructure/azureml/feature_generation.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,28 +144,28 @@ async def _get_lag_features_for_dates(
144144
)
145145

146146
predict_datetimes_df["lag_1_year"] = dalidata_df.reindex(lag_1_year_dt)[
147-
"value"
147+
"WAARDE"
148148
].values
149149
predict_datetimes_df["lag_1_days"] = dalidata_df.reindex(lag_1_day_dt)[
150-
"value"
150+
"WAARDE"
151151
].values
152152
predict_datetimes_df["lag_2_days"] = dalidata_df.reindex(lag_2_day_dt)[
153-
"value"
153+
"WAARDE"
154154
].values
155155
predict_datetimes_df["lag_3_days"] = dalidata_df.reindex(lag_3_day_dt)[
156-
"value"
156+
"WAARDE"
157157
].values
158158
predict_datetimes_df["lag_4_days"] = dalidata_df.reindex(lag_4_day_dt)[
159-
"value"
159+
"WAARDE"
160160
].values
161161
predict_datetimes_df["lag_5_days"] = dalidata_df.reindex(lag_5_day_dt)[
162-
"value"
162+
"WAARDE"
163163
].values
164164
predict_datetimes_df["lag_6_days"] = dalidata_df.reindex(lag_6_day_dt)[
165-
"value"
165+
"WAARDE"
166166
].values
167167
predict_datetimes_df["lag_7_days"] = dalidata_df.reindex(lag_7_day_dt)[
168-
"value"
168+
"WAARDE"
169169
].values
170170

171171
return predict_datetimes_df

src/infrastructure/influxdb/dalidata/query_dali_data.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,22 @@ async def retrieve_dali_data_between(
1111
start_date_inclusive: datetime,
1212
end_date_inclusive: datetime,
1313
) -> pd.DataFrame:
14-
"""Retrieve standard profiles from InfluxDB between the given dates.
14+
"""Retrieve dalidata from InfluxDB between the given dates.
1515
1616
Args:
1717
start_date_inclusive (datetime): The start date (inclusive)
1818
end_date_inclusive (datetime): The end date (inclusive)
1919
2020
Returns:
21-
pd.DataFrame: The dataframe containing standard profiles for the date range.
21+
pd.DataFrame: The dataframe containing dalidata for the date range.
2222
"""
2323
start_date_str = start_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
2424
end_date_str = end_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
2525

2626
query = f"""from(bucket: "{DALIDATA_BUCKET_NAME}")
2727
|> range(start: {start_date_str}, stop: {end_date_str})
28-
|> filter(fn: (r) => r["_measurement"] == "kinesis_data")
29-
|> filter(fn: (r) => r["_field"] == "value")
30-
|> filter(fn: (r) => r["boxid"] == "CVD.091030-1")
31-
|> filter(fn: (r) => r["description"] == "Trafometing vermogen som 3 fases - gem. 15 min." or r["description"] == "Trafometing_vermogen_som_3_fases_-_gem._15_min.")
28+
|> filter(fn: (r) => r["_measurement"] == "WAARDE")
29+
|> filter(fn: (r) => r["_field"] == "WAARDE")
3230
|> group(columns: [])
3331
|> sort(columns: ["_time"])
3432
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

src/infrastructure/prediction_actions_impl.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44

55
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
66
from influxdb_client.client.query_api_async import QueryApiAsync
7+
from influxdb_client.client.write_api_async import WriteApiAsync
78

89
from src.application.generate_events import PredictionActionsBase
910
from src.infrastructure.azureml.feature_generation import get_features_between_dates
1011
from src.infrastructure.azureml.predictions import get_predictions_for_features
1112
from src.models.predicted_load import PredictedGridAssetLoad
13+
from src.infrastructure.influxdb.trafo_load_audit import store_predictions_for_audit
1214

1315

14-
class PredictionActionsInfluxDB(PredictionActionsBase[QueryApiAsync]):
16+
class PredictionActionsInfluxDB(PredictionActionsBase[QueryApiAsync, WriteApiAsync]):
1517
"""Implementation of the prediction actions using influxDB.
1618
1719
This is implemented seperately in the infrastructure layer to promote decoupling of business
@@ -34,6 +36,10 @@ def get_query_api(self) -> QueryApiAsync:
3436
"""Retrieve a read-only connection for the database."""
3537
return self.client.query_api()
3638

39+
def get_write_api(self) -> WriteApiAsync:
40+
"""Retrieve a write connection for the database."""
41+
return self.client.write_api()
42+
3743
async def get_predicted_grid_asset_load(
3844
self, query_api: QueryApiAsync, from_date: datetime, to_date: datetime
3945
) -> list[PredictedGridAssetLoad]:
@@ -53,3 +59,19 @@ async def get_predicted_grid_asset_load(
5359
end_date_inclusive=to_date,
5460
)
5561
return get_predictions_for_features(features=features_for_time_range)
62+
63+
async def audit_predicted_grid_asset_loads(
64+
self,
65+
write_api: WriteApiAsync,
66+
predicted_grid_asset_loads: list[PredictedGridAssetLoad],
67+
) -> None:
68+
"""Audit predicted grid asset loads by storing them in the database.
69+
70+
Args:
71+
write_api (WriteApi): The write connection to the database.
72+
predicted_grid_asset_loads (list[PredictedGridAssetLoad]): The list of predicted grid asset loads to audit.
73+
"""
74+
await store_predictions_for_audit(
75+
write_api=write_api,
76+
predicted_loads=predicted_grid_asset_loads,
77+
)

src/infrastructure/predictions_actions_stub_impl.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from src.models.predicted_load import PredictedGridAssetLoad
1010

1111

12-
class PredictionActionsStub(PredictionActionsBase[None]):
12+
class PredictionActionsStub(PredictionActionsBase[None, None]):
1313
"""Stub implementation of the prediction actions, generates predicted grid asset loads when called.
1414
1515
Args:
@@ -24,6 +24,10 @@ def get_query_api(self) -> None:
2424
"""Retrieve a read-only connection for the database."""
2525
return None
2626

27+
def get_write_api(self) -> None:
28+
"""Retrieve a write connection for the database."""
29+
return None
30+
2731
async def get_predicted_grid_asset_load(
2832
self, query_api: None, from_date: datetime, to_date: datetime
2933
) -> list[PredictedGridAssetLoad]:
@@ -54,3 +58,17 @@ async def get_predicted_grid_asset_load(
5458
current_time += step
5559

5660
return predicted_grid_asset_loads
61+
62+
async def audit_predicted_grid_asset_loads(
63+
self,
64+
write_api: None,
65+
predicted_grid_asset_loads: list[PredictedGridAssetLoad],
66+
) -> None:
67+
"""Stub implementation of auditing predicted grid asset loads.
68+
69+
Args:
70+
write_api (None): The write connection.
71+
predicted_grid_asset_loads (list[PredictedGridAssetLoad]): The list of predicted grid asset loads to audit.
72+
"""
73+
# In this stub implementation, we do nothing.
74+
pass

0 commit comments

Comments
 (0)