From 2484767298b7174c57ff926eb757e0f28b9da31e Mon Sep 17 00:00:00 2001 From: hhs732 Date: Tue, 18 Jun 2024 19:12:28 +0000 Subject: [PATCH 01/24] litting applied --- fim_post_processing.sh | 20 +- src/bash_variables.env | 3 +- src/bathymetric_adjustment.py | 338 +++++++++++++++++++++++++++++----- 3 files changed, 295 insertions(+), 66 deletions(-) diff --git a/fim_post_processing.sh b/fim_post_processing.sh index 2a7965efb..ae3307e7f 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -112,7 +112,8 @@ if [ "$bathymetry_adjust" = "True" ]; then Tstart python3 $srcDir/bathymetric_adjustment.py \ -fim_dir $outputDestDir \ - -bathy $bathymetry_file \ + -bathy_ehydro $bathymetry_file_ehydro \ + -bathy_aibased $bathymetry_file_aibased \ -buffer $wbd_buffer \ -wbd $inputsDir/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg \ -j $jobLimit @@ -218,21 +219,8 @@ python3 $toolsDir/combine_crosswalk_tables.py \ Tcount date -u -echo -e $startDiv"Resetting Permissions" -Tstart - find $outputDestDir -type d -exec chmod -R 777 {} + - find $outputDestDir -type f -exec chmod 777 {} + # just root level files -Tcount -date -u - -echo -echo -e $startDiv"Scanning logs for errors. Results be saved in root not inside the log folder." -Tstart - # grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log - find $outputDestDir -type f | grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log -Tcount -date -u - +find $outputDestDir -type d -exec chmod -R 777 {} + +find $outputDestDir -type f -exec chmod -R 777 {} + echo echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" diff --git a/src/bash_variables.env b/src/bash_variables.env index 42513fad8..2e2ce911d 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -29,7 +29,8 @@ export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points/ export pre_clip_huc_dir=${inputsDir}/pre_clip_huc8/240502 -export bathymetry_file=${inputsDir}/bathymetry/bathymetry_adjustment_data.gpkg +export bathymetry_file_ehydro=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg +export bathymetry_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet export osm_bridges=${inputsDir}/osm/bridges/240426/osm_all_bridges.gpkg # input file location with nwm feature_id and recurrence flow values diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 9c114173b..0787a700c 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -4,19 +4,19 @@ import os import re import sys -import traceback from argparse import ArgumentParser -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from os.path import join import geopandas as gpd import pandas as pd -from synthesize_test_cases import progress_bar_handler -def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): +# ------------------------------------------------------- +# Adjusting synthetic rating curves using 'USACE eHydro' bathymetry data +def correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose): """Function for correcting synthetic rating curves. It will correct each branch's - SRCs in serial based on the feature_ids in the input bathy_file. 
+ SRCs in serial based on the feature_ids in the input eHydro bathy_file. Parameters ---------- @@ -24,8 +24,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): Directory path for fim_pipeline output. huc : str HUC-8 string. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. + bathy_file_ehydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". verbose : bool Verbose printing. @@ -36,12 +36,12 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): """ - log_text = f'Calculating bathymetry adjustment: {huc}\n' + log_text = f'Calculating eHydro bathymetry adjustment: {huc}\n' # Load wbd and use it as a mask to pull the bathymetry data fim_huc_dir = join(fim_dir, huc) wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'), engine="pyogrio", use_arrow=True) - bathy_data = gpd.read_file(bathy_file, mask=wbd8_clp, engine="fiona") + bathy_data = gpd.read_file(bathy_file_ehydro, mask=wbd8_clp, engine="fiona") bathy_data = bathy_data.rename(columns={'ID': 'feature_id'}) # Get src_full from each branch @@ -54,14 +54,14 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): # Update src parameters with bathymetric data for src in src_all_branches: - src_df = pd.read_csv(src) + src_df = pd.read_csv(src, low_memory=False) if 'Bathymetry_source' in src_df.columns: src_df = src_df.drop(columns='Bathymetry_source') branch = re.search(r'branches/(\d{10}|0)/', src).group()[9:-1] log_text += f' Branch: {branch}\n' if bathy_data.empty: - log_text += ' There were no bathymetry feature_ids for this branch' + log_text += ' There were no eHydro bathymetry feature_ids for this branch' src_df['Bathymetry_source'] = [""] * len(src_df) src_df.to_csv(src, index=False) return log_text @@ -85,6 +85,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): 'Bathymetry_source' ].first() src_df = src_df.merge(reconciled_bathy_data, on='feature_id', how='left', validate='many_to_one') + # Exit if there are no recalculations to be made if ~src_df['Bathymetry_source'].any(axis=None): log_text += ' No matching feature_ids in this branch\n' @@ -118,10 +119,206 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): # Write src back to file src_df.to_csv(src, index=False) log_text += f' Successfully recalculated {count} HydroIDs\n' + + return log_text + + +# ------------------------------------------------------- +# Adjusting synthetic rating curves using 'AI-based' bathymetry data +def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose): + """ + Function for correcting synthetic rating curves. It will correct each branch's + SRCs in serial based on the feature_ids in the input AI-based bathy_file. + + Parameters + ---------- + fim_dir : str + Directory path for fim_pipeline output. + huc : str + HUC-8 string. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 6 + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". + verbose : bool + Verbose printing. 
+ + Returns + ---------- + log_text : str + + """ + log_text = f'Calculating AI-based bathymetry adjustment: {huc}\n' + + # Load AI-based bathymetry data + ml_bathy_data = pd.read_parquet(bathy_file_aibased, engine='pyarrow') + ml_bathy_data_df = ml_bathy_data[ + ['COMID', 'owp_tw_inchan', 'owp_inchan_channel_area', 'owp_inchan_channel_perimeter'] + ] + + fim_huc_dir = join(fim_dir, huc) + + path_nwm_streams = join(fim_huc_dir, "nwm_subset_streams.gpkg") + nwm_stream = gpd.read_file(path_nwm_streams) + + wbd8 = gpd.read_file(join(fim_huc_dir, 'wbd.gpkg'), engine="pyogrio", use_arrow=True) + nwm_stream_clp = nwm_stream.clip(wbd8) + + ml_bathy_data_df = ml_bathy_data_df.merge( + nwm_stream_clp[['ID', 'order_']], left_on='COMID', right_on='ID' + ) + aib_bathy_data_df = ml_bathy_data_df.drop(columns=['ID']) + + aib_bathy_data_df = aib_bathy_data_df.rename(columns={'COMID': 'feature_id'}) + aib_bathy_data_df = aib_bathy_data_df.rename(columns={'owp_inchan_channel_area': 'missing_xs_area_m2'}) + + # Calculating missing_wet_perimeter_m and adding it to aib_bathy_data_gdf + missing_wet_perimeter_m = ( + aib_bathy_data_df['owp_inchan_channel_perimeter'] - aib_bathy_data_df['owp_tw_inchan'] + ) + aib_bathy_data_df['missing_wet_perimeter_m'] = missing_wet_perimeter_m + aib_bathy_data_df['Bathymetry_source'] = "AI_Based" + + # Excluding streams with order lower than order (default = 6) + aib_bathy_data_df.loc[ + aib_bathy_data_df["order_"] < strm_order, + ["missing_xs_area_m2", "missing_wet_perimeter_m", "Bathymetry_source"], + ] = 0 + aib_df = aib_bathy_data_df[ + ['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m', 'Bathymetry_source'] + ] + + # Get src_full from each branch + src_all_branches_path = [] + branches = os.listdir(join(fim_huc_dir, 'branches')) + for branch in branches: + src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv') + if os.path.isfile(src_full): + src_all_branches_path.append(src_full) + + # Update src parameters with bathymetric data + for src in src_all_branches_path: + src_df = pd.read_csv(src, low_memory=False) + # print(src_df.loc[~src_df['Bathymetry_source'].isna()]['Bathymetry_source']) + + src_name = os.path.basename(src) + branch = src_name.split(".")[0].split("_")[-1] + log_text += f' Branch: {branch}\n' + + # Merge in missing bathy data and fill Nans + if "missing_xs_area_m2" not in src_df.columns: + src_df.drop(columns=["Bathymetry_source"], inplace=True) + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + # print([src,src_df.columns]) + else: + src_df = pd.read_csv(src, low_memory=False) + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + # print(src_df.loc[~src_df['Bathymetry_source_x'].isna()]["missing_xs_area_m2_x"]) + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["missing_xs_area_m2_x"]] = src_df[ + "missing_xs_area_m2_y" + ] + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["missing_wet_perimeter_m_x"]] = src_df[ + "missing_wet_perimeter_m_y" + ] + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["Bathymetry_source_x"]] = src_df[ + "Bathymetry_source_y" + ] + # for i in range (len(src_df["Bathymetry_source_x"])): + # if src_df["Bathymetry_source_y"][i] != 0: + # print([src_df["Bathymetry_source_x"][i], src_df["Bathymetry_source_y"][i], + # src_df["missing_xs_area_m2_x"][i], src_df["missing_xs_area_m2_y"][i]]) + src_df.drop( + columns=["missing_xs_area_m2_y", "missing_wet_perimeter_m_y", "Bathymetry_source_y"], + inplace=True, + ) + 
src_df = src_df.rename(columns={'missing_xs_area_m2_x': 'missing_xs_area_m2'}) + src_df = src_df.rename(columns={'missing_wet_perimeter_m_x': 'missing_wet_perimeter_m'}) + src_df = src_df.rename(columns={'Bathymetry_source_x': 'Bathymetry_source'}) + + # Exit if there are no recalculations to be made + if ~src_df['Bathymetry_source'].any(axis=None): + log_text += ' No matching feature_ids in this branch\n' + continue + + src_df['missing_xs_area_m2'] = src_df['missing_xs_area_m2'].fillna(0.0) + src_df['missing_wet_perimeter_m'] = src_df['missing_wet_perimeter_m'].fillna(0.0) + + # Add missing hydraulic geometry into base parameters + src_df['Volume (m3)'] = src_df['Volume (m3)'] + ( + src_df['missing_xs_area_m2'] * (src_df['LENGTHKM'] * 1000) + ) + src_df['BedArea (m2)'] = src_df['BedArea (m2)'] + ( + src_df['missing_wet_perimeter_m'] * (src_df['LENGTHKM'] * 1000) + ) + # Recalc discharge with adjusted geometries + src_df['WettedPerimeter (m)'] = src_df['WettedPerimeter (m)'] + src_df['missing_wet_perimeter_m'] + src_df['WetArea (m2)'] = src_df['WetArea (m2)'] + src_df['missing_xs_area_m2'] + src_df['HydraulicRadius (m)'] = src_df['WetArea (m2)'] / src_df['WettedPerimeter (m)'] + src_df['HydraulicRadius (m)'] = src_df['HydraulicRadius (m)'].fillna(0) + src_df['Discharge (m3s-1)'] = ( + src_df['WetArea (m2)'] + * pow(src_df['HydraulicRadius (m)'], 2.0 / 3) + * pow(src_df['SLOPE'], 0.5) + / src_df['ManningN'] + ) + # Force zero stage to have zero discharge + src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] = 0 + # Calculate number of adjusted HydroIDs + + # Write src back to file + src_df.to_csv(src, index=False) + return log_text -def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose): +# ------------------------------------------------------- +# Apply src_adjustment_for_bathymetry +def apply_src_adjustment_for_bathymetry( + fim_dir, huc, strm_order, bathy_file_ehydro, bathy_file_aibased, verbose +): + """ + Function for applying both eHydro & AI-based bathymetry adjustment to synthetic rating curves. + + Parameters + ---------- + Please refer to correct_rating_for_ehydro_bathymetry and + correct_rating_for_ai_based_bathymetry functions parameters. + + Returns + ---------- + log_text : str + """ + if os.path.exists(bathy_file_ehydro): + + correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) + + else: + print('USACE eHydro bathymetry file does not exist') + + if os.path.exists(bathy_file_aibased): + + correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) + + else: + print('AI-based bathymetry file does not exist') + + # return log_text + + +# ------------------------------------------------------- +def multi_process_hucs( + fim_dir, + strm_order, + bathy_file_ehydro, + bathy_file_aibased, + wbd_buffer, + wbd, + output_suffix, + number_of_jobs, + verbose, +): """Function for correcting synthetic rating curves. It will correct each branch's SRCs in serial based on the feature_ids in the input bathy_file. @@ -129,9 +326,15 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb ---------- fim_dir : str Directory path for fim_pipeline output. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 6 + bathy_file_eHydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. 
"/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". wbd_buffer : int Distance in meters to buffer wbd dataset when searching for relevant HUCs. wbd : str @@ -145,7 +348,6 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb Verbose printing. """ - # Set up log file print( 'Writing progress to log file here: ' @@ -161,45 +363,48 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb log_file.write('#########################################################\n\n') # Exit program if the bathymetric data doesn't exist - if not os.path.exists(bathy_file): - statement = f'The input bathymetry file {bathy_file} does not exist. Exiting...' + if not all([os.path.exists(bathy_file_ehydro), os.path.exists(bathy_file_aibased)]): + statement = f'The input bathymetry files {bathy_file_ehydro} and {bathy_file_aibased} do not exist. Exiting...' log_file.write(statement) print(statement) sys.exit(0) - # Find applicable HUCs to apply bathymetric adjustment - # NOTE: This block can be removed if we have estimated bathymetry data for - # the whole domain later. + # Find applicable HUCs to apply ehydro bathymetric adjustment fim_hucs = [h for h in os.listdir(fim_dir) if re.match(r'\d{8}', h)] - bathy_gdf = gpd.read_file(bathy_file, engine="pyogrio", use_arrow=True) + bathy_gdf = gpd.read_file(bathy_file_ehydro, engine="pyogrio", use_arrow=True) buffered_bathy = bathy_gdf.geometry.buffer(wbd_buffer) # We buffer the bathymetric data to get adjacent wbd = gpd.read_file( wbd, mask=buffered_bathy, engine="fiona" ) # HUCs that could also have bathymetric reaches included hucs_with_bathy = wbd.HUC8.to_list() hucs = [h for h in fim_hucs if h in hucs_with_bathy] - log_file.write(f"Identified {len(hucs)} HUCs that have bathymetric data: {hucs}\n") - print(f"Identified {len(hucs)} HUCs that have bathymetric data\n") + log_file.write(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n") + print(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data\n") - # Set up multiprocessor + failed_HUCs_list = [] with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: - # Loop through all test cases, build the alpha test arguments, and submit them to the process pool - executor_dict = {} + # Loop through all hucs, build the arguments, and submit them to the process pool + futures = {} for huc in hucs: - arg_keeper = {'fim_dir': fim_dir, 'huc': huc, 'bathy_file': bathy_file, 'verbose': verbose} - future = executor.submit(correct_rating_for_bathymetry, **arg_keeper) - executor_dict[future] = huc - - # Send the executor to the progress bar and wait for all tasks to finish - progress_bar_handler(executor_dict, True, f"Running BARC on {len(hucs)} HUCs") - # Get the returned logs and write to the log file - for future in executor_dict.keys(): - try: - log_file.write(future.result()) - except Exception as ex: - print(f"WARNING: {executor_dict[future]} BARC failed for some reason") - log_file.write(f"ERROR --> {executor_dict[future]} BARC failed (details: *** {ex} )\n") - traceback.print_exc(file=log_file) + args = { + 'fim_dir': fim_dir, + 'huc': huc, + 'strm_order': strm_order, + 'bathy_file_ehydro': bathy_file_ehydro, + 'bathy_file_aibased': bathy_file_aibased, + 'verbose': verbose, + } + future = executor.submit(apply_src_adjustment_for_bathymetry, **args) + futures[future] = future + + for 
future in as_completed(futures): + if future is not None: + if not future.exception(): + failed_huc = future.result() + if failed_huc != "": + failed_HUCs_list.append(failed_huc) + else: + raise future.exception() ## Record run time and close log file end_time = dt.datetime.now() @@ -210,15 +415,22 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb if __name__ == '__main__': + """ Parameters ---------- fim_dir : str Directory path for fim_pipeline output. Log file will be placed in fim_dir/logs/bathymetric_adjustment.log. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 6 + bathy_file_ehydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". wbd_buffer : int Distance in meters to buffer wbd dataset when searching for relevant HUCs. wbd : str @@ -233,17 +445,33 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb Sample Usage ---------- - python3 /foss_fim/src/bathymetric_adjustment.py -fim_dir /outputs/fim_run_dir -bathy /data/inputs/bathymetry/bathymetry_adjustment_data.gpkg + python3 /foss_fim/src/bathymetric_adjustment.py -fim_dir /outputs/fim_run_dir + -bathy_eHydro /data/inputs/bathymetry/bathymetric_adjustment_data.gpkg + -bathy_aibased /data/inputs/bathymetry/ml_outputs_v1.01.parquet -buffer 5000 -wbd /data/inputs/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg -j $jobLimit - """ parser = ArgumentParser(description="Bathymetric Adjustment") parser.add_argument('-fim_dir', '--fim-dir', help='FIM output dir', required=True, type=str) parser.add_argument( - '-bathy', - '--bathy_file', - help="Path to geopackage with preprocessed bathymetic data", + '-sor', + '--strm_order', + help="stream order on or higher for which AI-based bathymetry data is applied", + default=6, + required=False, + type=int, + ) + parser.add_argument( + '-bathy_ehydro', + '--bathy_file_ehydro', + help="Path to geopackage with preprocessed eHydro bathymetic data", + required=True, + type=str, + ) + parser.add_argument( + '-bathy_aibased', + '--bathy_file_aibased', + help="Path to parquet file with preprocessed AI-based bathymetic data", required=True, type=str, ) @@ -289,11 +517,23 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb args = vars(parser.parse_args()) fim_dir = args['fim_dir'] - bathy_file = args['bathy_file'] + strm_order = args['strm_order'] + bathy_file_ehydro = args['bathy_file_ehydro'] + bathy_file_aibased = args['bathy_file_aibased'] wbd_buffer = int(args['wbd_buffer']) wbd = args['wbd'] output_suffix = args['output_suffix'] number_of_jobs = args['number_of_jobs'] verbose = bool(args['verbose']) - multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose) + multi_process_hucs( + fim_dir, + strm_order, + bathy_file_ehydro, + bathy_file_aibased, + wbd_buffer, + wbd, + output_suffix, + number_of_jobs, + verbose, + ) From 33bebc6939a927dfa84f0fbe56e752604d340978 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Tue, 18 Jun 2024 14:39:30 -0700 Subject: [PATCH 02/24] Updated CHANGELOG.md --- docs/CHANGELOG.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 
d4f4011f4..c5f70aa57 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,23 @@
 All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
 
+## v4.5.x.x - 2024-06-18 - [PR#1195](https://github.com/NOAA-OWP/inundation-mapping/pull/1195)
+
+This PR adjusts synthetic rating curves using bathymetry data. The bathymetry data includes eHydro surveys and AI-based datasets created for all NWM streams.
+
+### Changes
+- `src/bathymetric_adjustment.py`: Added the `correct_rating_for_ai_based_bathymetry` function, which processes AI-based bathymetry data and adjusts rating curves with it. Also added the `apply_src_adjustment_for_bathymetry` function, which prioritizes USACE eHydro data over the AI-based bathymetry dataset. The multiprocessing function `multi_process_hucs` was updated to call the new functions.
+
+- `src/bash_variables.env`: Added new bathymetry input variables and their paths.
+
+- `fim_post_processing.sh`: Added the new bathymetry arguments to the `bathymetric_adjustment.py` call.
+
+### Testing
+This PR has been tested on 11 HUC8s around the Illinois River and Ohio River.
+

+ + ## v4.5.2.3 - 2024-06-14 - [PR#1169](https://github.com/NOAA-OWP/inundation-mapping/pull/1169) This tool scans all log directory looking for the word "error" (not case-sensitive). This is primary added to help find errors in the post processing logs such as src_optimization folder (and others). From 442c3c9b03ad5f053982140e48cf405686c483fa Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Thu, 20 Jun 2024 09:25:13 -0700 Subject: [PATCH 03/24] Updated default stream order to 7 --- src/bathymetric_adjustment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 0787a700c..8a442403b 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -457,7 +457,7 @@ def multi_process_hucs( '-sor', '--strm_order', help="stream order on or higher for which AI-based bathymetry data is applied", - default=6, + default=7, required=False, type=int, ) From 9be6abca58edff5c3f47c162e0f00cfc4b5e0775 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Thu, 20 Jun 2024 09:26:44 -0700 Subject: [PATCH 04/24] Some note added --- src/bathymetric_adjustment.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 8a442403b..dbce9ed08 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -138,7 +138,7 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ HUC-8 string. strm_order : int stream order on or higher for which you want to apply AI-based bathymetry data. - default = 6 + default = 7 bathy_file_aibased : str Path to AI-based bathymetric adjustment file, e.g. "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". @@ -328,7 +328,7 @@ def multi_process_hucs( Directory path for fim_pipeline output. strm_order : int stream order on or higher for which you want to apply AI-based bathymetry data. - default = 6 + default = 7 bathy_file_eHydro : str Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". @@ -424,7 +424,7 @@ def multi_process_hucs( fim_dir/logs/bathymetric_adjustment.log. strm_order : int stream order on or higher for which you want to apply AI-based bathymetry data. - default = 6 + default = 7 bathy_file_ehydro : str Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". 
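[Editor's note — illustrative sketch, not part of the patch series.] The SRC adjustment added in the patches above merges the missing cross-section area and wetted perimeter (from eHydro or AI-based bathymetry) into each branch's synthetic rating curve table and then recomputes discharge with Manning's equation. A minimal standalone sketch of that recalculation follows; the column names mirror the hydrotable columns used in the diffs, but the function itself is a simplified stand-in, not code from the repository.

import pandas as pd


def recompute_discharge(src_df: pd.DataFrame) -> pd.DataFrame:
    # Fold the missing bathymetric geometry into the hydraulic geometry
    src_df['missing_xs_area_m2'] = src_df['missing_xs_area_m2'].fillna(0.0)
    src_df['missing_wet_perimeter_m'] = src_df['missing_wet_perimeter_m'].fillna(0.0)
    src_df['WetArea (m2)'] = src_df['WetArea (m2)'] + src_df['missing_xs_area_m2']
    src_df['WettedPerimeter (m)'] = src_df['WettedPerimeter (m)'] + src_df['missing_wet_perimeter_m']

    # Hydraulic radius; 0/0 at zero stage becomes NaN and is reset to 0
    src_df['HydraulicRadius (m)'] = src_df['WetArea (m2)'] / src_df['WettedPerimeter (m)']
    src_df['HydraulicRadius (m)'] = src_df['HydraulicRadius (m)'].fillna(0)

    # Manning's equation: Q = A * R^(2/3) * S^(1/2) / n
    src_df['Discharge (m3s-1)'] = (
        src_df['WetArea (m2)']
        * src_df['HydraulicRadius (m)'] ** (2.0 / 3)
        * src_df['SLOPE'] ** 0.5
        / src_df['ManningN']
    )

    # Force zero stage to have zero discharge
    src_df.loc[src_df['Stage'] == 0, 'Discharge (m3s-1)'] = 0
    return src_df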
From 6e82e436def0930545e038d5401faf2c6aa6db85 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Fri, 21 Jun 2024 11:45:48 -0700 Subject: [PATCH 05/24] A few updates --- src/bathymetric_adjustment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index dbce9ed08..5e80d6fdc 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -385,7 +385,7 @@ def multi_process_hucs( with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: # Loop through all hucs, build the arguments, and submit them to the process pool futures = {} - for huc in hucs: + for huc in fim_hucs: args = { 'fim_dir': fim_dir, 'huc': huc, From 01af5252c782265479e17eca9d96ae2c843508a6 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Fri, 21 Jun 2024 11:48:50 -0700 Subject: [PATCH 06/24] Update bash_variables.env --- src/bash_variables.env | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bash_variables.env b/src/bash_variables.env index 2e2ce911d..9ef5a67d4 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -29,8 +29,8 @@ export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points/ export pre_clip_huc_dir=${inputsDir}/pre_clip_huc8/240502 -export bathymetry_file_ehydro=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg -export bathymetry_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet +export bathy_file_ehydro=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg +export bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet export osm_bridges=${inputsDir}/osm/bridges/240426/osm_all_bridges.gpkg # input file location with nwm feature_id and recurrence flow values From 22a0afa62b09ff7a47680693d2a91cfe69a32ca1 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Fri, 21 Jun 2024 11:50:22 -0700 Subject: [PATCH 07/24] Update fim_post_processing.sh --- fim_post_processing.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fim_post_processing.sh b/fim_post_processing.sh index ae3307e7f..9e29654f7 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -112,8 +112,8 @@ if [ "$bathymetry_adjust" = "True" ]; then Tstart python3 $srcDir/bathymetric_adjustment.py \ -fim_dir $outputDestDir \ - -bathy_ehydro $bathymetry_file_ehydro \ - -bathy_aibased $bathymetry_file_aibased \ + -bathy_ehydro $bathy_file_ehydro \ + -bathy_aibased $bathy_file_aibased \ -buffer $wbd_buffer \ -wbd $inputsDir/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg \ -j $jobLimit From 6d2317fc4bfe88d10db9c4059185ac1b16992aa2 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Fri, 21 Jun 2024 11:52:19 -0700 Subject: [PATCH 08/24] Update params_template.env --- config/params_template.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/params_template.env b/config/params_template.env index 8312bea92..7dcc07469 100644 --- a/config/params_template.env +++ b/config/params_template.env @@ -36,7 +36,7 @@ export levee_id_attribute=SYSTEM_ID export healed_hand_hydrocondition=true #### apply bathymetry adjustment to rating curve #### -export bathymetry_adjust=True +export bathymetry_adjust="True" #### estimating bankfull stage in SRCs #### # Toggle to run identify_bankfull routine (True=on; False=off) From 41db794aed803792c8ca8727075570002807388a Mon Sep 17 
00:00:00 2001 From: HeidiSafa-NOAA Date: Sun, 23 Jun 2024 12:10:07 -0700 Subject: [PATCH 09/24] Added notes --- src/bathymetric_adjustment.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 5e80d6fdc..110199691 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -381,6 +381,8 @@ def multi_process_hucs( log_file.write(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n") print(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data\n") + print(f"AI-Based bathymetry data is applied on streams with order {strm_order} or higher\n") + failed_HUCs_list = [] with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: # Loop through all hucs, build the arguments, and submit them to the process pool From 359fb357d0c4af0fbd3e594ac0b86d069f984d88 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Tue, 25 Jun 2024 14:52:13 -0700 Subject: [PATCH 10/24] A few updates --- src/bathymetric_adjustment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 110199691..983b4d12e 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -378,6 +378,7 @@ def multi_process_hucs( ) # HUCs that could also have bathymetric reaches included hucs_with_bathy = wbd.HUC8.to_list() hucs = [h for h in fim_hucs if h in hucs_with_bathy] + hucs.sort() log_file.write(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n") print(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data\n") From 08cbdb6aa01b214f8ed8c6abe3b5c0ada084a759 Mon Sep 17 00:00:00 2001 From: HeidiSafa-NOAA Date: Tue, 25 Jun 2024 15:06:24 -0700 Subject: [PATCH 11/24] log_file updated --- src/bathymetric_adjustment.py | 92 ++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 33 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 983b4d12e..8019d0cac 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -4,6 +4,7 @@ import os import re import sys +import traceback from argparse import ArgumentParser from concurrent.futures import ProcessPoolExecutor, as_completed from os.path import join @@ -276,11 +277,13 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ # ------------------------------------------------------- # Apply src_adjustment_for_bathymetry def apply_src_adjustment_for_bathymetry( - fim_dir, huc, strm_order, bathy_file_ehydro, bathy_file_aibased, verbose + fim_dir, huc, strm_order, bathy_file_ehydro, bathy_file_aibased, verbose, log_file_path ): """ Function for applying both eHydro & AI-based bathymetry adjustment to synthetic rating curves. 
+ Note: Any failure in here will be logged when it can be but will not abort the Multi-Proc + Parameters ---------- Please refer to correct_rating_for_ehydro_bathymetry and @@ -290,25 +293,47 @@ def apply_src_adjustment_for_bathymetry( ---------- log_text : str """ + log_text = "" +try: if os.path.exists(bathy_file_ehydro): + msg = f"correcting rating curve for ehydro bathy for huc : {huc}" + log_text += msg + '\n' + print(msg + '\n') + log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) + else: + print(f'USACE eHydro bathymetry file does not exist for huc: {huc}') - correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) + except Exception: + log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" + log_text += traceback.format_exc() - else: - print('USACE eHydro bathymetry file does not exist') +try: + with open(log_file_path, "a") as log_file: + log_file.write(log_text + '\n') +except Exception: + print(f"Error trying to write to the log file of {log_file_path}") +log_text = "" +try: if os.path.exists(bathy_file_aibased): + msg = f"correcting rating curve for ai based bathy for huc : {huc}" + log_text += msg + '\n' + print(msg + '\n') - correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) - + log_text += correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) else: - print('AI-based bathymetry file does not exist') + print(f'AI-based bathymetry file does not exist for huc : {huc}') + +except Exception: + log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" + log_text += traceback.format_exc() - # return log_text +with open(log_file_path, "a") as log_file: + log_file.write(log_text + '\n') # ------------------------------------------------------- -def multi_process_hucs( +def process_bathy_adjustment( fim_dir, strm_order, bathy_file_ehydro, @@ -349,23 +374,25 @@ def multi_process_hucs( """ # Set up log file - print( - 'Writing progress to log file here: ' - + str(join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log')) - ) + log_file_path = os.path.join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log') + print(f'Writing progress to log file here: {log_file_path}') print('This may take a few minutes...') ## Create a time var to log run time - begin_time = dt.datetime.now() + begin_time = dt.datetime.now(dt.timezone.utc) ## Initiate log file - log_file = open(join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log'), "w") - log_file.write('START TIME: ' + str(begin_time) + '\n') - log_file.write('#########################################################\n\n') + with open(log_file_path, "w") as log_file: + log_file.write('START TIME: ' + str(begin_time) + '\n') + log_file.write('#########################################################\n\n') + + # Let log_text build up starting here until the bottom. + log_text = "" # Exit program if the bathymetric data doesn't exist if not all([os.path.exists(bathy_file_ehydro), os.path.exists(bathy_file_aibased)]): statement = f'The input bathymetry files {bathy_file_ehydro} and {bathy_file_aibased} do not exist. Exiting...' 
- log_file.write(statement) + with open(log_file_path, "w") as log_file: + log_file.write(statement) print(statement) sys.exit(0) @@ -379,12 +406,14 @@ def multi_process_hucs( hucs_with_bathy = wbd.HUC8.to_list() hucs = [h for h in fim_hucs if h in hucs_with_bathy] hucs.sort() - log_file.write(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n") - print(f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data\n") - - print(f"AI-Based bathymetry data is applied on streams with order {strm_order} or higher\n") - - failed_HUCs_list = [] + msg = f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n" + log_text += msg + print(msg) + + msg = f"AI-Based bathymetry data is applied on streams with order {strm_order} or higher\n" + log_text += msg + print(msg) + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: # Loop through all hucs, build the arguments, and submit them to the process pool futures = {} @@ -396,24 +425,21 @@ def multi_process_hucs( 'bathy_file_ehydro': bathy_file_ehydro, 'bathy_file_aibased': bathy_file_aibased, 'verbose': verbose, + 'log_file_path': log_file_path, } future = executor.submit(apply_src_adjustment_for_bathymetry, **args) futures[future] = future for future in as_completed(futures): if future is not None: - if not future.exception(): - failed_huc = future.result() - if failed_huc != "": - failed_HUCs_list.append(failed_huc) - else: + if future.exception(): raise future.exception() ## Record run time and close log file - end_time = dt.datetime.now() - log_file.write('END TIME: ' + str(end_time) + '\n') + end_time = dt.datetime.now(dt.timezone.utc) + log_text +=('END TIME: ' + str(end_time) + '\n') tot_run_time = end_time - begin_time - log_file.write('TOTAL RUN TIME: ' + str(tot_run_time)) + log_text += ('TOTAL RUN TIME: ' + str(tot_run_time).split('.')[0]) log_file.close() @@ -529,7 +555,7 @@ def multi_process_hucs( number_of_jobs = args['number_of_jobs'] verbose = bool(args['verbose']) - multi_process_hucs( + process_bathy_adjustment( fim_dir, strm_order, bathy_file_ehydro, From 3818589cc30490c573ae0a13332b759d4a332493 Mon Sep 17 00:00:00 2001 From: hhs732 Date: Tue, 25 Jun 2024 23:02:58 +0000 Subject: [PATCH 12/24] litting done --- src/bathymetric_adjustment.py | 58 +++++++++++++++++------------------ 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 8019d0cac..157a2b163 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -294,42 +294,42 @@ def apply_src_adjustment_for_bathymetry( log_text : str """ log_text = "" -try: - if os.path.exists(bathy_file_ehydro): - msg = f"correcting rating curve for ehydro bathy for huc : {huc}" - log_text += msg + '\n' - print(msg + '\n') - log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) - else: - print(f'USACE eHydro bathymetry file does not exist for huc: {huc}') + try: + if os.path.exists(bathy_file_ehydro): + msg = f"correcting rating curve for ehydro bathy for huc : {huc}" + log_text += msg + '\n' + print(msg + '\n') + log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) + else: + print(f'USACE eHydro bathymetry file does not exist for huc: {huc}') + + except Exception: + log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" + log_text += traceback.format_exc() + + try: + with open(log_file_path, "a") as log_file: + 
log_file.write(log_text + '\n') + except Exception: + print(f"Error trying to write to the log file of {log_file_path}") + + log_text = "" + try: + if os.path.exists(bathy_file_aibased): + msg = f"correcting rating curve for ai based bathy for huc : {huc}" + log_text += msg + '\n' + print(msg + '\n') + + log_text += correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) + else: + print(f'AI-based bathymetry file does not exist for huc : {huc}') except Exception: log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" log_text += traceback.format_exc() -try: with open(log_file_path, "a") as log_file: log_file.write(log_text + '\n') -except Exception: - print(f"Error trying to write to the log file of {log_file_path}") - -log_text = "" -try: - if os.path.exists(bathy_file_aibased): - msg = f"correcting rating curve for ai based bathy for huc : {huc}" - log_text += msg + '\n' - print(msg + '\n') - - log_text += correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) - else: - print(f'AI-based bathymetry file does not exist for huc : {huc}') - -except Exception: - log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" - log_text += traceback.format_exc() - -with open(log_file_path, "a") as log_file: - log_file.write(log_text + '\n') # ------------------------------------------------------- From e87772892b1cb5ad6c5d67db8f3c75711eda35e1 Mon Sep 17 00:00:00 2001 From: hhs732 Date: Tue, 25 Jun 2024 23:11:02 +0000 Subject: [PATCH 13/24] redone pre-commit --- src/bathymetric_adjustment.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 157a2b163..97ac4f926 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -274,7 +274,7 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ return log_text -# ------------------------------------------------------- +# -------------------------------------------------------- # Apply src_adjustment_for_bathymetry def apply_src_adjustment_for_bathymetry( fim_dir, huc, strm_order, bathy_file_ehydro, bathy_file_aibased, verbose, log_file_path @@ -320,7 +320,9 @@ def apply_src_adjustment_for_bathymetry( log_text += msg + '\n' print(msg + '\n') - log_text += correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased, verbose) + log_text += correct_rating_for_ai_based_bathymetry( + fim_dir, huc, strm_order, bathy_file_aibased, verbose + ) else: print(f'AI-based bathymetry file does not exist for huc : {huc}') @@ -413,7 +415,7 @@ def process_bathy_adjustment( msg = f"AI-Based bathymetry data is applied on streams with order {strm_order} or higher\n" log_text += msg print(msg) - + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: # Loop through all hucs, build the arguments, and submit them to the process pool futures = {} @@ -436,10 +438,10 @@ def process_bathy_adjustment( raise future.exception() ## Record run time and close log file - end_time = dt.datetime.now(dt.timezone.utc) - log_text +=('END TIME: ' + str(end_time) + '\n') + end_time = dt.datetime.now(dt.timezone.utc) + log_text += 'END TIME: ' + str(end_time) + '\n' tot_run_time = end_time - begin_time - log_text += ('TOTAL RUN TIME: ' + str(tot_run_time).split('.')[0]) + log_text += 'TOTAL RUN TIME: ' + str(tot_run_time).split('.')[0] log_file.close() From 
a605e08b4cdce09a9a03cd53c33e2042b9997c3e Mon Sep 17 00:00:00 2001 From: hhs732 Date: Tue, 25 Jun 2024 23:13:44 +0000 Subject: [PATCH 14/24] small changes --- src/bathymetric_adjustment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 97ac4f926..74d2c6b49 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -298,7 +298,7 @@ def apply_src_adjustment_for_bathymetry( if os.path.exists(bathy_file_ehydro): msg = f"correcting rating curve for ehydro bathy for huc : {huc}" log_text += msg + '\n' - print(msg + '\n') + print(msg) log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) else: print(f'USACE eHydro bathymetry file does not exist for huc: {huc}') From 8f4805c8f437b49f0f504e5923c386183ad2bcee Mon Sep 17 00:00:00 2001 From: hhs732 Date: Wed, 26 Jun 2024 23:04:23 +0000 Subject: [PATCH 15/24] Duplicates in AI bathy data error fixed --- src/bathymetric_adjustment.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 74d2c6b49..d6fff27f1 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -187,9 +187,12 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ aib_bathy_data_df["order_"] < strm_order, ["missing_xs_area_m2", "missing_wet_perimeter_m", "Bathymetry_source"], ] = 0 - aib_df = aib_bathy_data_df[ + aib_df0 = aib_bathy_data_df[ ['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m', 'Bathymetry_source'] ] + #test = aib_df[aib_df.duplicated(subset='feature_id', keep=False)] + aib_df = aib_df0.drop_duplicates(subset=['feature_id'], keep = 'first') + aib_df.index = range(len(aib_df)) # Get src_full from each branch src_all_branches_path = [] @@ -211,8 +214,8 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ # Merge in missing bathy data and fill Nans if "missing_xs_area_m2" not in src_df.columns: src_df.drop(columns=["Bathymetry_source"], inplace=True) - src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') - # print([src,src_df.columns]) + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + # print([src,src_df.columns]) src_df.Bathymetry_source else: src_df = pd.read_csv(src, low_memory=False) src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') @@ -316,7 +319,7 @@ def apply_src_adjustment_for_bathymetry( log_text = "" try: if os.path.exists(bathy_file_aibased): - msg = f"correcting rating curve for ai based bathy for huc : {huc}" + msg = f"correcting rating curve for AI-based bathy for huc : {huc}" log_text += msg + '\n' print(msg + '\n') @@ -327,7 +330,7 @@ def apply_src_adjustment_for_bathymetry( print(f'AI-based bathymetry file does not exist for huc : {huc}') except Exception: - log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" + log_text += f"An error has occurred while processing AI-based bathy for huc {huc}" log_text += traceback.format_exc() with open(log_file_path, "a") as log_file: From a06e3ca3d612bfc1926e7a64154c7b4bdc13550b Mon Sep 17 00:00:00 2001 From: hhs732 Date: Thu, 27 Jun 2024 02:11:34 +0000 Subject: [PATCH 16/24] lintting applied --- src/bathymetric_adjustment.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py 
index d6fff27f1..75b6425c7 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -190,8 +190,8 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ aib_df0 = aib_bathy_data_df[ ['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m', 'Bathymetry_source'] ] - #test = aib_df[aib_df.duplicated(subset='feature_id', keep=False)] - aib_df = aib_df0.drop_duplicates(subset=['feature_id'], keep = 'first') + # test = aib_df[aib_df.duplicated(subset='feature_id', keep=False)] + aib_df = aib_df0.drop_duplicates(subset=['feature_id'], keep='first') aib_df.index = range(len(aib_df)) # Get src_full from each branch @@ -214,7 +214,7 @@ def correct_rating_for_ai_based_bathymetry(fim_dir, huc, strm_order, bathy_file_ # Merge in missing bathy data and fill Nans if "missing_xs_area_m2" not in src_df.columns: src_df.drop(columns=["Bathymetry_source"], inplace=True) - src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') # print([src,src_df.columns]) src_df.Bathymetry_source else: src_df = pd.read_csv(src, low_memory=False) @@ -299,7 +299,7 @@ def apply_src_adjustment_for_bathymetry( log_text = "" try: if os.path.exists(bathy_file_ehydro): - msg = f"correcting rating curve for ehydro bathy for huc : {huc}" + msg = f"Correcting rating curve for ehydro bathy for huc : {huc}" log_text += msg + '\n' print(msg) log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) @@ -319,7 +319,7 @@ def apply_src_adjustment_for_bathymetry( log_text = "" try: if os.path.exists(bathy_file_aibased): - msg = f"correcting rating curve for AI-based bathy for huc : {huc}" + msg = f"Correcting rating curve for AI-based bathy for huc : {huc}" log_text += msg + '\n' print(msg + '\n') From b3eb46816a2c75dc72aaef96da15a28191d70203 Mon Sep 17 00:00:00 2001 From: Rob Hanna Date: Wed, 3 Jul 2024 15:12:29 +0000 Subject: [PATCH 17/24] quick fix to error scan --- fim_post_processing.sh | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/fim_post_processing.sh b/fim_post_processing.sh index 9e29654f7..0155356ff 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -219,8 +219,14 @@ python3 $toolsDir/combine_crosswalk_tables.py \ Tcount date -u +echo -e $startDiv"Resetting Permissions" find $outputDestDir -type d -exec chmod -R 777 {} + -find $outputDestDir -type f -exec chmod -R 777 {} + +find $outputDestDir -type f -exec chmod 777 {} + # just root level files + +echo +echo -e $startDiv"Scanning logs for errors. Results be saved in root not inside the log folder." 
+# grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log +find $outputDestDir -type f | grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log & echo echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" From 6a662aedde362a0153488d454f6e0cfe1987baee Mon Sep 17 00:00:00 2001 From: hhs732 Date: Thu, 18 Jul 2024 21:22:58 +0000 Subject: [PATCH 18/24] updated with the latest dev --- config/params_template.env | 4 + data/wbd/clip_vectors_to_wbd.py | 50 +- data/wbd/generate_pre_clip_fim_huc8.py | 210 ++++---- docs/CHANGELOG.md | 44 +- fim_post_processing.sh | 26 +- src/add_crosswalk.py | 189 +++----- src/bash_variables.env | 8 +- src/bathymetric_adjustment.py | 33 ++ src/manningN_optimization.py | 643 +++++++++++++++++++++++++ src/src_adjust_ras2fim_rating.py | 5 +- src/src_adjust_usgs_rating_trace.py | 5 +- tools/bridge_inundation.py | 153 ++++++ tools/rating_curve_comparison.py | 4 +- tools/run_test_case_mannN_optz.py | 540 +++++++++++++++++++++ 14 files changed, 1655 insertions(+), 259 deletions(-) create mode 100644 src/manningN_optimization.py create mode 100644 tools/bridge_inundation.py create mode 100644 tools/run_test_case_mannN_optz.py diff --git a/config/params_template.env b/config/params_template.env index 7dcc07469..2f4d830d7 100644 --- a/config/params_template.env +++ b/config/params_template.env @@ -48,6 +48,10 @@ export src_subdiv_toggle="True" # text to append to output log and hydrotable file names (use for testing/debugging) export vrough_suffix="" +#### applying manningN optimization to SRCs #### +# Toggle to run optimized roughness src routine (True=on; False=off) +export src_optz_manningN_toggle="False" + #### SRC calibration variables #### apply SRC adjustments using USGS rating curve database #### # Toggle to run src adjustment routine with USGS rating data (True=on; False=off) diff --git a/data/wbd/clip_vectors_to_wbd.py b/data/wbd/clip_vectors_to_wbd.py index 048539781..8cb8603e8 100755 --- a/data/wbd/clip_vectors_to_wbd.py +++ b/data/wbd/clip_vectors_to_wbd.py @@ -2,6 +2,7 @@ import argparse import logging +import os import sys import geopandas as gpd @@ -21,18 +22,26 @@ def extend_outlet_streams(streams, wbd_buffered, wbd): Extend outlet streams to nearest buffered WBD boundary """ - wbd['geometry'] = wbd.geometry.boundary - wbd = gpd.GeoDataFrame(data=wbd, geometry='geometry') - - wbd_buffered["linegeom"] = wbd_buffered.geometry - # Select only the streams that are outlets levelpath_outlets = streams[streams['to'] == 0] - # Select only the streams that don't intersect the WBD boundary line - levelpath_outlets = levelpath_outlets[~levelpath_outlets.intersects(wbd['geometry'].iloc[0])] + levelpath_outlets_columns = [x for x in levelpath_outlets.columns] + + # Select streams that intersect the WBD but not the WBD buffer + levelpath_outlets = levelpath_outlets.sjoin(wbd)[levelpath_outlets_columns] + + wbd_boundary = wbd.copy() + wbd_boundary['geometry'] = wbd_boundary.geometry.boundary + wbd_boundary = gpd.GeoDataFrame(data=wbd_boundary, geometry='geometry') + + wbd_buffered["linegeom"] = wbd_buffered.geometry + + levelpath_outlets = levelpath_outlets[ + ~levelpath_outlets.intersects(wbd_buffered["linegeom"].boundary.iloc[0]) + ] levelpath_outlets['nearest_point'] = None + levelpath_outlets['nearest_point_wbd'] = None levelpath_outlets['last'] = None levelpath_outlets = levelpath_outlets.explode(index_parts=False) @@ -45,22 +54,36 @@ def extend_outlet_streams(streams, 
wbd_buffered, wbd): wbd_buffered['geometry'] = wbd_buffered.geometry.boundary wbd_buffered = gpd.GeoDataFrame(data=wbd_buffered, geometry='geometry') + errors = 0 for index, row in levelpath_outlets.iterrows(): levelpath_geom = row['last'] nearest_point = nearest_points(levelpath_geom, wbd_buffered) + nearest_point_wbd = nearest_points(levelpath_geom, wbd_boundary.geometry) levelpath_outlets.at[index, 'nearest_point'] = nearest_point[1]['geometry'].iloc[0] + levelpath_outlets.at[index, 'nearest_point_wbd'] = nearest_point_wbd[1].iloc[0] levelpath_outlets_nearest_points = levelpath_outlets.at[index, 'nearest_point'] + levelpath_outlets_nearest_points_wbd = levelpath_outlets.at[index, 'nearest_point_wbd'] + if isinstance(levelpath_outlets_nearest_points, pd.Series): levelpath_outlets_nearest_points = levelpath_outlets_nearest_points.iloc[-1] - - levelpath_outlets.at[index, 'geometry'] = LineString( - list(row['geometry'].coords) + list([levelpath_outlets_nearest_points.coords[0]]) - ) + if isinstance(levelpath_outlets_nearest_points_wbd, pd.Series): + levelpath_outlets_nearest_points_wbd = levelpath_outlets_nearest_points_wbd.iloc[-1] + + # Extend outlet stream if outlet point is outside of the WBD or nearest snap point is within 100m of the WBD boundary + outlet_point = Point(row['geometry'].coords[-1]) + if (outlet_point.distance(levelpath_outlets_nearest_points_wbd) < 100) or ( + ~outlet_point.intersects(wbd.geometry)[0] + ): + levelpath_outlets.at[index, 'geometry'] = LineString( + list(row['geometry'].coords) + list([levelpath_outlets_nearest_points.coords[0]]) + ) + else: + errors += 1 levelpath_outlets = gpd.GeoDataFrame(data=levelpath_outlets, geometry='geometry') - levelpath_outlets = levelpath_outlets.drop(columns=['last', 'nearest_point']) + levelpath_outlets = levelpath_outlets.drop(columns=['last', 'nearest_point', 'nearest_point_wbd']) # Replace the streams in the original file with the extended streams streams = streams[~streams['ID'].isin(levelpath_outlets['ID'])] @@ -116,6 +139,7 @@ def subset_vector_layers( logging.info(f"Clip ocean water polygon for {hucCode}") landsea = gpd.read_file(landsea, mask=wbd_buffer, engine="fiona") if not landsea.empty: + os.makedirs(os.path.dirname(subset_landsea), exist_ok=True) # print(f"Create landsea gpkg for {hucCode}", flush=True) landsea.to_file( subset_landsea, driver=getDriver(subset_landsea), index=False, crs=huc_CRS, engine="fiona" @@ -356,6 +380,8 @@ def subset_vector_layers( '-lps', '--subset-levee-protected-areas', help='Levee-protected areas subset', required=True ) parser.add_argument('-osm', '--osm-bridges', help='Open Street Maps gkpg', required=True) + parser.add_argument('-osms', '--subset-osm-bridges', help='Open Street Maps subset', required=True) + parser.add_argument('-ak', '--is-alaska', help='If in Alaska', action='store_true') parser.add_argument('-crs', '--huc-CRS', help='HUC crs', required=True) args = vars(parser.parse_args()) diff --git a/data/wbd/generate_pre_clip_fim_huc8.py b/data/wbd/generate_pre_clip_fim_huc8.py index 8a34e6837..22e3a6022 100755 --- a/data/wbd/generate_pre_clip_fim_huc8.py +++ b/data/wbd/generate_pre_clip_fim_huc8.py @@ -41,6 +41,8 @@ src/bash_variables.env to the corresponding outputs_dir argument after running and testing this script. The newly generated data should be created in a new folder using the format (i.e. 
September 26, 2023 would be 23_09_26) + + Don't worry about error logs saying "Failed to auto identify EPSG: 7" ''' srcDir = os.getenv('srcDir') @@ -89,77 +91,66 @@ wbd_buffer = os.getenv('wbd_buffer') wbd_buffer_int = int(wbd_buffer) +LOG_FILE_PATH = "" + -def __setup_logger(outputs_dir, huc=None): +def __setup_logger(outputs_dir, huc=None, is_multi_proc=False): ''' Set up logging to file. Since log file includes the date, it will be overwritten if this script is run more than once on the same day. ''' + global LOG_FILE_PATH + datetime_now = dt.datetime.now(dt.timezone.utc) - curr_date = datetime_now.strftime("%y%my%d") + file_dt_string = datetime_now.strftime("%Y_%m_%d-%H_%M_%S") if huc is None: - log_file_name = f"generate_pre_clip_fim_huc8_{curr_date}.log" + log_file_name = f"generate_pre_clip_{file_dt_string}.log" else: - log_file_name = f"mp_{huc}_generate_pre_clip_fim_huc8_{curr_date}.log" + log_file_name = f"mp_{huc}_generate_pre_{file_dt_string}.log" - log_file_path = os.path.join(outputs_dir, log_file_name) + LOG_FILE_PATH = os.path.join(outputs_dir, log_file_name) if not os.path.exists(outputs_dir): os.mkdir(outputs_dir) - if os.path.exists(log_file_path): - os.remove(log_file_path) + if os.path.exists(LOG_FILE_PATH): + os.remove(LOG_FILE_PATH) - file_handler = logging.FileHandler(log_file_path) - file_handler.setLevel(logging.INFO) - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) + # set up logging to file + logging.basicConfig( + filename=LOG_FILE_PATH, level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S' + ) - logger = logging.getLogger() - logger.addHandler(file_handler) - logger.setLevel(logging.INFO) + # If true, console text will be displayed twice, once from the parent and the other from the MP + if is_multi_proc is False: - # Print start time - start_time_string = datetime_now.strftime("%m/%d/%Y %H:%M:%S") - logging.info('==========================================================================') - logging.info("\n generate_pre_clip_fim_huc8.py") - logging.info(f"\n \t Started: {start_time_string} \n") + # set up logging to console + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + # set a format which is simpler for console use + # add the handler to the root logger + logging.getLogger('').addHandler(console) -def __merge_mp_logs(outputs_dir): - log_file_list = list(Path(outputs_dir).rglob("mp_*")) +def __merge_mp_logs(log_dir, parent_log_file): + # Locks for all logs starting with the phrase mp* + log_file_list = list(Path(log_dir).rglob("mp_*")) + if len(log_file_list) > 0: log_file_list.sort() + else: + print("No multi-proc files to merge") + return - log_mp_rollup_file = os.path.join(outputs_dir, "mp_merged_logs.log") - - error_huc_found = False - - with open(log_mp_rollup_file, 'a') as main_log: + with open(parent_log_file, 'a') as main_log: # Iterate through list for temp_log_file in log_file_list: # Open each file in read mode with open(temp_log_file) as infile: - contents = infile.read() - temp_upper_contents = contents.upper() - if "ERROR" in temp_upper_contents: - print( - f"\nAn error exist in file {temp_log_file}." - " Check the merge logs for that huc number" - ) - error_huc_found = True - main_log.write(contents) + main_log.write(infile.read()) os.remove(temp_log_file) - if error_huc_found: - print( - "\n\nOften you can just create a new huc list with the fails, re-run to a" - " 1temp directory and recheck if errors still exists. 
Sometimes multi-prod can create" - " contention errors.\nFor each HUC that is sucessful, you can just copy it back" - " into the original full pre-clip folder.\n" - ) - def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): ''' @@ -212,6 +203,11 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): __setup_logger(outputs_dir) start_time = dt.datetime.now(dt.timezone.utc) + logging.info("==================================") + logging.info(f"Starting Pre-clip based on huc list of\n {huc_list}") + logging.info(f"Start time: {start_time.strftime('%m/%d/%Y %H:%M:%S')}") + logging.info("") + # Iterate over the huc_list argument and create a directory for each huc. for huc in hucs_to_pre_clip_list: if os.path.isdir(os.path.join(outputs_dir, huc)): @@ -221,10 +217,6 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): f"\n\t Output Directory: {outputs_dir}/{huc} exists. It will be overwritten, and the " f"newly generated huc level files will be output there. \n" ) - print( - f"\n\t Output Directory: {outputs_dir}/{huc} exists. It will be overwritten, and the " - f"newly generated huc level files will be output there. \n" - ) elif not os.path.isdir(os.path.join(outputs_dir, huc)): os.mkdir(os.path.join(outputs_dir, huc)) @@ -232,14 +224,13 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): # Build arguments (procs_list) for each process to execute (huc_level_clip_vectors_to_wbd) procs_list = [] for huc in hucs_to_pre_clip_list: - print(f"Generating vectors for {huc}. ") + # logging.info(f"Generating vectors for {huc}. ") procs_list.append([huc, outputs_dir]) # procs_list.append([huc, outputs_dir, wbd_alaska_file]) # Parallelize each huc in hucs_to_parquet_list logging.info('Parallelizing HUC level wbd pre-clip vector creation. ') - print('Parallelizing HUC level wbd pre-clip vector creation. ') # with Pool(processes=number_of_jobs) as pool: # pool.map(huc_level_clip_vectors_to_wbd, procs_list) @@ -248,6 +239,7 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): # The log files for each multi proc has tons and tons of duplicate lines crossing mp log # processes, but does always log correctly back to the parent log + failed_HUCs_list = [] # On return from the MP, if it returns a HUC number, that is a failed huc with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: futures = {} for huc in hucs_to_pre_clip_list: @@ -257,11 +249,32 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): for future in as_completed(futures): if future is not None: - if future.exception(): + if not future.exception(): + failed_huc = future.result() + if failed_huc.lower() != "success" and failed_huc.isnumeric(): + failed_HUCs_list.append(failed_huc) + else: raise future.exception() print("Merging MP log files") - __merge_mp_logs(outputs_dir) + __merge_mp_logs(outputs_dir, LOG_FILE_PATH) + + if len(failed_HUCs_list) > 0: + logging.info("\n+++++++++++++++++++") + logging.info("HUCs that failed to proccess are: ") + huc_error_msg = " -- " + for huc in failed_HUCs_list: + huc_error_msg += f"{huc}, " + logging.info(huc_error_msg) + print(" See logs for more details on each HUC fail") + print( + "\n\nOften you can just create a new huc list with the fails, re-run to a" + " temp directory and recheck if errors still exists. 
Sometimes multi-prod can create" + " contention errors.\nFor each HUC that is sucessful, you can just copy it back" + " into the original full pre-clip folder.\n" + ) + + logging.info("+++++++++++++++++++") # Get time metrics end_time = dt.datetime.now(dt.timezone.utc) @@ -277,9 +290,6 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): ) logging.info('==========================================================================') - print("\n\t Completed writing all huc level files \n") - print(f"\t \t TOTAL RUN TIME: {str(time_duration).split('.')[0]}") - def huc_level_clip_vectors_to_wbd(huc, outputs_dir): ''' @@ -300,13 +310,14 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): Creation of wbd8_clp.gpkg & wbd_buffered.gpkg using the executable: "ogr2ogr .... -clipsrc ..." Outputs: - - .gpkg files* dependant on HUC's WBD (*differing amount based on individual huc) + - .If successful, then it returns the word sudcess. If it fails, return the huc number. ''' + in_error = False huc_processing_start = dt.datetime.now(dt.timezone.utc) # with this in Multi-proc, it needs it's own logger and unique logging file. - __setup_logger(outputs_dir, huc) - logging.info(f"Processing {huc}") + __setup_logger(outputs_dir, huc, True) + logging.info(f"Start Processing {huc}") try: @@ -338,7 +349,7 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): else: input_LANDSEA = f"{inputsDir}/landsea/water_polygons_us.gpkg" - print(f"\n Get WBD {huc}") + logging.info(f"-- {huc} : Get WBD") # TODO: Use Python API (osgeo.ogr) instead of using ogr2ogr executable get_wbd_subprocess = subprocess.run( @@ -360,24 +371,22 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): universal_newlines=True, ) - logging.info(f"{huc} : {get_wbd_subprocess.stdout}") - - if get_wbd_subprocess.stderr != "": - if "ERROR" in get_wbd_subprocess.stderr.upper(): - msg = ( - f" - Creating -- {huc_directory}/wbd.gpkg" - f" ERROR -- details: ({get_wbd_subprocess.stderr})" - ) - print(msg) - logging.info(msg) + sub_proc_stdout = get_wbd_subprocess.stdout + sub_proc_stderror = get_wbd_subprocess.stderr + if sub_proc_stderror.lower().startswith("warning") is True: + logging.info(f"Warning from ogr creating wbd file: {sub_proc_stderror}") + elif sub_proc_stderror != "": + raise Exception( + f"*** {huc} : Error creating wbd file from ogr for {huc}:" f" Details: {sub_proc_stderror}\n" + ) else: - msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete \n" - print(msg) + msg = f"-- {huc} : Creating {huc_directory}/wbd.gpkg - Complete" + if sub_proc_stdout != "": # warnings? 
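+                # stdout from ogr2ogr is normally empty; if anything was written (likely
+                # warnings), append it to the completion message so it lands in the HUC log.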
+ msg += f": {sub_proc_stdout}" + logging.info(msg) - msg = f"Get Vector Layers and Subset {huc}" - # print(msg) - logging.info(msg) + logging.info(f"-- {huc} : Get Vector Layers and Subset") # Subset Vector Layers (after determining whether it's alaska or not) if huc2Identifier == '19': @@ -444,12 +453,10 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): huc_CRS=huc_CRS, # TODO: simplify ) - msg = f" Completing Get Vector Layers and Subset: {huc} \n" - print(msg) - logging.info(msg) + logging.info(f"-- {huc} : Completing Get Vector Layers and Subset:\n") ## Clip WBD8 ## - print(f" Creating WBD buffer and clip version {huc}") + logging.info(f"-- {huc} : Creating WBD buffer and clip version") clip_wbd8_subprocess = subprocess.run( [ @@ -470,36 +477,38 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): universal_newlines=True, ) - # msg = clip_wbd8_subprocess.stdout - # print(f"{huc} : {msg}") - # logging.info(f"{huc} : {msg}") - - if clip_wbd8_subprocess.stderr != "": - if "ERROR" in clip_wbd8_subprocess.stderr.upper(): - msg = ( - f" - Creating -- {huc_directory}/wbd.gpkg" - f" ERROR -- details: ({clip_wbd8_subprocess.stderr})" - ) - print(msg) - logging.info(msg) + sub_proc_stdout = clip_wbd8_subprocess.stdout + sub_proc_stderror = clip_wbd8_subprocess.stderr + if sub_proc_stderror.lower().startswith("warning") is True: + logging.info(f"Warning from ogr clip: {sub_proc_stderror}") + elif sub_proc_stderror != "": + raise Exception( + f"*** {huc} : Error WBD buffer and clip version from ogr for {huc}:" + f" Details: {sub_proc_stderror}\n" + ) else: - msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete" - print(msg) + msg = f"-- {huc} : Creating WBD buffer and clip version - Complete" + if sub_proc_stdout != "": # warnings? + msg += f": {sub_proc_stdout}" + logging.info(msg) except Exception: - print(f"*** An error occurred while processing {huc}") - print(traceback.format_exc()) logging.info(f"*** An error occurred while processing {huc}") logging.info(traceback.format_exc()) - print() + logging.info("") + in_error = True huc_processing_end = dt.datetime.now(dt.timezone.utc) time_duration = huc_processing_end - huc_processing_start - duraton_msg = f"\t \t run time for huc {huc}: is {str(time_duration).split('.')[0]}" - print(duraton_msg) + duraton_msg = f"-- {huc} : run time is {str(time_duration).split('.')[0]}\n" logging.info(duraton_msg) - return + + # if in error return the failed huc number + if in_error is True: + return huc + else: + return "Success" if __name__ == '__main__': @@ -542,4 +551,9 @@ def huc_level_clip_vectors_to_wbd(huc, outputs_dir): args = vars(parser.parse_args()) - pre_clip_hucs_from_wbd(**args) + try: + pre_clip_hucs_from_wbd(**args) + except Exception: + logging.info(traceback.format_exc()) + end_time = dt.datetime.now(dt.timezone.utc) + logging.info(f" End time: {end_time.strftime('%m/%d/%Y %H:%M:%S')}") diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c5f70aa57..539c53fa2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,23 +1,45 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. -## v4.5.x.x - 2024-06-18 - [PR#1195](https://github.com/NOAA-OWP/inundation-mapping/pull/1195) +## v4.5.2.6 - 2024-07-12 - [PR#1184](https://github.com/NOAA-OWP/inundation-mapping/pull/1184) -It focuses on adjusting rating curves by using bathymetric data. The bathymetry data includes eHydro surveys and AI-based datasets created for all NWM streams. 
+This PR adds a new script to determine which bridges are inundated by a specific flow. It will assign a risk status to each bridge point based on a specific threshold. -### Changes -- `src/bathymetric-adjustment.py`: 'correct_rating_for_ai_based_bathymetry" function was added to the script. This function processes AI-based bathymetry data and adjusts rating curves using this data. Also "apply_src_adjustment_for_bathymetry' function was added to prioritize USACE eHydro over AI-based bathymetry dataset. Multi-processing function "multi_process_hucs" was also updated based on the latest code. +### Additions -- `src/bash_variables.env`: New variables and their paths were added. +- `tools/bridge_inundation.py` -- `fim_post_processing.sh`: New arguments were added. +

-### Testing -This PR has been tested on 11 HUC8s around the Illinois River and Ohio River. + +## v4.5.2.5 - 2024-07-08 - [PR#1205](https://github.com/NOAA-OWP/inundation-mapping/pull/1205) + +Snaps crosswalk from the midpoint of DEM-derived reaches to the nearest point on NWM streams within a threshold of 100 meters. DEM-derived streams that do not locate any NWM streams within 100 meters of their midpoints are removed from the FIM hydrofabric and their catchments are not inundated. + +### Changes + +- `src/add_crosswalk.py`: Locates nearest NWM stream to midpoint of DEM-derived reaches if within 100 meters. Also fixes a couple of minor bugs.

+## v4.5.2.4 - 2024-07-08 - [PR#1204](https://github.com/NOAA-OWP/inundation-mapping/pull/1204) + +Bug fix for extending outlets in order to ensure proper flow direction in depression filling algorithm. This PR adds a distance criteria that in order for the end of an outlet stream to be snapped to the wbd_buffered boundary, the end point must be less than 100 meters from the WBD boundary. + +Also adds missing argparse arguments so that the script can be run from the command line. + +### Changes + +- `data` + - `wbd` + - `clip_vectors_to_wbd.py`: Adds a 100 meter distance threshold to WBD to snap outlets to buffered WBD. + - `generate_pre_clip_fim_huc8.py`: Upgrading logging system. +- `src` + - `bash_variables.env`: Updated pre-clip input path to new pre-clip files. + +

+ ## v4.5.2.3 - 2024-06-14 - [PR#1169](https://github.com/NOAA-OWP/inundation-mapping/pull/1169) This tool scans all log directory looking for the word "error" (not case-sensitive). This is primary added to help find errors in the post processing logs such as src_optimization folder (and others). @@ -29,7 +51,6 @@ This tool scans all log directory looking for the word "error" (not case-sensiti

- ## v4.5.2.2 - 2024-06-14 - [PR#1183](https://github.com/NOAA-OWP/inundation-mapping/pull/1183) Upgrades whitebox from v2.3.1 to 2.3.4. Whitebox was upgraded by `pip install -U whitebox` and then `pipenv lock` to update the `Pipfile`. @@ -189,6 +210,10 @@ Some NWM streams, particularly in coastal areas, fail to reach the edge of the D ### Changes - `data/wbd/clip_vectors_to_wbd.py`: Clips `landsea` ocean mask from the buffered WBD and adds a function to extend outlet streams to the buffered WBD + +

+ + - `data/wbd/clip_vectors_to_wbd.py`: Updated multi-processing and added more logging.

@@ -330,6 +355,7 @@ The "black" packages is also be upgraded from 23.7.0 to 24.3.

+ ## v4.4.13.1 - 2024-03-11 - [PR#1086](https://github.com/NOAA-OWP/inundation-mapping/pull/1086) Fixes bug where levee-protected areas were not being masked from branch 0 DEMs. diff --git a/fim_post_processing.sh b/fim_post_processing.sh index 0155356ff..aae310f01 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -144,6 +144,17 @@ if [ "$src_subdiv_toggle" = "True" ] && [ "$src_bankfull_toggle" = "True" ]; the Tcount fi +## RUN MANNING N OPTIMIZATION ROUTINE AND ADJUST SRCS ## +if [ "$src_optz_manningN_toggle" = "True" ]; then + echo -e $startDiv"Optimizing manningN for SRCs" + # Run manningN optimization routine + Tstart + python3 $srcDir/manningN_optimization.py \ + -fim_dir $outputDestDir \ + -mannN_aibased $mannN_file_aibased + Tcount +fi + ## RUN SYNTHETIC RATING CURVE CALIBRATION W/ USGS GAGE RATING CURVES ## if [ "$src_adjust_usgs" = "True" ] && [ "$src_subdiv_toggle" = "True" ] && [ "$skipcal" = "0" ]; then Tstart @@ -220,13 +231,20 @@ Tcount date -u echo -e $startDiv"Resetting Permissions" -find $outputDestDir -type d -exec chmod -R 777 {} + -find $outputDestDir -type f -exec chmod 777 {} + # just root level files +Tstart + find $outputDestDir -type d -exec chmod -R 777 {} + + find $outputDestDir -type f -exec chmod 777 {} + # just root level files +Tcount +date -u echo echo -e $startDiv"Scanning logs for errors. Results be saved in root not inside the log folder." -# grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log -find $outputDestDir -type f | grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log & +Tstart + # grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log + find $outputDestDir -type f | grep -H -r -i -n "error" $outputDestDir/logs/ > $outputDestDir/all_errors_from_logs.log +Tcount +date -u + echo echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" diff --git a/src/add_crosswalk.py b/src/add_crosswalk.py index ddc9444df..62789c9b3 100755 --- a/src/add_crosswalk.py +++ b/src/add_crosswalk.py @@ -40,130 +40,70 @@ def add_crosswalk( input_catchments = gpd.read_file(input_catchments_fileName, engine="pyogrio", use_arrow=True) input_flows = gpd.read_file(input_flows_fileName, engine="pyogrio", use_arrow=True) input_huc = gpd.read_file(input_huc_fileName, engine="pyogrio", use_arrow=True) + input_nwmcat = gpd.read_file(input_nwmcat_fileName, engine="pyogrio", use_arrow=True) input_nwmflows = gpd.read_file(input_nwmflows_fileName, engine="pyogrio", use_arrow=True) min_catchment_area = float(min_catchment_area) # 0.25# min_stream_length = float(min_stream_length) # 0.5# - if extent == 'FR': - ## crosswalk using majority catchment method - - # calculate majority catchments - majority_calc = zonal_stats( - input_catchments, input_nwmcatras_fileName, stats=['majority'], geojson_out=True - ) - input_majorities = gpd.GeoDataFrame.from_features(majority_calc) - input_majorities = input_majorities.rename(columns={'majority': 'feature_id'}) - - input_majorities = input_majorities[:][input_majorities['feature_id'].notna()] - if input_majorities.feature_id.dtype != 'int': - input_majorities.feature_id = input_majorities.feature_id.astype(int) - if input_majorities.HydroID.dtype != 'int': - input_majorities.HydroID = input_majorities.HydroID.astype(int) - - input_nwmflows = input_nwmflows.rename(columns={'ID': 'feature_id'}) - if input_nwmflows.feature_id.dtype != 'int': - input_nwmflows.feature_id = 
input_nwmflows.feature_id.astype(int) - relevant_input_nwmflows = input_nwmflows[ - input_nwmflows['feature_id'].isin(input_majorities['feature_id']) - ] - relevant_input_nwmflows = relevant_input_nwmflows.filter(items=['feature_id', 'order_']) - - if input_catchments.HydroID.dtype != 'int': - input_catchments.HydroID = input_catchments.HydroID.astype(int) - output_catchments = input_catchments.merge(input_majorities[['HydroID', 'feature_id']], on='HydroID') - output_catchments = output_catchments.merge( - relevant_input_nwmflows[['order_', 'feature_id']], on='feature_id' - ) - - if input_flows.HydroID.dtype != 'int': - input_flows.HydroID = input_flows.HydroID.astype(int) - output_flows = input_flows.merge(input_majorities[['HydroID', 'feature_id']], on='HydroID') - if output_flows.HydroID.dtype != 'int': - output_flows.HydroID = output_flows.HydroID.astype(int) - output_flows = output_flows.merge(relevant_input_nwmflows[['order_', 'feature_id']], on='feature_id') - output_flows = output_flows.merge( - output_catchments.filter(items=['HydroID', 'areasqkm']), on='HydroID' - ) - - elif (extent == 'MS') | (extent == 'GMS'): - ## crosswalk using stream segment midpoint method - input_nwmcat = gpd.read_file(input_nwmcat_fileName, mask=input_huc, engine="fiona") - - # only reduce nwm catchments to mainstems if running mainstems - if extent == 'MS': - input_nwmcat = input_nwmcat.loc[input_nwmcat.mainstem == 1] - - input_nwmcat = input_nwmcat.rename(columns={'ID': 'feature_id'}) - if input_nwmcat.feature_id.dtype != 'int': - input_nwmcat.feature_id = input_nwmcat.feature_id.astype(int) - input_nwmcat = input_nwmcat.set_index('feature_id') - - input_nwmflows = input_nwmflows.rename(columns={'ID': 'feature_id'}) - if input_nwmflows.feature_id.dtype != 'int': - input_nwmflows.feature_id = input_nwmflows.feature_id.astype(int) - - # Get stream midpoint - stream_midpoint = [] - hydroID = [] - for i, lineString in enumerate(input_flows.geometry): - hydroID = hydroID + [input_flows.loc[i, 'HydroID']] - stream_midpoint = stream_midpoint + [lineString.interpolate(0.5, normalized=True)] - - input_flows_midpoint = gpd.GeoDataFrame( - {'HydroID': hydroID, 'geometry': stream_midpoint}, crs=input_flows.crs, geometry='geometry' - ) - input_flows_midpoint = input_flows_midpoint.set_index('HydroID') - - # Create crosswalk - crosswalk = gpd.sjoin( - input_flows_midpoint, input_nwmcat, how='left', predicate='within' - ).reset_index() - crosswalk = crosswalk.rename(columns={"index_right": "feature_id"}) - - # fill in missing ms - crosswalk_missing = crosswalk.loc[crosswalk.feature_id.isna()] - for index, stream in crosswalk_missing.iterrows(): - # find closest nwm catchment by distance - distances = [stream.geometry.distance(poly) for poly in input_nwmcat.geometry] - min_dist = min(distances) - nwmcat_index = distances.index(min_dist) - - # update crosswalk - crosswalk.loc[crosswalk.HydroID == stream.HydroID, 'feature_id'] = input_nwmcat.iloc[ - nwmcat_index - ].name - crosswalk.loc[crosswalk.HydroID == stream.HydroID, 'AreaSqKM'] = input_nwmcat.iloc[ - nwmcat_index - ].AreaSqKM - crosswalk.loc[crosswalk.HydroID == stream.HydroID, 'Shape_Length'] = input_nwmcat.iloc[ - nwmcat_index - ].Shape_Length - crosswalk.loc[crosswalk.HydroID == stream.HydroID, 'Shape_Area'] = input_nwmcat.iloc[ - nwmcat_index - ].Shape_Area - - crosswalk = crosswalk.filter(items=['HydroID', 'feature_id']) - crosswalk = crosswalk.merge(input_nwmflows[['feature_id', 'order_']], on='feature_id') - - if len(crosswalk) < 1: - print("No relevant 
streams within HUC boundaries.") - sys.exit(0) - - if input_catchments.HydroID.dtype != 'int': - input_catchments.HydroID = input_catchments.HydroID.astype(int) - output_catchments = input_catchments.merge(crosswalk, on='HydroID') - - if input_flows.HydroID.dtype != 'int': - input_flows.HydroID = input_flows.HydroID.astype(int) - output_flows = input_flows.merge(crosswalk, on='HydroID') - - # added for GMS. Consider adding filter_catchments_and_add_attributes.py to run_by_branch.sh - if 'areasqkm' not in output_catchments.columns: - output_catchments['areasqkm'] = output_catchments.geometry.area / (1000**2) - - output_flows = output_flows.merge( - output_catchments.filter(items=['HydroID', 'areasqkm']), on='HydroID' - ) + input_catchments = input_catchments.dissolve(by='HydroID').reset_index() + + ## crosswalk using stream segment midpoint method + input_nwmcat = gpd.read_file(input_nwmcat_fileName, mask=input_huc, engine="fiona") + + # only reduce nwm catchments to mainstems if running mainstems + if extent == 'MS': + input_nwmcat = input_nwmcat.loc[input_nwmcat.mainstem == 1] + + input_nwmcat = input_nwmcat.rename(columns={'ID': 'feature_id'}) + if input_nwmcat.feature_id.dtype != 'int': + input_nwmcat.feature_id = input_nwmcat.feature_id.astype(int) + input_nwmcat = input_nwmcat.set_index('feature_id') + + input_nwmflows = input_nwmflows.rename(columns={'ID': 'feature_id'}) + if input_nwmflows.feature_id.dtype != 'int': + input_nwmflows.feature_id = input_nwmflows.feature_id.astype(int) + input_nwmflows = input_nwmflows.set_index('feature_id') + + # Get stream midpoint + stream_midpoint = [] + hydroID = [] + for i, lineString in enumerate(input_flows.geometry): + hydroID = hydroID + [input_flows.loc[i, 'HydroID']] + stream_midpoint = stream_midpoint + [lineString.interpolate(0.5, normalized=True)] + + input_flows_midpoint = gpd.GeoDataFrame( + {'HydroID': hydroID, 'geometry': stream_midpoint}, crs=input_flows.crs, geometry='geometry' + ) + input_flows_midpoint = input_flows_midpoint.set_index('HydroID') + + # Create crosswalk + crosswalk = gpd.sjoin_nearest( + input_flows_midpoint, input_nwmflows, how='left', distance_col='distance' + ).reset_index() + crosswalk = crosswalk.rename(columns={"index_right": "feature_id"}) + + crosswalk.loc[crosswalk['distance'] > 100.0, 'feature_id'] = pd.NA + + crosswalk = crosswalk.filter(items=['HydroID', 'feature_id', 'distance']) + crosswalk = crosswalk.merge(input_nwmflows[['order_']], on='feature_id') + + if len(crosswalk) < 1: + print("No relevant streams within HUC boundaries.") + sys.exit(0) + + if input_catchments.HydroID.dtype != 'int': + input_catchments.HydroID = input_catchments.HydroID.astype(int) + output_catchments = input_catchments.merge(crosswalk, on='HydroID') + + if input_flows.HydroID.dtype != 'int': + input_flows.HydroID = input_flows.HydroID.astype(int) + output_flows = input_flows.merge(crosswalk, on='HydroID') + + # added for GMS. 
Consider adding filter_catchments_and_add_attributes.py to run_by_branch.sh + if 'areasqkm' not in output_catchments.columns: + output_catchments['areasqkm'] = output_catchments.geometry.area / (1000**2) + + output_flows = output_flows.merge(output_catchments.filter(items=['HydroID', 'areasqkm']), on='HydroID') output_flows['ManningN'] = mannings_n @@ -223,7 +163,7 @@ def add_crosswalk( elif len(output_flows.loc[output_flows.From_Node == to_node]['HydroID']) > 1: try: max_order = max( - output_flows.loc[output_flows.From_Node == to_node]['HydroID']['order_'] + output_flows.loc[output_flows.From_Node == to_node]['order_'] ) # drainage area would be better than stream order but we would need to calculate except Exception as e: print( @@ -276,7 +216,7 @@ def add_crosswalk( update_id = output_flows.loc[output_flows.From_Node == to_node]['HydroID'].item() else: - update_id = output_flows.loc[output_flows.HydroID == short_id]['HydroID'].item() + update_id = output_flows[output_flows.HydroID == short_id]['HydroID'].iloc[0] output_order = output_flows.loc[output_flows.HydroID == short_id]['order_'] if len(output_order) == 1: @@ -347,10 +287,7 @@ def add_crosswalk( ['Discharge (m3s-1)'], ] = src_stage[1] - if extent == 'FR': - output_src = output_src.merge(input_majorities[['HydroID', 'feature_id']], on='HydroID') - elif (extent == 'MS') | (extent == 'GMS'): - output_src = output_src.merge(crosswalk[['HydroID', 'feature_id']], on='HydroID') + output_src = output_src.merge(crosswalk[['HydroID', 'feature_id']], on='HydroID') output_crosswalk = output_src[['HydroID', 'feature_id']] output_crosswalk = output_crosswalk.drop_duplicates(ignore_index=True) diff --git a/src/bash_variables.env b/src/bash_variables.env index 9ef5a67d4..17e75af10 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -6,6 +6,9 @@ export DEFAULT_FIM_PROJECTION_CRS=EPSG:5070 export ALASKA_CRS=EPSG:3338 # NOTE: $inputsDir is defined in Dockerfile + +export pre_clip_huc_dir=${inputsDir}/pre_clip_huc8/20240702 + export input_DEM=${inputsDir}/3dep_dems/10m_5070/fim_seamless_3dep_dem_10m_5070.vrt export input_DEM_Alaska=${inputsDir}/3dep_dems/10m_South_Alaska/23_11_07/FIM_3dep_dem_South_Alask_10m.vrt export input_DEM_domain=${inputsDir}/3dep_dems/10m_5070/HUC6_dem_domain.gpkg @@ -31,16 +34,17 @@ export input_calib_points_dir=${inputsDir}/rating_curve/water_ed export pre_clip_huc_dir=${inputsDir}/pre_clip_huc8/240502 export bathy_file_ehydro=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg export bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet +export mannN_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet export osm_bridges=${inputsDir}/osm/bridges/240426/osm_all_bridges.gpkg # input file location with nwm feature_id and recurrence flow values -export bankfull_flows_file=${inputsDir}/rating_curve/bankfull_flows/nwm_high_water_threshold_cms.csv +export bankfull_flows_file=${inputsDir}/rating_curve/bankfull_flows/nwm3_high_water_threshold_cms.csv # input file location with nwm feature_id and channel roughness and overbank roughness attributes export vmann_input_file=${inputsDir}/rating_curve/variable_roughness/mannings_global_06_12.csv # input file location with nwm feature_id and recurrence flow values -export nwm_recur_file=${inputsDir}/rating_curve/nwm_recur_flows/nwm21_17C_recurrence_flows_cfs.csv +export nwm_recur_file=${inputsDir}/rating_curve/nwm_recur_flows/nwm3_17C_recurrence_flows_cfs.csv # input file location with usgs rating curve database export 
usgs_rating_curve_csv=${inputsDir}/usgs_gages/usgs_rating_curves.csv diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index 75b6425c7..94abe3858 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -12,6 +12,39 @@ import geopandas as gpd import pandas as pd +# ------------------------------------------------------- +# data = '/efs-drives/fim-dev-efs/fim_data' +# ai_mannN_parquet_path = "/data/inputs/rating_curve/variable_roughness/ml_outputs_v1.01.parquet" +# original_mannN_csv_path = "/data/inputs/rating_curve/variable_roughness/mannings_global_06_12.csv" +# path_to_save = "/data/inputs/rating_curve/variable_roughness/mannings_ai.csv" + +# Replacing manning numbers with AI-Driven ones +def Replacing_mannN_AI_coefs(original_mannN_csv_path, ai_mannN_parquet_path, path_to_save): + + org_mannN_data = pd.read_csv(original_mannN_csv_path) + org_mannN_data_df = org_mannN_data[['feature_id', 'channel_n', 'overbank_n']] + org_mannN_data_df = org_mannN_data_df.rename(columns={'channel_n': 'channel_n_06'}) + org_mannN_data_df = org_mannN_data_df.rename(columns={'overbank_n': 'overbank_n_12'}) + + ai_mannN_data = pd.read_parquet(ai_mannN_parquet_path, engine='pyarrow') + ai_mannN_data_df = ai_mannN_data[['COMID', 'owp_roughness']] + ai_mannN_data_df = ai_mannN_data_df.rename(columns={'COMID': 'feature_id'}) + ai_mannN_data_df = ai_mannN_data_df.rename(columns={'owp_roughness': 'channel_n'}) + ai_mannN_data_df['overbank_n'] = ai_mannN_data_df['channel_n'] + + ai_mannN_data_df_F = ai_mannN_data_df.merge(org_mannN_data_df[['feature_id', 'channel_n_06', 'overbank_n_12']], + on='feature_id', + how='left', + validate='many_to_one') + + ai_mannN_data_df_F = ai_mannN_data_df_F.fillna({'channel_n': 0.06, 'overbank_n': 0.12}) + ai_mannN_data_df_F.drop(columns=['channel_n_06', 'overbank_n_12'], inplace=True) + ai_mannN_data_df_F = ai_mannN_data_df_F.drop_duplicates(subset=['feature_id'], keep='first') + # test = ai_mannN_data_df_F[ai_mannN_data_df_F.duplicated(subset='feature_id', keep=False)] + ai_mannN_data_df_F.sort_values('feature_id', inplace=True) + + ai_mannN_data_df_F.to_csv(path_to_save, index=False) + # ------------------------------------------------------- # Adjusting synthetic rating curves using 'USACE eHydro' bathymetry data diff --git a/src/manningN_optimization.py b/src/manningN_optimization.py new file mode 100644 index 000000000..c622b0ff6 --- /dev/null +++ b/src/manningN_optimization.py @@ -0,0 +1,643 @@ + +import json +import argparse +from datetime import datetime +import multiprocessing +from multiprocessing import Pool +from concurrent.futures import ProcessPoolExecutor, as_completed, wait +import os +from os.path import join +import re +import sys +import traceback +import warnings +import csv +import numpy as np + +import geopandas as gpd +import pandas as pd +from scipy.optimize import minimize, differential_evolution +import random + +from run_test_case import Test_Case +from tools_shared_variables import ( + AHPS_BENCHMARK_CATEGORIES, + MAGNITUDE_DICT, + PREVIOUS_FIM_DIR, + TEST_CASES_DIR, +) + +# fim_dir = "/home/rdp-user/outputs/mno_11010004_cal_off_0710/" +# huc = "11010004" +# mannN_file_aibased = "/efs-drives/fim-dev-efs/fim-data/inputs/rating_curve/variable_roughness/ml_outputs_v1.01.parquet" + +# ********************************************************* +def create_master_metrics_csv(fim_version): #prev_metrics_csv, + """ + This function searches for and collates metrics into a single CSV file that can queried 
database-style. + The CSV is an input to eval_plots.py. + This function automatically looks for metrics produced for official versions and loads them into + memory to be written to the output CSV. + + Args: + master_metrics_csv_output (str) : Full path to CSV output. + If a file already exists at this path, it will be overwritten. + dev_versions_to_include_list (list): A list of non-official FIM version names. + If a user supplied information on the command line using the + -dc flag, then this function will search for metrics in the + "testing_versions" library of metrics and include them in + the CSV output. + """ + + + # Default to processing all possible versions in PREVIOUS_FIM_DIR. + config = "DEV" + # Specify which results to iterate through + if config == 'DEV': + iteration_list = [ + 'official', + 'testing', + ] # iterating through official model results AND testing model(s) + else: + iteration_list = ['official'] # only iterating through official model results + + prev_versions_to_include_list = [] + dev_versions_to_include_list = [] + + prev_versions_to_include_list = os.listdir(PREVIOUS_FIM_DIR) + if config == 'DEV': # development fim model results + dev_versions_to_include_list = [fim_version] + + # Construct header + metrics_to_write = [ + 'true_negatives_count', + 'false_negatives_count', + 'true_positives_count', + 'false_positives_count', + 'contingency_tot_count', + 'cell_area_m2', + 'TP_area_km2', + 'FP_area_km2', + 'TN_area_km2', + 'FN_area_km2', + 'contingency_tot_area_km2', + 'predPositive_area_km2', + 'predNegative_area_km2', + 'obsPositive_area_km2', + 'obsNegative_area_km2', + 'positiveDiff_area_km2', + 'CSI', + 'FAR', + 'TPR', + 'TNR', + 'PND', + 'PPV', + 'NPV', + 'ACC', + 'Bal_ACC', + 'MCC', + 'EQUITABLE_THREAT_SCORE', + 'PREVALENCE', + 'BIAS', + 'F1_SCORE', + 'TP_perc', + 'FP_perc', + 'TN_perc', + 'FN_perc', + 'predPositive_perc', + 'predNegative_perc', + 'obsPositive_perc', + 'obsNegative_perc', + 'positiveDiff_perc', + 'masked_count', + 'masked_perc', + 'masked_area_km2', + ] + + # Create table header + additional_header_info_prefix = ['version', 'nws_lid', 'magnitude', 'huc'] + list_to_write = [ + additional_header_info_prefix + + metrics_to_write + + ['full_json_path'] + + ['flow'] + + ['benchmark_source'] + + ['extent_config'] + + ["calibrated"] + ] + + # add in composite of versions (used for previous FIM3 versions) + if "official" in iteration_list: + composite_versions = [v.replace('_ms', '_comp') for v in prev_versions_to_include_list if '_ms' in v] + prev_versions_to_include_list += composite_versions + + # Iterate through 5 benchmark sources + for benchmark_source in ['ble', 'nws', 'usgs', 'ifc', 'ras2fim']: + benchmark_test_case_dir = os.path.join(TEST_CASES_DIR, benchmark_source + '_test_cases') + test_cases_list = [d for d in os.listdir(benchmark_test_case_dir) if re.match(r'\d{8}_\w{3,7}', d)] + + if benchmark_source in ['ble', 'ifc', 'ras2fim']: + magnitude_list = MAGNITUDE_DICT[benchmark_source] + + # Iterate through available test cases + for each_test_case in test_cases_list: + try: + # Get HUC id + int(each_test_case.split('_')[0]) + huc = each_test_case.split('_')[0] + + # Update filepaths based on whether the official or dev versions should be included + for iteration in iteration_list: + if ( + iteration == "official" + ): # and str(pfiles) == "True": # "official" refers to previous finalized model versions + versions_to_crawl = os.path.join( + benchmark_test_case_dir, each_test_case, 'official_versions' + ) + versions_to_aggregate = 
prev_versions_to_include_list + + if ( + iteration == "testing" + ): # "testing" refers to the development model version(s) being evaluated + versions_to_crawl = os.path.join( + benchmark_test_case_dir, each_test_case, 'testing_versions' + ) + versions_to_aggregate = dev_versions_to_include_list + + # Pull version info from filepath + for magnitude in magnitude_list: + for version in versions_to_aggregate: + if '_ms' in version: + extent_config = 'MS' + elif ('_fr' in version) or (version == 'fim_2_3_3'): + extent_config = 'FR' + else: + extent_config = 'COMP' + if "_c" in version and version.split('_c')[1] == "": + calibrated = "yes" + else: + calibrated = "no" + version_dir = os.path.join(versions_to_crawl, version) + magnitude_dir = os.path.join(version_dir, magnitude) + + # Add metrics from file to metrics table ('list_to_write') + if os.path.exists(magnitude_dir): + magnitude_dir_list = os.listdir(magnitude_dir) + for f in magnitude_dir_list: + if '.json' in f: + flow = 'NA' + nws_lid = "NA" + sub_list_to_append = [version, nws_lid, magnitude, huc] + full_json_path = os.path.join(magnitude_dir, f) + if os.path.exists(full_json_path): + stats_dict = json.load(open(full_json_path)) + for metric in metrics_to_write: + sub_list_to_append.append(stats_dict[metric]) + sub_list_to_append.append(full_json_path) + sub_list_to_append.append(flow) + sub_list_to_append.append(benchmark_source) + sub_list_to_append.append(extent_config) + sub_list_to_append.append(calibrated) + + list_to_write.append(sub_list_to_append) + except ValueError: + pass + + # Iterate through AHPS benchmark data + if benchmark_source in AHPS_BENCHMARK_CATEGORIES: + test_cases_list = os.listdir(benchmark_test_case_dir) + + for each_test_case in test_cases_list: + try: + # Get HUC id + int(each_test_case.split('_')[0]) + huc = each_test_case.split('_')[0] + + # Update filepaths based on whether the official or dev versions should be included + for iteration in iteration_list: + if iteration == "official": # "official" refers to previous finalized model versions + versions_to_crawl = os.path.join( + benchmark_test_case_dir, each_test_case, 'official_versions' + ) + versions_to_aggregate = prev_versions_to_include_list + + if ( + iteration == "testing" + ): # "testing" refers to the development model version(s) being evaluated + versions_to_crawl = os.path.join( + benchmark_test_case_dir, each_test_case, 'testing_versions' + ) + versions_to_aggregate = dev_versions_to_include_list + + # Pull model info from filepath + for magnitude in ['action', 'minor', 'moderate', 'major']: + for version in versions_to_aggregate: + if '_ms' in version: + extent_config = 'MS' + elif ('_fr' in version) or (version == 'fim_2_3_3'): + extent_config = 'FR' + else: + extent_config = 'COMP' + if "_c" in version and version.split('_c')[1] == "": + calibrated = "yes" + else: + calibrated = "no" + + version_dir = os.path.join(versions_to_crawl, version) + magnitude_dir = os.path.join(version_dir, magnitude) + + if os.path.exists(magnitude_dir): + magnitude_dir_list = os.listdir(magnitude_dir) + for f in magnitude_dir_list: + if '.json' in f and 'total_area' not in f: + nws_lid = f[:5] + sub_list_to_append = [version, nws_lid, magnitude, huc] + full_json_path = os.path.join(magnitude_dir, f) + flow = '' + if os.path.exists(full_json_path): + # Get flow used to map + flow_file = os.path.join( + benchmark_test_case_dir, + 'validation_data_' + benchmark_source, + huc, + nws_lid, + magnitude, + 'ahps_' + + nws_lid + + '_huc_' + + huc + + '_flows_' + + 
magnitude + + '.csv', + ) + if os.path.exists(flow_file): + with open(flow_file, newline='') as csv_file: + reader = csv.reader(csv_file) + next(reader) + for row in reader: + flow = row[1] + + # Add metrics from file to metrics table ('list_to_write') + stats_dict = json.load(open(full_json_path)) + for metric in metrics_to_write: + sub_list_to_append.append(stats_dict[metric]) + sub_list_to_append.append(full_json_path) + sub_list_to_append.append(flow) + sub_list_to_append.append(benchmark_source) + sub_list_to_append.append(extent_config) + sub_list_to_append.append(calibrated) + list_to_write.append(sub_list_to_append) + except ValueError: + pass + + df_to_write = pd.DataFrame(list_to_write) + df_to_write.columns = df_to_write.iloc[0] + df_to_write = df_to_write[1:] + + return df_to_write + + +# ********************************************************* +def run_test_cases(fim_version): #prev_metrics_csv, + """ + This function + """ + + print("================================") + print("Start synthesize test cases") + start_time = datetime.now() + dt_string = datetime.now().strftime("%m/%d/%Y %H:%M:%S") + print(f"started: {dt_string}") + print() + + # Default to processing all possible versions in PREVIOUS_FIM_DIR. + fim_version = fim_version #"all" + + # Create a list of all test_cases for which we have validation data + archive_results = False + benchmark_category = "all" + all_test_cases = Test_Case.list_all_test_cases( + version=fim_version, + archive=archive_results, + benchmark_categories=[] if benchmark_category == "all" else [benchmark_category], + ) + model = "GMS" + job_number_huc = 1 + overwrite=True + verbose=False + calibrated = False + job_number_branch = 5 + # Set up multiprocessor + with ProcessPoolExecutor(max_workers=job_number_huc) as executor: + # Loop through all test cases, build the alpha test arguments, and submit them to the process pool + executor_dict = {} + for test_case_class in all_test_cases: + # if not os.path.exists(test_case_class.fim_dir): + # continue + alpha_test_args = { + 'calibrated': calibrated, + 'model': model, + 'mask_type': 'huc', + 'overwrite': overwrite, + 'verbose': verbose, + 'gms_workers': job_number_branch, + } + try: + future = executor.submit(test_case_class.alpha_test, **alpha_test_args) + executor_dict[future] = test_case_class.test_id + except Exception as ex: + print(f"*** {ex}") + traceback.print_exc() + sys.exit(1) + + # # Send the executor to the progress bar and wait for all MS tasks to finish + # progress_bar_handler( + # executor_dict, True, f"Running {model} alpha test cases with {job_number_huc} workers" + # ) + metrics_df = create_master_metrics_csv(fim_version) + + print("================================") + print("End synthesize test cases") + + end_time = datetime.now() + dt_string = datetime.now().strftime("%m/%d/%Y %H:%M:%S") + print(f"ended: {dt_string}") + + # Calculate duration + time_duration = end_time - start_time + print(f"Duration: {str(time_duration).split('.')[0]}") + print() + + return metrics_df + + +# ********************************************************* +def initialize_mannN_ai(fim_dir, huc, mannN_file_aibased): + + # log_text = f'Initializing manningN for HUC: {huc}\n' + + # Load AI-based manning number data + ai_mannN_data = pd.read_parquet(mannN_file_aibased, engine='pyarrow') + # aib_mannN_data.columns + ai_mannN_data_df = ai_mannN_data[["COMID", "owp_roughness"]] + + # Clip ai_manningN for HUC8 + fim_huc_dir = join(fim_dir, huc) + + path_nwm_streams = join(fim_huc_dir, 
"nwm_subset_streams.gpkg") + nwm_stream = gpd.read_file(path_nwm_streams) + + wbd8_path = join(fim_huc_dir, 'wbd.gpkg') + wbd8 = gpd.read_file(wbd8_path, engine="pyogrio", use_arrow=True) + nwm_stream_clp = nwm_stream.clip(wbd8) + + ai_mannN_data_df_huc = ai_mannN_data_df.merge(nwm_stream_clp, left_on='COMID', right_on='ID') + mannN_ai_df = ai_mannN_data_df_huc.drop_duplicates(subset=['COMID'], keep='first') + mannN_ai_df.index = range(len(mannN_ai_df)) + mannN_ai_df = mannN_ai_df.drop(columns=['ID', 'order_']) + mannN_ai_df = mannN_ai_df.rename(columns={'COMID': 'feature_id'}) + + # Initializing optimized manningN + # MaxN = 0.5 # MinN = 0.01 #AI-N min=0.01 #max=0.35 + mannN_ai_df['channel_ratio_optz'] = [random.uniform(0.025, 50) for _ in range(len(mannN_ai_df))] #[0.025,50] + mannN_ai_df['overbank_ratio_optz'] = [random.uniform(0.025, 50) for _ in range(len(mannN_ai_df))] #[0.025,50] + mannN_ai_df['channel_n_optz'] = mannN_ai_df['owp_roughness']# *mannN_ai_df['channel_ratio_optz'] + mannN_ai_df['overbank_n_optz'] = mannN_ai_df['owp_roughness']# *mannN_ai_df['overbank_ratio_optz']ManningN + mannN_ai_df['ManningN'] = mannN_ai_df['owp_roughness']# *mannN_ai_df['overbank_ratio_optz'] + + # ch_lower_optz = 0.01 + # ch_upper_optz = 0.20 + # ob_lower_optz = 0.01 + # ob_upper_optz = 0.50 + # mannN_ai_df['channel_n_optz'] = mannN_ai_df['channel_n_optz'].clip(lower=ch_lower_optz, upper=ch_upper_optz) + # mannN_ai_df['overbank_n_optz'] = mannN_ai_df['overbank_n_optz'].clip(lower=ob_lower_optz, upper=ob_upper_optz) + # mannN_ai_df.columns + + initial_mannN_df = mannN_ai_df[['feature_id', 'ManningN']] + + return initial_mannN_df + + +# ********************************************************* +def update_hydrotable_with_mannN_and_Q(fim_dir, huc, mannN_fid_df, mannN_values): #, mannN_values + + mannN_fid_df_temp = mannN_fid_df.copy() + mannN_fid_df_temp['ManningN'] = mannN_values + + fim_huc_dir = join(fim_dir, huc) + # Get hydro_table from each branch + ht_all_branches_path = [] + branches = os.listdir(join(fim_huc_dir, 'branches')) + for branch in branches: + ht_full = join(fim_huc_dir, 'branches', str(branch), f'hydroTable_{branch}.csv') + if os.path.isfile(ht_full): + ht_all_branches_path.append(ht_full) + + # Update hydro_table with updated Q and n + for ht_path in ht_all_branches_path: #[0:1] + # try: + ht_name = os.path.basename(ht_path) + branch = ht_name.split(".")[0].split("_")[-1] + ht_df = pd.read_csv(ht_path, dtype={'feature_id': 'int64'}, low_memory=False) # + + ## Check the Stage_bankfull exists in the hydro_table (channel ratio column that the user specified) + # drop these cols (in case optz_mann was previously performed) + if 'manningN_optz' in ht_df.columns: + ht_df = ht_df.drop( + ['manningN_optz', 'Discharge(cms)_optzN', 'optzN_on', 'discharge_cms', 'ManningN', 'overbank_n', 'channel_n'], + axis=1, + ) + ## Merge (crosswalk) the df of Manning's n with the SRC df + ht_df = ht_df.merge(mannN_fid_df_temp, how='left', on='feature_id') + + ## Calculate composite Manning's n using the channel geometry ratio attribute given by user + # print(ht_df['ManningN'][0:5]) + ht_df['manningN_optz'] = ht_df['ManningN'] + ## Define the channel geometry variable names to use from the hydroTable + hydr_radius = 'HydraulicRadius (m)' + wet_area = 'WetArea (m2)' + + ## Calculate Q using Manning's equation + ht_df['Discharge(cms)_optzN'] = ( + ht_df[wet_area] + * pow(ht_df[hydr_radius], 2.0 / 3) + * pow(ht_df['SLOPE'], 0.5) + / ht_df['manningN_optz'] + ) + optzN_on = True + ht_df['optzN_on'] = optzN_on 
+ ht_df['discharge_cms'] = ht_df['Discharge(cms)_optzN'] + ht_df['overbank_n'] = ht_df['manningN_optz'] + ht_df['channel_n'] = ht_df['manningN_optz'] + ht_df.to_csv(ht_path, index=False) + + # except Exception as ex: + # summary = traceback.StackSummary.extract(traceback.walk_stack(None)) + # print( + # 'WARNING: ' + str(huc) + ' branch id: ' + str(branch) + " updadting hydro_table failed for some reason" + # ) + # # log_text += ( + # # 'ERROR --> ' + # # + str(huc) + # # + ' branch id: ' + # # + str(branch) + # # + " updating hydro_table failed (details: " + # # + (f"*** {ex}") + # # + (''.join(summary.format())) + # # + '\n' + # # ) + + +# ********************************************************* +def read_hydroTables(fim_dir, huc): #, mannN_values + + fim_huc_dir = join(fim_dir, huc) + # Get hydro_table from each branch + ht_all_branches_path = [] + branches = os.listdir(join(fim_huc_dir, 'branches')) + for branch in branches: + ht_full = join(fim_huc_dir, 'branches', str(branch), f'hydroTable_{branch}.csv') + if os.path.isfile(ht_full): + ht_all_branches_path.append(ht_full) + + # Update hydro_table with updated Q and n + initial_hydroTables_ls = [] + for ht_path in ht_all_branches_path: #[0:1] + # try: + ht_name = os.path.basename(ht_path) + branch = ht_name.split(".")[0].split("_")[-1] + ht_df = pd.read_csv(ht_path, dtype={'feature_id': 'int64'}, low_memory=False) # + + initial_hydroTables_ls.append(ht_df) + + return(initial_hydroTables_ls) + + +# ********************************************************* +def recalculate_Q_with_mannN_and_update_hydroTables(initial_hydroTables_ls, mannN_values, feature_ids): + + updated_hydroTables_ls = [] + for ht_df in initial_hydroTables_ls: + if 'manningN_optz' in ht_df.columns: + ht_df = ht_df.drop( + ['manningN_optz', 'Discharge(cms)_optzN', 'optzN_on', 'discharge_cms', 'ManningN', 'overbank_n', 'channel_n'], + axis=1, + ) + + # Create a temporary dataframe with updated ManningN values + temp_mannN_df = pd.DataFrame({'feature_id': feature_ids, 'ManningN': mannN_values}) + + # Merge the updated ManningN values + ht_df = ht_df.merge(temp_mannN_df, how='left', on='feature_id') + + # Rest of your calculations + hydr_radius = 'HydraulicRadius (m)' + wet_area = 'WetArea (m2)' + + ht_df['Discharge(cms)_optzN'] = ( + ht_df[wet_area] + * pow(ht_df[hydr_radius], 2.0 / 3) + * pow(ht_df['SLOPE'], 0.5) + / ht_df['ManningN'] + ) + ht_df['optzN_on'] = True + ht_df['manningN_optz'] = ht_df['ManningN'] + ht_df['discharge_cms'] = ht_df['Discharge(cms)_optzN'] + ht_df['overbank_n'] = ht_df['ManningN'] + ht_df['channel_n'] = ht_df['ManningN'] + + updated_hydroTables_ls.append(ht_df) + + return updated_hydroTables_ls + + +# ********************************************************* +def objective_function(mannN_values, feature_ids, initial_hydroTables_ls, iteration): + print(f'Iteration {iteration}: Current mannN values: {mannN_values[:5]}') # Print first 5 values for debugging + + updated_hydroTables_ls = recalculate_Q_with_mannN_and_update_hydroTables(initial_hydroTables_ls, mannN_values, feature_ids) + + error_ls = [] + for ht_df in updated_hydroTables_ls: + q_obj = ht_df['default_discharge_cms'] + q_dynamic = ht_df['discharge_cms'] + + error1 = np.sqrt(np.mean((q_obj - q_dynamic)**2)) # RMSE + error_ls.append(error1) + + error_mannN = np.sum(error_ls) + + print(f'Iteration {iteration}: Current error: {error_mannN}') + iteration[0] += 1 + + return error_mannN + + +def objective_function2(mannN_values, feature_ids, initial_hydroTables_ls, iteration): + 
print(f'Iteration {iteration}: Current mannN values: {mannN_values[:5]}') # Print first 5 values for debugging + + updated_hydroTables_ls = recalculate_Q_with_mannN_and_update_hydroTables(initial_hydroTables_ls, mannN_values, feature_ids) + + error_ls = [] + for ht_df in updated_hydroTables_ls: + synth_test_df = run_test_cases(fim_version) + + error_mannN = np.sum(error_ls) + + print(f'Iteration {iteration}: Current error: {error_mannN}') + iteration[0] += 1 + + return error_mannN + + + +# ********************************************************* +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Impliment user provided Manning's n values for in-channel vs. overbank flow. " + "Recalculate Manning's eq for discharge" + ) + parser.add_argument('-fim_dir', '--fim-dir', help='FIM output dir', required=True, type=str) + parser.add_argument('-huc', '--huc', help='HUC8 Number', required=True, type=str) + parser.add_argument( + '-mann', + '--mannN_file_aibased', + help="Path to a csv file containing initial Manning's n values by featureid", + required=True, + type=str, + ) + + args = vars(parser.parse_args()) + + fim_dir = args['fim_dir'] + huc = args['huc'] + mannN_file_aibased = args['mannN_file_aibased'] + + # ********************************************************* + # huc = "11010004" + initial_mannN_ai_df = initialize_mannN_ai(fim_dir, huc, mannN_file_aibased) + # Define the initial values for mannN + mannN_init = initial_mannN_ai_df['ManningN'].values + feature_ids = initial_mannN_ai_df['feature_id'].values + + print(f'Updating hydro-tables for each branch for HUC: {huc}\n') + initial_hydroTables_ls = read_hydroTables(fim_dir, huc) + + bounds = [(0.01, 0.5) for _ in range(len(mannN_init))] + + iteration = [0] # Use a list to allow modification inside the callback function + + # Create a partial function for the objective function with fixed arguments + from functools import partial + obj_func_partial = partial(objective_function, + feature_ids=feature_ids, + initial_hydroTables_ls=initial_hydroTables_ls, + iteration=iteration) + + # Run the optimization + result = differential_evolution(obj_func_partial, bounds, maxiter=5000, popsize=2, disp=True) + + optimized_mannN = result.x + + print(f"Optimization results - Optimal mannN values: {optimized_mannN[:5]}, Final error: {result.fun}") + diff --git a/src/src_adjust_ras2fim_rating.py b/src/src_adjust_ras2fim_rating.py index 22a47b33e..95bdce66b 100644 --- a/src/src_adjust_ras2fim_rating.py +++ b/src/src_adjust_ras2fim_rating.py @@ -97,21 +97,20 @@ def create_ras2fim_rating_database(ras_rc_filepath, ras_elev_df, nwm_recurr_file '10_0_year_recurrence_flow_17C': '10_0_year', '25_0_year_recurrence_flow_17C': '25_0_year', '50_0_year_recurrence_flow_17C': '50_0_year', - '100_0_year_recurrence_flow_17C': '100_0_year', }, inplace=True, ) # convert cfs to cms (x 0.028317) nwm_recur_df.loc[ - :, ['2_0_year', '5_0_year', '10_0_year', '25_0_year', '50_0_year', '100_0_year'] + :, ['2_0_year', '5_0_year', '10_0_year', '25_0_year', '50_0_year'] ] *= 0.028317 # merge nwm recurr with ras_rc_df merge_df = ras_rc_df.merge(nwm_recur_df, how='left', on='feature_id') # NWM recurr intervals - recurr_intervals = ["2", "5", "10", "25", "50", "100"] # "2","5","10","25","50","100" + recurr_intervals = ["2", "5", "10", "25", "50"] # "2","5","10","25","50","100" final_df = pd.DataFrame() # create empty dataframe to append flow interval dataframes for interval in recurr_intervals: log_text += '\n\nProcessing: ' + str(interval) + '-year NWM 
recurr intervals\n' diff --git a/src/src_adjust_usgs_rating_trace.py b/src/src_adjust_usgs_rating_trace.py index ac923a5be..f11d20998 100644 --- a/src/src_adjust_usgs_rating_trace.py +++ b/src/src_adjust_usgs_rating_trace.py @@ -95,20 +95,19 @@ def create_usgs_rating_database(usgs_rc_filepath, usgs_elev_df, nwm_recurr_filep '10_0_year_recurrence_flow_17C': '10_0_year', '25_0_year_recurrence_flow_17C': '25_0_year', '50_0_year_recurrence_flow_17C': '50_0_year', - '100_0_year_recurrence_flow_17C': '100_0_year', } ) # convert cfs to cms (x 0.028317) nwm_recur_df.loc[ - :, ['2_0_year', '5_0_year', '10_0_year', '25_0_year', '50_0_year', '100_0_year'] + :, ['2_0_year', '5_0_year', '10_0_year', '25_0_year', '50_0_year'] ] *= 0.028317 # merge nwm recurr with usgs_rc merge_df = usgs_rc_df.merge(nwm_recur_df, how='left', on='feature_id') # NWM recurr intervals - recurr_intervals = ("2", "5", "10", "25", "50", "100") + recurr_intervals = ("2", "5", "10", "25", "50") final_df = pd.DataFrame() # create empty dataframe to append flow interval dataframes for interval in recurr_intervals: log_text += '\n\nProcessing: ' + str(interval) + '-year NWM recurr intervals\n' diff --git a/tools/bridge_inundation.py b/tools/bridge_inundation.py new file mode 100644 index 000000000..6f9238196 --- /dev/null +++ b/tools/bridge_inundation.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 + +import argparse +import errno +import os +import re +from timeit import default_timer as timer + +import geopandas as gpd +import pandas as pd + + +def bridge_risk_status( + hydrofabric_dir: str, flow_file: str, output_dir: str, limit_hucs: list = [] +) -> gpd.GeoDataFrame: + """ + This function detect which bridge points are affected by a specified flow file. The function requires a flow file (expected to follow + the schema used by 'inundation_mosaic_wrapper') with data organized by 'feature_id' and 'discharge' in cms. The output includes a geopackage + containing bridge points labeled as "threatened", "at risk", or "not at risk" based on forcasted discharge compared to preset discharge + ("max_discharge" or "max_discharge75"). + + Args: + hydrofabric_dir (str): Path to hydrofabric directory where FIM outputs were written by + fim_pipeline. + flow_file (str): Path to flow file to be used for inundation. + feature_ids in flow_file should be present in supplied HUC. + output (str): Path to output geopackage. + limit_hucs (list): Optional. If specified, only the bridges in these HUCs will be processed. 
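+
+    Returns:
+        gpd.GeoDataFrame: The bridge points written to the output geopackage, with a
+            'risk_status' column of 'threatened', 'at_risk', or 'not_at_risk'.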
+
+    Example usage:
+        python /foss_fim/tools/bridge_inundation.py \
+        -y /data/previous_fim/fim_4_5_2_0 \
+        -f /data/ble_huc_12090301_flows_100yr.csv \
+        -o /home/user/Documents/bridges/inundated_bridge_pnts.gpkg \
+        -u 12090301 02020005
+    """
+
+    dir_path = hydrofabric_dir
+    # Check that hydrofabric_dir exists
+    if not os.path.exists(dir_path):
+        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), dir_path)
+
+    # Get the list of all hucs in the directory
+    entries = [d for d in os.listdir(dir_path) if re.match(r'^\d{8}$', d)]
+    hucs = []
+    for entry in entries:
+        # create the full path of the entry
+        full_path = os.path.join(dir_path, entry)
+        # check if the entry is a directory
+        if os.path.isdir(full_path):
+            hucs.append(entry)
+
+    # Check that flow file exists
+    if not os.path.exists(flow_file):
+        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), flow_file)
+
+    # Read the flow_file
+    dtype_dict = {'feature_id': str}
+    flow_file_data = pd.read_csv(flow_file, dtype=dtype_dict)
+
+    # Initialize an empty list to hold GeoDataFrames
+    gdfs = []
+
+    # Filter HUCs if specified
+    if limit_hucs:
+        hucs = [h for h in limit_hucs if h in hucs]
+
+    # Iterate through hucs
+    for huc in hucs:
+        print(f'Processing HUC: {huc}')
+        # Construct the file path
+        gpkg_path = os.path.join(dir_path, huc, 'osm_bridge_centroids.gpkg')
+        # Check if the file exists
+        if not os.path.exists(gpkg_path):
+            print(f"No GeoPackage file found in {gpkg_path}. Skipping...")
+            continue
+        # Open the bridge point GeoPackage for each huc
+        bri_po = gpd.read_file(gpkg_path)
+
+        # Save the original crs in a new column
+        bri_po['original_crs'] = bri_po.crs.to_string()
+
+        # Reproject to EPSG:4326
+        bri_po = bri_po.to_crs('epsg:4326')
+        gdfs.append(bri_po)
+
+    # Concatenate all GeoDataFrames into a single GeoDataFrame
+    bridge_points = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
+
+    # Find the common feature_id between flow_file and bridge_points
+    merged_bri = bridge_points.merge(flow_file_data, on='feature_id', how='inner')
+
+    # Assign risk status for each point
+    def risk_class(row):
+        if row['discharge'] > row['max_discharge']:
+            return 'threatened'
+        elif row['max_discharge75'] <= row['discharge'] < row['max_discharge']:
+            return 'at_risk'
+        else:
+            return 'not_at_risk'
+
+    # Apply risk_class function to each row
+    merged_bri['risk_status'] = merged_bri.apply(risk_class, axis=1)
+    merged_bri.drop('discharge', axis=1, inplace=True)
+
+    # Drop not_at_risk status from points with the same geometry
+    mapping_dic = {'not_at_risk': 0, 'at_risk': 1, 'threatened': 2}
+    merged_bri['risk'] = merged_bri['risk_status'].map(mapping_dic)
+    merged_bri.reset_index(drop=True, inplace=True)
+    merged_data_max = merged_bri.groupby('geometry')['risk'].idxmax()
+    bridge_out = merged_bri.loc[merged_data_max]
+    bridge_out.reset_index(drop=True, inplace=True)
+    bridge_out.drop('risk', axis=1, inplace=True)
+    bridge_out.to_file(output_dir, driver='GPKG', layer='bridge_risk_status')
+
+    return bridge_out
+
+
+if __name__ == "__main__":
+    # Parse arguments
+    parser = argparse.ArgumentParser(
+        description="Detect which bridge points are affected by a specified flow file."
+    )
+    parser.add_argument(
+        "-y",
+        "--hydrofabric_dir",
+        help="Directory path to FIM hydrofabric by processing unit.",
+        required=True,
+        type=str,
+    )
+    parser.add_argument(
+        "-f",
+        "--flow_file",
+        help='Discharges in CMS as CSV file. "feature_id" and "discharge" columns MUST be supplied.',
+        required=True,
+        type=str,
+    )
+    parser.add_argument("-o", "--output_dir", help="Path to geopackage output.", required=True, type=str)
+    parser.add_argument(
+        "-u",
+        "--limit_hucs",
+        help="Optional. If specified, only the bridges in these HUCs will be processed.",
+        required=False,
+        type=str,
+        nargs="+",
+    )
+
+    start = timer()
+
+    # Extract to dictionary and run
+    bridge_risk_status(**vars(parser.parse_args()))
+
+    print(f"Completed in {round((timer() - start)/60, 2)} minutes.")
diff --git a/tools/rating_curve_comparison.py b/tools/rating_curve_comparison.py
index ef8e2417f..0a9e58acb 100755
--- a/tools/rating_curve_comparison.py
+++ b/tools/rating_curve_comparison.py
@@ -200,10 +200,10 @@ def generate_rating_curve_metrics(args):
     rating_curves['order_'] = rating_curves['order_'].astype('int')

     # NWM recurr intervals
-    recurr_intervals = ("2", "5", "10", "25", "50", "100")
+    recurr_intervals = ("2", "5", "10", "25", "50")
     recurr_dfs = []
     for interval in recurr_intervals:
-        recurr_file = join(nwm_flow_dir, 'nwm21_17C_recurr_{}_0_cms.csv'.format(interval))
+        recurr_file = join(nwm_flow_dir, 'nwm3_17C_recurr_{}_0_cms.csv'.format(interval))
         df = pd.read_csv(recurr_file, dtype={'feature_id': str})
         # Update column names
         df = df.rename(columns={"discharge": interval})
diff --git a/tools/run_test_case_mannN_optz.py b/tools/run_test_case_mannN_optz.py
new file mode 100644
index 000000000..e74b06e81
--- /dev/null
+++ b/tools/run_test_case_mannN_optz.py
@@ -0,0 +1,540 @@
+#!/usr/bin/env python3
+
+import json
+import os
+import re
+import shutil
+import sys
+import traceback
+
+import pandas as pd
+from inundate_mosaic_wrapper import produce_mosaicked_inundation
+from inundation import inundate
+from mosaic_inundation import Mosaic_inundation
+from tools_shared_functions import compute_contingency_stats_from_rasters
+from tools_shared_variables import (
+    AHPS_BENCHMARK_CATEGORIES,
+    INPUTS_DIR,
+    MAGNITUDE_DICT,
+    OUTPUTS_DIR,
+    PREVIOUS_FIM_DIR,
+    TEST_CASES_DIR,
+    elev_raster_ndv,
+)
+
+from utils.shared_functions import FIM_Helpers as fh
+
+
+AHPS_BENCHMARK_CATEGORIES = AHPS_BENCHMARK_CATEGORIES
+MAGNITUDE_DICT = MAGNITUDE_DICT
+
+def initial_func(category):
+    """Function that handles benchmark data.
+
+    Parameters
+    ----------
+    category : str
+        Category of the benchmark site. Should be one of ['ble', 'ifc', 'nws', 'usgs', 'ras2fim'].
+    """
+
+    category = category.lower()
+    assert category in list(MAGNITUDE_DICT.keys()), f"Category must be one of {list(MAGNITUDE_DICT.keys())}"
+    validation_data = os.path.join(TEST_CASES_DIR, f'{category}_test_cases', f'validation_data_{category}')
+
+    if category in AHPS_BENCHMARK_CATEGORIES:
+        is_ahps = True
+    else:
+        is_ahps = False
+
+    magnitudes = MAGNITUDE_DICT[category]
+
+    return validation_data, is_ahps, magnitudes
+
+def huc_data(validation_data):
+    '''Returns a dict of HUC8, magnitudes, and sites.'''
+    huc_mags = {}
+    for huc in os.listdir(validation_data):
+        if not re.match(r'\d{8}', huc):
+            continue
+        huc_mags[huc] = data(huc)
+
+    return huc_mags
+
+def data(huc, validation_data, is_ahps, magnitudes):
+    '''Returns a dict of magnitudes and sites for a given huc. Sites will be AHPS lids for
+    AHPS sites and empty strings for non-AHPS sites.
+
+    '''
+    huc_dir = os.path.join(validation_data, huc)
+    if not os.path.isdir(huc_dir):
+        return {}
+    if is_ahps:
+        lids = os.listdir(huc_dir)
+
+        mag_dict = {}
+        for lid in lids:
+            lid_dir = os.path.join(huc_dir, lid)
+            for mag in [file for file in os.listdir(lid_dir) if file in magnitudes]:
+                if mag in mag_dict:
+                    mag_dict[mag].append(lid)
+                else:
+                    mag_dict[mag] = [lid]
+
+        return mag_dict
+
+    else:
+        mags = list(os.listdir(huc_dir))
+        return {mag: [''] for mag in mags}
+
+
+# class Test_Case(Benchmark):
+def initial_Test_Case(test_id, version, archive=True):
+    """Function that handles test cases, specifically running the alpha test.
+
+    Parameters
+    ----------
+    test_id : str
+        ID of the test case in huc8_category format, e.g. `12090201_ble`.
+    version : str
+        Version of FIM to which this test_case belongs. This should correspond to the fim directory
+        name in either `/data/previous_fim/` or `/outputs/`.
+    archive : bool
+        If true, this test case's outputs will be placed into the `official_versions` folder
+        and the FIM model will be read from the `/data/previous_fim` folder.
+        If false, it will be saved to the `testing_versions/` folder and the FIM model
+        will be read from the `/outputs/` folder.
+
+    """
+    test_id = test_id
+    huc, benchmark_cat = test_id.split('_')
+
+    validation_data, is_ahps, magnitudes = initial_func(benchmark_cat)
+    version = version
+    archive = archive
+    # FIM run directory path - uses HUC 6 for FIM 1 & 2
+    fim_dir = os.path.join(PREVIOUS_FIM_DIR if archive else OUTPUTS_DIR, version,
+        huc if not re.search('^fim_[1,2]', version, re.IGNORECASE) else huc[:6])
+
+    # Test case directory path
+    dir_TC = os.path.join(
+        TEST_CASES_DIR,
+        f'{benchmark_cat}_test_cases',
+        test_id,
+        'official_versions' if archive else 'testing_versions',
+        version,
+    )
+    if not os.path.exists(dir_TC):
+        os.makedirs(dir_TC)
+
+    # Benchmark data path
+    benchmark_dir = os.path.join(validation_data, huc)
+
+    # Create list of shapefile paths to use as exclusion areas.
+    mask_dict = {
+        'levees': {
+            'path': '/data/inputs/nld_vectors/Levee_protected_areas.gpkg',
+            'buffer': None,
+            'operation': 'exclude',
+        },
+        'waterbodies': {
+            'path': '/data/inputs/nwm_hydrofabric/nwm_lakes.gpkg',
+            'buffer': None,
+            'operation': 'exclude',
+        },
+    }
+
+    return dir_TC, mask_dict
+
+
+# @classmethod
+def list_all_test_cases(cls, version, archive, benchmark_categories=[]):
+    """Returns a complete list of all benchmark category test cases as classes.
+
+    Parameters
+    ----------
+    version : str
+        Version of FIM to which this test_case belongs. This should correspond to the fim directory
+        name in either `/data/previous_fim/` or `/outputs/`.
+    archive : bool
+        If true, this test case's outputs will be placed into the `official_versions` folder
+        and the FIM model will be read from the `/data/previous_fim` folder.
+        If false, it will be saved to the `testing_versions/` folder and the FIM model
+        will be read from the `/outputs/` folder.
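+    benchmark_categories : list
+        Optional. If provided, limits the returned test cases to these benchmark categories;
+        defaults to all categories in MAGNITUDE_DICT.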
+    """
+    if not benchmark_categories:
+        benchmark_categories = list(cls.MAGNITUDE_DICT.keys())
+
+    test_case_list = []
+    for bench_cat in benchmark_categories:
+        benchmark_class = Benchmark(bench_cat)
+        benchmark_data = benchmark_class.huc_data()
+
+        for huc in benchmark_data.keys():
+            test_case_list.append(cls(f'{huc}_{bench_cat}', version, archive))
+
+    return test_case_list
+
+def alpha_test(
+    calibrated=False,
+    model='',
+    mask_type='huc',
+    inclusion_area='',
+    inclusion_area_buffer=0,
+    overwrite=True,
+    verbose=False,
+    gms_workers=1,
+    dir_TC = '',
+    fim_dir = '',
+    mask_dict = ''
+):
+    '''Compares a FIM directory with benchmark data from a variety of sources.
+
+    Parameters
+    ----------
+    calibrated : bool
+        Whether or not this FIM version is calibrated.
+    model : str
+        MS or FR extent of the model. This value will be written to the eval_metadata.json.
+    mask_type : str
+        Mask type to feed into inundation.py.
+    inclusion_area : str
+        Area to include in agreement analysis.
+    inclusion_area_buffer : int
+        Buffer distance in meters to include outside of the model's domain.
+    overwrite : bool
+        If True, overwrites pre-existing test cases within the test_cases directory.
+    verbose : bool
+        If True, prints out all pertinent data.
+    gms_workers : int
+        Number of worker processes assigned to GMS processing.
+    '''
+
+    try:
+        if not overwrite and os.path.isdir(dir_TC):
+            print(f"Metrics for {dir_TC} already exist. Use overwrite flag (-o) to overwrite metrics.")
+            return
+
+        fh.vprint(f"Starting alpha test for {dir_TC}", verbose)
+
+        stats_modes_list = ['total_area']
+
+        # Create paths to fim_run outputs for use in inundate()
+        if model != 'GMS':
+            rem = os.path.join(fim_dir, 'rem_zeroed_masked.tif')
+            if not os.path.exists(rem):
+                rem = os.path.join(fim_dir, 'rem_clipped_zeroed_masked.tif')
+            catchments = os.path.join(
+                fim_dir, 'gw_catchments_reaches_filtered_addedAttributes.tif'
+            )
+            if not os.path.exists(catchments):
+                catchments = os.path.join(
+                    fim_dir, 'gw_catchments_reaches_clipped_addedAttributes.tif'
+                )
+            mask_type = mask_type
+            if mask_type == 'huc':
+                catchment_poly = ''
+            else:
+                catchment_poly = os.path.join(
+                    fim_dir, 'gw_catchments_reaches_filtered_addedAttributes_crosswalked.gpkg'
+                )
+            hydro_table = os.path.join(fim_dir, 'hydroTable.csv')
+
+        # Map necessary inputs for inundate().
+        hucs, hucs_layerName = os.path.join(INPUTS_DIR, 'wbd', 'WBD_National.gpkg'), 'WBDHU8'
+
+        if inclusion_area != '':
+            inclusion_area_name = os.path.split(inclusion_area)[1].split('.')[0]  # Get layer name
+            mask_dict.update(
+                {
+                    inclusion_area_name: {
+                        'path': inclusion_area,
+                        'buffer': int(inclusion_area_buffer),
+                        'operation': 'include',
+                    }
+                }
+            )
+            # Append the concatenated inclusion_area_name and buffer.
+            if inclusion_area_buffer is None:
+                inclusion_area_buffer = 0
+            stats_modes_list.append(inclusion_area_name + '_b' + str(inclusion_area_buffer) + 'm')
+
+        # Delete the directory if it exists
+        if os.path.exists(dir_TC):
+            shutil.rmtree(dir_TC)
+        os.mkdir(dir_TC)
+
+        # Get the magnitudes and lids for the current huc and loop through them
+        validation_data = self.data(self.huc)
+        for magnitude in validation_data:
+            for instance in validation_data[
+                magnitude
+            ]:  # instance will be the lid for AHPS sites and '' for other sites
+                # For each site, inundate the REM and compute agreement raster with stats
+                self._inundate_and_compute(
+                    magnitude, instance, model=model, verbose=verbose, gms_workers=gms_workers
+                )
+
+                # Clean up 'total_area' outputs from AHPS sites
+                if self.is_ahps:
+                    self.clean_ahps_outputs(os.path.join(dir_TC, magnitude))
+
+        # Write out evaluation meta-data
+        self.write_metadata(calibrated, model)
+
+    except KeyboardInterrupt:
+        print("Program aborted via keyboard interrupt")
+        sys.exit(1)
+    except Exception as ex:
+        print(ex)
+        # Temporarily adding stack trace
+        print(f"trace for {self.test_id} -------------\n", traceback.format_exc())
+        sys.exit(1)

+def _inundate_and_compute(
+    self, magnitude, lid, compute_only=False, model='', verbose=False, gms_workers=1
+):
+    '''Method for inundating and computing contingency rasters as part of the alpha_test.
+    Used by both the alpha_test() and composite() methods.
+
+    Parameters
+    ----------
+    magnitude : str
+        Magnitude of the current benchmark site.
+    lid : str
+        lid of the current benchmark site. For non-AHPS sites, this should be an empty string ('').
+    compute_only : bool
+        If true, skips inundation and only computes contingency stats.
+    '''
+    # Output files
+    fh.vprint("Creating output files", verbose)
+
+    test_case_out_dir = os.path.join(self.dir, magnitude)
+    inundation_prefix = lid + '_' if lid else ''
+    inundation_path = os.path.join(test_case_out_dir, f'{inundation_prefix}inundation_extent.tif')
+    predicted_raster_path = inundation_path.replace('.tif', f'_{self.huc}.tif')
+    agreement_raster = os.path.join(
+        test_case_out_dir, (f'ahps_{lid}' if lid else '') + 'total_area_agreement.tif'
+    )
+    stats_json = os.path.join(test_case_out_dir, 'stats.json')
+    stats_csv = os.path.join(test_case_out_dir, 'stats.csv')
+
+    # Create directory
+    if not os.path.isdir(test_case_out_dir):
+        os.mkdir(test_case_out_dir)
+
+    # Benchmark raster and flow files
+    benchmark_rast = (
+        f'ahps_{lid}' if lid else self.benchmark_cat
+    ) + f'_huc_{self.huc}_extent_{magnitude}.tif'
+    benchmark_rast = os.path.join(self.benchmark_dir, lid, magnitude, benchmark_rast)
+    benchmark_flows = benchmark_rast.replace(f'_extent_{magnitude}.tif', f'_flows_{magnitude}.csv')
+    mask_dict_indiv = self.mask_dict.copy()
+    if self.is_ahps:  # add domain shapefile to mask for AHPS sites
+        domain = os.path.join(self.benchmark_dir, lid, f'{lid}_domain.shp')
+        mask_dict_indiv.update({lid: {'path': domain, 'buffer': None, 'operation': 'include'}})
+    # Check to make sure all relevant files exist
+    if (
+        not os.path.isfile(benchmark_rast)
+        or not os.path.isfile(benchmark_flows)
+        or (self.is_ahps and not os.path.isfile(domain))
+    ):
+        return -1
+
+    # Inundate REM
+    if not compute_only:  # composite alpha tests don't need to be inundated
+        if model == "GMS":
+            produce_mosaicked_inundation(
+                os.path.dirname(self.fim_dir),
+                self.huc,
+                benchmark_flows,
+                inundation_raster=predicted_raster_path,
+                mask=os.path.join(self.fim_dir, "wbd.gpkg"),
+                verbose=verbose,
+            )
+
+        # FIM v3 and before
+        else:
+            fh.vprint("Begin FIM v3 (or earlier) Inundation", verbose)
+            inundate_result = inundate(
+                self.rem,
+                self.catchments,
+                self.catchment_poly,
+                self.hydro_table,
+                benchmark_flows,
+                self.mask_type,
+                hucs=self.hucs,
+                hucs_layerName=self.hucs_layerName,
+                subset_hucs=self.huc,
+                num_workers=1,
+                aggregate=False,
+                inundation_raster=inundation_path,
+                inundation_polygon=None,
+                depths=None,
+                out_raster_profile=None,
+                out_vector_profile=None,
+                quiet=True,
+            )
+            if inundate_result != 0:
+                return inundate_result
+
+    # Create contingency rasters and stats
+    fh.vprint("Begin creating contingency rasters and stats", verbose)
+    if os.path.isfile(predicted_raster_path):
+        compute_contingency_stats_from_rasters(
+            predicted_raster_path,
+            benchmark_rast,
+            agreement_raster,
+            stats_csv=stats_csv,
+            stats_json=stats_json,
+            mask_dict=mask_dict_indiv,
+        )
+    return
+
+@classmethod
+def run_alpha_test(
+    cls,
+    version,
+    test_id,
+    magnitude,
+    calibrated,
+    model,
+    archive_results=False,
+    mask_type='huc',
+    inclusion_area='',
+    inclusion_area_buffer=0,
+    light_run=False,
+    overwrite=True,
+    verbose=False,
+    gms_workers=1,
+):
+    '''Class method for instantiating the test_case class and running alpha_test directly'''
+
+    alpha_class = cls(test_id, version, archive_results)
+    alpha_class.alpha_test(
+        calibrated,
+        model,
+        mask_type,
+        inclusion_area,
+        inclusion_area_buffer,
+        overwrite,
+        verbose,
+        gms_workers,
+    )
+
+def composite(self, version_2, calibrated=False, overwrite=True, verbose=False):
+    '''Class method for compositing MS and FR inundation and creating an agreement raster with stats
+
+    Parameters
+    ----------
+    version_2 : str
+        Version with which to composite.
+    calibrated : bool
+        Whether or not this FIM version is calibrated.
+    overwrite : bool
+        If True, overwrites pre-existing test cases within the test_cases directory.
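+    verbose : bool
+        If True, prints verbose output.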
+    '''
+
+    if re.match(r'(.*)(_ms|_fr)', self.version):
+        composite_version_name = re.sub(r'(.*)(_ms|_fr)', r'\1_comp', self.version, count=1)
+    else:
+        composite_version_name = re.sub(r'(.*)(_ms|_fr)', r'\1_comp', version_2, count=1)
+
+    fh.vprint(f"Begin composite for version : {composite_version_name}", verbose)
+
+    composite_test_case = Test_Case(self.test_id, composite_version_name, self.archive)
+    input_test_case_2 = Test_Case(self.test_id, version_2, self.archive)
+    composite_test_case.stats_modes_list = ['total_area']
+
+    if not overwrite and os.path.isdir(composite_test_case.dir):
+        return
+
+    # Delete the directory if it exists
+    if os.path.exists(composite_test_case.dir):
+        shutil.rmtree(composite_test_case.dir)
+
+    validation_data = composite_test_case.data(composite_test_case.huc)
+    for magnitude in validation_data:
+        for instance in validation_data[
+            magnitude
+        ]:  # instance will be the lid for AHPS sites and '' for other sites (ble/ifc/ras2fim)
+            inundation_prefix = instance + '_' if instance else ''
+
+            input_inundation = os.path.join(
+                self.dir, magnitude, f'{inundation_prefix}inundation_extent_{self.huc}.tif'
+            )
+            input_inundation_2 = os.path.join(
+                input_test_case_2.dir,
+                magnitude,
+                f'{inundation_prefix}inundation_extent_{input_test_case_2.huc}.tif',
+            )
+            output_inundation = os.path.join(
+                composite_test_case.dir, magnitude, f'{inundation_prefix}inundation_extent.tif'
+            )
+
+            if os.path.isfile(input_inundation) and os.path.isfile(input_inundation_2):
+                inundation_map_file = pd.DataFrame(
+                    {
+                        'huc8': [composite_test_case.huc] * 2,
+                        'branchID': [None] * 2,
+                        'inundation_rasters': [input_inundation, input_inundation_2],
+                        'depths_rasters': [None] * 2,
+                        'inundation_polygons': [None] * 2,
+                    }
+                )
+                os.makedirs(os.path.dirname(output_inundation), exist_ok=True)
+
+                fh.vprint(f"Begin mosaic inundation for version : {composite_version_name}", verbose)
+                Mosaic_inundation(
+                    inundation_map_file,
+                    mosaic_attribute='inundation_rasters',
+                    mosaic_output=output_inundation,
+                    mask=None,
+                    unit_attribute_name='huc8',
+                    nodata=elev_raster_ndv,
+                    workers=1,
+                    remove_inputs=False,
+                    subset=None,
+                    verbose=False,
+                )
+                composite_test_case._inundate_and_compute(magnitude, instance, compute_only=True)
+
+            elif os.path.isfile(input_inundation) or os.path.isfile(input_inundation_2):
+                # If only one model (MS or FR) has inundation, simply copy over all files as the composite
+                single_test_case = self if os.path.isfile(input_inundation) else input_test_case_2
+                shutil.copytree(
+                    single_test_case.dir,
+                    re.sub(r'(.*)(_ms|_fr)', r'\1_comp', single_test_case.dir, count=1),
+                )
+                composite_test_case.write_metadata(calibrated, 'COMP')
+                return
+
+            # Clean up 'total_area' outputs from AHPS sites
+            if composite_test_case.is_ahps:
+                composite_test_case.clean_ahps_outputs(os.path.join(composite_test_case.dir, magnitude))
+
+    composite_test_case.write_metadata(calibrated, 'COMP')
+
+def write_metadata(self, calibrated, model):
+    '''Writes metadata files for a test_case directory.'''
+    with open(os.path.join(self.dir, 'eval_metadata.json'), 'w') as meta:
+        eval_meta = {'calibrated': calibrated, 'model': model}
+        meta.write(json.dumps(eval_meta, indent=2))
+
+def clean_ahps_outputs(self, magnitude_directory):
+    '''Cleans up `total_area` files from an input AHPS magnitude directory.'''
+    output_file_list = [os.path.join(magnitude_directory, of) for of in os.listdir(magnitude_directory)]
+    for output_file in output_file_list:
+        if "total_area" in output_file:
+            os.remove(output_file)
+
+def get_current_agreements(self):
+    '''Returns a list of all agreement rasters currently existing for the test_case.'''
+    agreement_list = []
+    for mag in os.listdir(self.dir):
+        mag_dir = os.path.join(self.dir, mag)
+        if not os.path.isdir(mag_dir):
+            continue
+
+        for f in os.listdir(mag_dir):
+            if 'agreement.tif' in f:
+                agreement_list.append(os.path.join(mag_dir, f))
+    return agreement_list

From 1b802e73934de84e0fcfa540ab778d3febb0404c Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 14:57:06 -0700
Subject: [PATCH 19/24] Update clip_vectors_to_wbd.py

---
 data/wbd/clip_vectors_to_wbd.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/data/wbd/clip_vectors_to_wbd.py b/data/wbd/clip_vectors_to_wbd.py
index 8cb8603e8..506fc73cc 100755
--- a/data/wbd/clip_vectors_to_wbd.py
+++ b/data/wbd/clip_vectors_to_wbd.py
@@ -387,3 +387,4 @@ def subset_vector_layers(
     args = vars(parser.parse_args())

     subset_vector_layers(**args)
+

From dba051a3f74492ad1f85358b0c224d2f73ffad2e Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 14:58:37 -0700
Subject: [PATCH 20/24] Update generate_pre_clip_fim_huc8.py


From 05c6d19b3c0f018bc03ca47bfd54742edab2ca61 Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 14:59:36 -0700
Subject: [PATCH 21/24] Update CHANGELOG.md


From 3c95096d0e7b25d2d9309cd20cb4d1c81e496e51 Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 15:00:28 -0700
Subject: [PATCH 22/24] Update add_crosswalk.py

---
 src/add_crosswalk.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/add_crosswalk.py b/src/add_crosswalk.py
index 62789c9b3..23bf187ca 100755
--- a/src/add_crosswalk.py
+++ b/src/add_crosswalk.py
@@ -1,3 +1,4 @@
+
 #!/usr/bin/env python3

 import argparse

From 76053e90e15e6110e70400f90a42619550d2647d Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 15:01:04 -0700
Subject: [PATCH 23/24] Update bash_variables.env

---
 src/bash_variables.env | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/bash_variables.env b/src/bash_variables.env
index 17e75af10..0b2c4d3af 100644
--- a/src/bash_variables.env
+++ b/src/bash_variables.env
@@ -8,7 +8,6 @@ export ALASKA_CRS=EPSG:3338
 # NOTE: $inputsDir is defined in Dockerfile
 export pre_clip_huc_dir=${inputsDir}/pre_clip_huc8/20240702
-
 export input_DEM=${inputsDir}/3dep_dems/10m_5070/fim_seamless_3dep_dem_10m_5070.vrt
 export input_DEM_Alaska=${inputsDir}/3dep_dems/10m_South_Alaska/23_11_07/FIM_3dep_dem_South_Alask_10m.vrt
 export input_DEM_domain=${inputsDir}/3dep_dems/10m_5070/HUC6_dem_domain.gpkg

From ef5138682092217482c9b1e4ecf21cf50818f80b Mon Sep 17 00:00:00 2001
From: HeidiSafa-NOAA
Date: Thu, 18 Jul 2024 15:02:37 -0700
Subject: [PATCH 24/24] Update bridge_inundation.py

---
 tools/bridge_inundation.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tools/bridge_inundation.py b/tools/bridge_inundation.py
index 6f9238196..dec1896c2 100644
--- a/tools/bridge_inundation.py
+++ b/tools/bridge_inundation.py
@@ -151,3 +151,4 @@ def risk_class(row):
     bridge_risk_status(**vars(parser.parse_args()))

     print(f"Completed in {round((timer() - start)/60, 2)} minutes.")
+