-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(adapters):update to latest delft fiat release dod #666
base: main
Are you sure you want to change the base?
Changes from 17 commits
e4fdcbd
b0a304c
054fce7
62d9a69
59f1777
725137a
0e63ad7
9c306c2
8b95156
758a36a
c71cd4f
fe8deac
ddd0756
39b5ee1
6abd3bf
83c40d3
ffe24a5
854f919
1937f4f
03abb39
506c533
8c7ee69
1268de8
b4cc6e9
1ce9e78
e189e14
57e2c88
3e6c27d
91c5136
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,20 +43,29 @@ | |
class FiatColumns: | ||
"""Object with mapping of FIAT attributes to columns names.""" | ||
|
||
object_id = "Object ID" | ||
object_name = "Object Name" | ||
primary_object_type = "Primary Object Type" | ||
secondary_object_type = "Secondary Object Type" | ||
extraction_method = "Extraction Method" | ||
ground_floor_height = "Ground Floor Height" | ||
ground_elevation = "Ground Elevation" | ||
damage_function = "Damage Function: " | ||
max_potential_damage = "Max Potential Damage: " | ||
aggregation_label = "Aggregation Label: " | ||
inundation_depth = "Inundation Depth" | ||
damage = "Damage: " | ||
total_damage = "Total Damage" | ||
risk_ead = "Risk (EAD)" | ||
object_id = "object_id" | ||
object_name = "object_name" | ||
primary_object_type = "primary_object_type" | ||
secondary_object_type = "secondary_object_type" | ||
extraction_method = "extract_method" | ||
ground_floor_height = "ground_flht" | ||
ground_elevation = "ground_elevtn" | ||
inundation_depth = "inun_depth" | ||
damage_structure = "damage_structure" | ||
damage_content = "damage_content" | ||
damage_other = "damage_other" | ||
total_damage = "total_damage" | ||
ead_damage = "ead_damage" | ||
segment_length = "segment_length" | ||
damage_default = "damage" | ||
fn_damage_structure = "fn_damage_structure" | ||
fn_damage_content = "fn_damage_content" | ||
fn_damage_other = "fn_damage_other" | ||
max_pot_damage_default = "max_damage_" | ||
max_pot_damage_structure = "max_damage_structure" | ||
max_pot_damage_content = "max_damage_content" | ||
max_pot_damage_other = "max_damage_other" | ||
aggr_label_default = "aggregation_label" | ||
|
||
|
||
class FiatAdapter(IImpactAdapter): | ||
|
@@ -91,6 +100,34 @@ def __init__( | |
self.config_base_path = config_base_path | ||
self.exe_path = exe_path | ||
self.delete_crashed_runs = delete_crashed_runs | ||
self.fiat_columns_dict = { | ||
k: v for k, v in FiatColumns.__dict__.items() if not k.startswith("__") | ||
} | ||
self.fiat_output_mapping = { | ||
FiatColumns.object_id: "Object ID", | ||
FiatColumns.object_name: "Object Name", | ||
FiatColumns.primary_object_type: "Primary Object Type", | ||
FiatColumns.secondary_object_type: "Secondary Object Type", | ||
FiatColumns.extraction_method: "Extraction Method", | ||
FiatColumns.ground_floor_height: "Ground Floor Height", | ||
FiatColumns.ground_elevation: "Ground Elevation", | ||
FiatColumns.inundation_depth: "Inundation Depth", | ||
FiatColumns.damage_default: "Damage: ", | ||
FiatColumns.damage_structure: "Damage: Structure", | ||
FiatColumns.damage_content: "Damage: Content", | ||
FiatColumns.damage_other: "Damage: Other", | ||
FiatColumns.total_damage: "Total Damage", | ||
FiatColumns.ead_damage: "Risk (EAD)", | ||
FiatColumns.segment_length: "Segment Length", | ||
FiatColumns.fn_damage_structure: "Damage Function: Structure", | ||
FiatColumns.fn_damage_content: "Damage Function: Content", | ||
FiatColumns.fn_damage_other: "Damage Function: Other", | ||
FiatColumns.max_pot_damage_default: "Max Potential Damage:", | ||
FiatColumns.max_pot_damage_structure: "Max Potential Damage: Structure", | ||
FiatColumns.max_pot_damage_content: "Max Potential Damage: Content", | ||
FiatColumns.max_pot_damage_other: "Max Potential Damage: Other", | ||
FiatColumns.aggr_label_default: "Aggregation level:", | ||
} | ||
self._model = FiatModel(root=str(model_root.resolve()), mode="r") | ||
self._model.read() | ||
|
||
|
@@ -405,6 +442,16 @@ def postprocess(self, scenario): | |
|
||
mode = scenario.event.attrs.mode | ||
|
||
# Map exposure columns | ||
for aggregation in self.config.aggregation: | ||
name = aggregation.name | ||
self.fiat_output_mapping[f"{FiatColumns.aggr_label_default}:{name}"] = ( | ||
f"Aggregation Label: {name}" | ||
) | ||
metrics_exposure = self.outputs["table"].rename( | ||
columns=self.fiat_output_mapping | ||
) | ||
|
||
# Define scenario output path | ||
scenario_output_path = scenario.impacts.results_path | ||
impacts_output_path = scenario.impacts.impacts_path | ||
|
@@ -416,10 +463,12 @@ def postprocess(self, scenario): | |
config_path = scenario.database.static_path.joinpath( | ||
"templates", "infometrics", "metrics_additional_risk_configs.toml" | ||
) | ||
|
||
with open(config_path, mode="rb") as fp: | ||
config = tomli.load(fp)["flood_exceedance"] | ||
self.add_exceedance_probability( | ||
column=config["column"], | ||
|
||
metrics_exposure = self.add_exceedance_probability( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this called metrics exposure? It is the output table per object right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm because it's the exposure data but for the output metrics. |
||
column=FiatColumns.inundation_depth, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not always the inundation depth. It is provided in the config and could be the total damage for example. So probably the mapping dictionary should be used here as well? |
||
threshold=config["threshold"], | ||
period=config["period"], | ||
) | ||
|
@@ -428,7 +477,8 @@ def postprocess(self, scenario): | |
fiat_results_path = impacts_output_path.joinpath( | ||
f"Impacts_detailed_{scenario.attrs.name}.csv" | ||
) | ||
self.outputs["table"].to_csv(fiat_results_path) | ||
|
||
metrics_exposure.to_csv(fiat_results_path) | ||
|
||
# Create the infometrics files | ||
if mode == Mode.risk: | ||
|
@@ -453,7 +503,9 @@ def postprocess(self, scenario): | |
metrics_outputs_path = scenario_output_path.joinpath( | ||
f"Infometrics_{scenario.attrs.name}.csv" | ||
) | ||
self.create_infometrics(metric_config_paths, metrics_outputs_path) | ||
self.create_infometrics( | ||
metric_config_paths, metrics_outputs_path, metrics_exposure | ||
) | ||
|
||
# Get paths of created aggregated infometrics | ||
aggr_metrics_paths = list( | ||
|
@@ -511,6 +563,38 @@ def postprocess(self, scenario): | |
if not self.config.save_simulation: | ||
self.delete_model() | ||
|
||
def map_risk_metrics(self, metrics_exposure: pd.DataFrame): | ||
for column in metrics_exposure: | ||
if FiatColumns.inundation_depth in column: | ||
inun_rp = column.split(FiatColumns.inundation_depth + "_")[-1] | ||
rp = f"({inun_rp.split('.0y')[0]}Y)" | ||
new_column = str( | ||
self.fiat_output_mapping[FiatColumns.inundation_depth] + " " + rp | ||
) | ||
metrics_exposure = metrics_exposure.rename(columns={column: new_column}) | ||
elif FiatColumns.damage_default in column: | ||
if FiatColumns.total_damage in column: | ||
damage_rp = column.split(FiatColumns.total_damage + "_")[-1] | ||
rp = f"({damage_rp.split('.0y')[0]}Y)" | ||
new_column = str( | ||
self.fiat_output_mapping[FiatColumns.total_damage] + " " + rp | ||
) | ||
else: | ||
damage_rp = column.split(FiatColumns.damage_default + "_")[-1] | ||
damage = damage_rp.split("_")[0].capitalize() | ||
rp = f"({damage_rp.split('_')[1].split('.0y')[0]}Y)" | ||
new_column = str( | ||
self.fiat_output_mapping[FiatColumns.damage_default] | ||
+ damage | ||
+ " " | ||
+ rp | ||
) | ||
metrics_exposure = metrics_exposure.rename(columns={column: new_column}) | ||
else: | ||
metrics_exposure = metrics_exposure | ||
|
||
return metrics_exposure | ||
|
||
def add_measure(self, measure: IMeasure): | ||
""" | ||
Add and apply a specific impact measure to the properties of the FIAT model. | ||
|
@@ -803,7 +887,7 @@ def apply_population_growth_new( | |
percent_growth=population_growth, | ||
geom_file=Path(area_path), | ||
ground_floor_height=ground_floor_height, | ||
damage_types=["Structure", "Content"], | ||
damage_types=["structure", "content"], | ||
vulnerability=self._model.vulnerability, | ||
ground_elevation=ground_elevation, | ||
aggregation_area_fn=aggregation_areas, | ||
|
@@ -952,7 +1036,7 @@ def floodproof_properties(self, floodproof: FloodProof) -> None: | |
self._model.exposure.truncate_damage_function( | ||
objectids=objectids, | ||
floodproof_to=floodproof.attrs.elevation.value, | ||
damage_function_types=["Structure", "Content"], | ||
damage_function_types=["structure", "content"], | ||
vulnerability=self._model.vulnerability, | ||
) | ||
|
||
|
@@ -1074,7 +1158,10 @@ def get_object_ids(self, measure: IMeasure) -> list[Any]: | |
# POST-PROCESSING METHODS | ||
|
||
def add_exceedance_probability( | ||
self, column: str, threshold: float, period: int | ||
self, | ||
column: str, | ||
threshold: float, | ||
period: int, | ||
) -> pd.DataFrame: | ||
"""Calculate exceedance probabilities and append them to the results table. | ||
|
||
|
@@ -1096,11 +1183,16 @@ def add_exceedance_probability( | |
fiat_results_df = ExceedanceProbabilityCalculator(column).append_probability( | ||
self.outputs["table"], threshold, period | ||
) | ||
self.outputs["table"] = fiat_results_df | ||
return self.outputs["table"] | ||
fiat_results_df = fiat_results_df.rename(columns=self.fiat_output_mapping) | ||
fiat_results_df = self.map_risk_metrics(fiat_results_df) | ||
|
||
return fiat_results_df | ||
|
||
def create_infometrics( | ||
self, metric_config_paths: list[os.PathLike], metrics_output_path: os.PathLike | ||
self, | ||
metric_config_paths: list[os.PathLike], | ||
metrics_output_path: os.PathLike, | ||
metrics_exposure: pd.DataFrame, | ||
Santonia27 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> None: | ||
""" | ||
Create infometrics files based on the provided metric configuration paths. | ||
|
@@ -1124,16 +1216,18 @@ def create_infometrics( | |
# Check if type of metric configuration is available | ||
for metric_file in metric_config_paths: | ||
if metric_file.exists(): | ||
metrics_writer = MetricsFileWriter(metric_file) | ||
metrics_writer = MetricsFileWriter( | ||
metric_file, aggregation_label="Aggregation Label: " | ||
) | ||
|
||
metrics_writer.parse_metrics_to_file( | ||
df_results=self.outputs["table"], | ||
df_results=metrics_exposure, | ||
metrics_path=metrics_output_path, | ||
write_aggregate=None, | ||
) | ||
|
||
metrics_writer.parse_metrics_to_file( | ||
df_results=self.outputs["table"], | ||
df_results=metrics_exposure, | ||
metrics_path=metrics_output_path, | ||
write_aggregate="all", | ||
) | ||
|
@@ -1349,9 +1443,14 @@ def save_building_footprints(self, output_path: os.PathLike): | |
how="left", | ||
) | ||
) | ||
|
||
footprints.aggregate(fiat_results_df) | ||
footprints.calc_normalized_damages() | ||
# Add aggregation to fiat_columns | ||
for aggregation in self.config.aggregation: | ||
self.fiat_columns_dict[f"aggr_label_default:{aggregation.name}"] = ( | ||
f"{self.fiat_columns_dict['aggr_label_default']}:{aggregation.name}" | ||
) | ||
# Get damages | ||
footprints.aggregate(fiat_results_df, self.fiat_columns_dict) | ||
footprints.calc_normalized_damages(self.fiat_columns_dict) | ||
|
||
# Save footprint | ||
footprints.write(output_path) | ||
|
@@ -1374,7 +1473,10 @@ def save_roads(self, output_path: os.PathLike): | |
aggr_cols = [ | ||
name | ||
for name in self.outputs["table"].columns | ||
if FiatColumns.aggregation_label in name | ||
if any( | ||
f"aggregation_label:{aggregation.name}" in name | ||
for aggregation in self.config.aggregation | ||
) | ||
] | ||
inun_cols = [ | ||
name for name in roads.columns if FiatColumns.inundation_depth in name | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this needed? You redefine all the names bellow in the fiat_output_mapping dictionary, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because I only need the output mapping for the metrics after calculating the metrics.
Beforehand to actually do the calculation I used the self,fiat_columns_dict