diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 48df89672..401a91b75 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,37 @@
 All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
 
+## v4.0.15.0 - 2022-12-20 - [PR #758](https://github.com/NOAA-OWP/inundation-mapping/pull/758)
+
+This merge addresses feedback received from field users regarding CatFIM: users wanted a Stage-Based version of CatFIM, maps created for multiple intervals between flood categories, and documentation explaining why many sites are absent from the Stage-Based CatFIM service. CatFIM will continue to evolve with more feedback over time.
+
+## Changes
+- `/src/gms/usgs_gage_crosswalk.py`: Removed filtering of extra attributes when writing the table
+- `/src/gms/usgs_gage_unit_setup.py`: Removed the filter of gages where `rating curve == yes`; the filtering now happens later in the workflow
+- `/tools/eval_plots.py`: Added a post-processing step to produce CSVs of spatial data
+- `/tools/generate_categorical_fim.py`:
+  - New arguments to support more advanced multiprocessing, production of Stage-Based CatFIM, a specific output directory path, upstream and downstream search distance, control of how far past the "major" magnitude to go when producing Stage-Based interval maps, and the ability to run a single AHPS site.
+- `/tools/generate_categorical_fim_flows.py`:
+  - Allows flows to be retrieved for only one site (useful for testing)
+  - More logging
+  - Filtering of stream segments according to stream order
+- `/tools/generate_categorical_fim_mapping.py`:
+  - Support for Stage-Based CatFIM production
+  - Enhanced multiprocessing
+  - Improved post-processing
+- `/tools/pixel_counter.py`: Fixed a bug where `NoneType`s were being returned
+- `/tools/rating_curve_get_usgs_rating_curves.py`:
+  - Removed filtering when producing `usgs_gages.gpkg`, but added an attribute indicating whether or not each gage meets the acceptance criteria defined in `gms_tools/tools_shared_variables.py`.
+  - Created a lookup list to filter out unacceptable gages before they're written to `usgs_rating_curves.csv`
+  - The `usgs_gages.gpkg` now includes two fields indicating whether or not gages pass the acceptance criteria (defined in `tools_shared_variables.py`): `acceptable_codes` and `acceptable_alt_error` (a sketch of this flagging logic follows this changelog entry)
+- `/tools/tools_shared_functions.py`:
+  - Added `get_env_paths()` function to retrieve environment variable information used by the CatFIM and rating curve scripts
+  - Added `filter_nwm_segments_by_stream_order()` function that uses WRDS to filter out NWM feature_ids from a list if their stream order is different from the desired stream order (a sketch of this filtering follows this changelog entry)
+- `/tools/tools_shared_variables.py`: Added the acceptance criteria and URLs for gages as non-constant variables. These can be modified and tracked through version changes. These variables are imported by the CatFIM and USGS rating curve and gage generation scripts.
+- `/tools/test_case_by_hydroid.py`: Reformatted code; recommend adding more comments/docstrings in a future commit
+
+
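For readers of this changelog entry, here is a minimal, illustrative sketch of the behavior described for `filter_nwm_segments_by_stream_order()`. It matches the call signature used later in this diff (`filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order)`), but the WRDS metadata query performed by the real function is abstracted behind a hypothetical `lookup_stream_order()` callable, so this is not the actual implementation.

```python
from typing import Callable, Iterable, List


def filter_segments_by_stream_order(segments: Iterable[str],
                                    desired_order: int,
                                    lookup_stream_order: Callable[[str], int]) -> List[str]:
    """Keep only NWM feature_ids whose stream order matches desired_order.

    lookup_stream_order stands in for the WRDS metadata call used by the real
    filter_nwm_segments_by_stream_order(); it is a hypothetical helper here.
    """
    kept = []
    for feature_id in segments:
        try:
            order = lookup_stream_order(feature_id)
        except Exception:
            # If the stream order cannot be determined, drop the segment rather
            # than risk mixing stream orders in the resulting flow file.
            continue
        if order == desired_order:
            kept.append(feature_id)
    return kept
```

The design intent described above is that a site's flow file only carries NWM segments of the same stream order as the gage's own segment, rather than every segment returned by the upstream/downstream trace.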

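Similarly, here is a sketch of how the new `acceptable_codes` and `acceptable_alt_error` fields can be derived. The flag logic mirrors the pandas/numpy expressions added to `generate_categorical_fim.py` later in this diff; the column names are taken from the `usgs_elev_table.csv` handling there and may differ slightly for `usgs_gages.gpkg`.

```python
import numpy as np
import pandas as pd

# Acceptance criteria are imported from tools_shared_variables.py by the real scripts.
from tools_shared_variables import (acceptable_coord_acc_code_list,
                                    acceptable_coord_method_code_list,
                                    acceptable_alt_acc_thresh,
                                    acceptable_alt_meth_code_list,
                                    acceptable_site_type_list)


def flag_acceptable_gages(gages_df: pd.DataFrame) -> pd.DataFrame:
    # True when every coordinate, altitude-method, and site-type code is in its acceptable list.
    gages_df['acceptable_codes'] = (
        gages_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
        & gages_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
        & gages_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
        & gages_df['usgs_data_site_type'].isin(acceptable_site_type_list))

    # True when the reported altitude accuracy is within the allowed error threshold.
    alt_acc = gages_df['usgs_data_alt_accuracy_code'].astype(float)
    gages_df['acceptable_alt_error'] = np.where(alt_acc <= acceptable_alt_acc_thresh, True, False)
    return gages_df
```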
+
 ## v4.0.14.2 - 2022-12-22 - [PR #772](https://github.com/NOAA-OWP/inundation-mapping/pull/772)
 
 Added `usgs_elev_table.csv` to hydrovis whitelist files. Also updated the names to include the word "hydrovis" (anticipating more S3 whitelist files).
 
@@ -49,7 +80,7 @@ Fixes inundation of nodata areas of REM.
 - `tools/inundation.py`: Assigns depth a value of `0` if REM is less than `0`
-
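The `tools/inundation.py` change summarized in the entry above (assign a depth of `0` where REM is below `0`) is not itself shown in this diff, but the idea reduces to a one-line numpy clamp. The array names below are illustrative, not the module's actual variables.

```python
import numpy as np

# REM nodata/negative cells would otherwise produce nonsense depths,
# so any cell with REM < 0 gets a depth of 0 instead.
rem = np.array([-9999.0, 0.5, 1.2, -1.0])
stage = 2.0
depths = stage - rem
depths = np.where(rem < 0, 0, depths)
print(depths)  # [0.  1.5 0.8 0. ]
```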

+ ## v4.0.13.1 - 2022-12-09 - [PR #743](https://github.com/NOAA-OWP/inundation-mapping/pull/743) diff --git a/src/usgs_gage_crosswalk.py b/src/usgs_gage_crosswalk.py index 57b7c1cb8..a12494754 100755 --- a/src/usgs_gage_crosswalk.py +++ b/src/usgs_gage_crosswalk.py @@ -41,7 +41,7 @@ def run_crosswalk(self, input_catchment_filename, input_flows_filename, dem_file the dem-derived flows 3) sample both dems at the snapped points 4) write the crosswalked points to usgs_elev_table.csv ''' - + if self.gages.empty: print(f'There are no gages for branch {branch_id}') os._exit(0) @@ -96,10 +96,10 @@ def sample_dem(self, dem_filename, column_name): def write(self, output_table_filename): '''Write to csv file''' + # Prep and write out file elev_table = self.gages.copy() - # Elev table cleanup elev_table.loc[elev_table['location_id'] == elev_table['nws_lid'], 'location_id'] = None # set location_id to None where there isn't a gage - elev_table = elev_table[['location_id', 'nws_lid', 'feature_id', 'HydroID', 'levpa_id', 'dem_elevation', 'dem_adj_elevation', 'order_', 'LakeID', 'HUC8', 'snap_distance']] + elev_table = elev_table[elev_table['location_id'].notna()] if not elev_table.empty: elev_table.to_csv(output_table_filename, index=False) diff --git a/src/usgs_gage_unit_setup.py b/src/usgs_gage_unit_setup.py index 93c4095f6..476777da0 100755 --- a/src/usgs_gage_unit_setup.py +++ b/src/usgs_gage_unit_setup.py @@ -23,7 +23,7 @@ def load_gages(self): # Filter USGS gages to huc usgs_gages = gpd.read_file(self.usgs_gage_filename) - self.gages = usgs_gages[(usgs_gages.HUC8 == self.huc8) & (usgs_gages.curve == 'yes')] + self.gages = usgs_gages[(usgs_gages.HUC8 == self.huc8)] # Get AHPS sites within the HUC and add them to the USGS dataset if self.ahps_filename: diff --git a/tools/eval_plots.py b/tools/eval_plots.py index 560c6c05d..d0b041f99 100644 --- a/tools/eval_plots.py +++ b/tools/eval_plots.py @@ -8,6 +8,7 @@ import matplotlib.pyplot as plt import seaborn as sns import re +import glob import os import sys sys.path.append('/foss_fim/src') @@ -669,6 +670,19 @@ def eval_plots(metrics_csv, workspace, versions = [], stats = ['CSI','FAR','TPR' wbd_with_metrics.to_file(Path(workspace) / 'fim_performance_polys.shp') else: print('BLE/IFC/RAS2FIM FR datasets not analyzed, no spatial data created.\nTo produce spatial data analyze a FR version') + +def convert_shapes_to_csv(workspace): + + # Convert any geopackage in the root level of output_mapping_dir to CSV and rename. 
+ shape_list = glob.glob(os.path.join(workspace, '*.shp')) + for shape in shape_list: + gdf = gpd.read_file(shape) + parent_directory = os.path.split(shape)[0] + file_name = shape.replace('.shp', '.csv') + csv_output_path = os.path.join(parent_directory, file_name) + gdf.to_csv(csv_output_path) + + ####################################################################### if __name__ == '__main__': # Parse arguments @@ -697,3 +711,7 @@ def eval_plots(metrics_csv, workspace, versions = [], stats = ['CSI','FAR','TPR' print('The following AHPS sites are considered "BAD_SITES": ' + ', '.join(BAD_SITES)) print('The following query is used to filter AHPS: ' + DISCARD_AHPS_QUERY) eval_plots(metrics_csv = m, workspace = w, versions = v, stats = s, spatial = sp, fim_1_ms = f, site_barplots = i) + + # Convert output shapefiles to CSV + print("Converting to CSVs...") + convert_shapes_to_csv(w) diff --git a/tools/generate_categorical_fim.py b/tools/generate_categorical_fim.py index bcd6fc14d..4974fcb09 100755 --- a/tools/generate_categorical_fim.py +++ b/tools/generate_categorical_fim.py @@ -1,15 +1,148 @@ #!/usr/bin/env python3 import os -import subprocess import argparse +import csv +import traceback +import sys import time from pathlib import Path import geopandas as gpd import pandas as pd +import rasterio +from rasterio.warp import calculate_default_transform, reproject, Resampling +import glob +from generate_categorical_fim_flows import generate_catfim_flows +from generate_categorical_fim_mapping import manage_catfim_mapping, post_process_cat_fim_for_viz +from tools_shared_functions import get_thresholds, get_nwm_segs, get_datum, ngvd_to_navd_ft, filter_nwm_segments_by_stream_order +from concurrent.futures import ProcessPoolExecutor, as_completed, wait +import numpy as np +from utils.shared_variables import VIZ_PROJECTION +from tools_shared_variables import (acceptable_coord_acc_code_list, + acceptable_coord_method_code_list, + acceptable_alt_acc_thresh, + acceptable_alt_meth_code_list, + acceptable_site_type_list) -def update_mapping_status(output_mapping_dir, output_flows_dir): +def process_generate_categorical_fim(fim_run_dir, job_number_huc, job_number_inundate, + stage_based, output_folder, overwrite, search, + lid_to_run, job_number_intervals, past_major_interval_cap): + + # Check job numbers and raise error if necessary + total_cpus_requested = job_number_huc * job_number_inundate * job_number_intervals + total_cpus_available = os.cpu_count() - 1 + if total_cpus_requested > total_cpus_available: + raise ValueError('The HUC job number, {}, multiplied by the inundate job number, {}, '\ + 'exceeds your machine\'s available CPU count minus one. '\ + 'Please lower the job_number_huc or job_number_inundate '\ + 'values accordingly.'.format(job_number_huc, job_number_inundate) ) + + # Define default arguments. Modify these if necessary + fim_version = os.path.split(fim_run_dir)[1] + + # Append option configuration (flow_based or stage_based) to output folder name. 
+ if stage_based: + file_handle_appendage, catfim_method = "_stage_based", "STAGE-BASED" + else: + file_handle_appendage, catfim_method = "_flow_based", "FLOW-BASED" + + # Define output directories + output_catfim_dir_parent = output_folder + file_handle_appendage + output_flows_dir = os.path.join(output_catfim_dir_parent, 'flows') + output_mapping_dir = os.path.join(output_catfim_dir_parent, 'mapping') + attributes_dir = os.path.join(output_catfim_dir_parent, 'attributes') + + # Create output directories + if not os.path.exists(output_catfim_dir_parent): os.mkdir(output_catfim_dir_parent) + if not os.path.exists(output_flows_dir): os.mkdir(output_flows_dir) + if not os.path.exists(output_mapping_dir): os.mkdir(output_mapping_dir) + if not os.path.exists(attributes_dir): os.mkdir(attributes_dir) + + # Define upstream and downstream search in miles + nwm_us_search, nwm_ds_search = search, search + fim_dir = fim_version + + # Set up logging + log_dir = os.path.join(output_catfim_dir_parent, 'logs') + log_file = os.path.join(log_dir, 'errors.log') + + # STAGE-BASED + if stage_based: + # Generate Stage-Based CatFIM mapping + nws_sites_layer = generate_stage_based_categorical_fim(output_mapping_dir, fim_version, fim_run_dir, + nwm_us_search, nwm_ds_search, job_number_inundate, + lid_to_run, attributes_dir, job_number_intervals, + past_major_interval_cap, job_number_huc) + + job_number_tif = job_number_inundate * job_number_intervals + print("Post-processing TIFs...") + post_process_cat_fim_for_viz(job_number_huc, job_number_tif, output_mapping_dir, attributes_dir, log_file=log_file, fim_version=fim_version) + + # Updating mapping status + print('Updating mapping status...') + update_mapping_status(str(output_mapping_dir), str(output_flows_dir), nws_sites_layer, stage_based) + + # FLOW-BASED + else: + fim_dir = "" + print('Creating flow files using the ' + catfim_method + ' technique...') + start = time.time() + nws_sites_layer = generate_catfim_flows(output_flows_dir, nwm_us_search, nwm_ds_search, stage_based, fim_dir, lid_to_run, attributes_dir) + end = time.time() + elapsed_time = (end-start)/60 + print(f'Finished creating flow files in {elapsed_time} minutes') + # Generate CatFIM mapping + print('Begin mapping') + start = time.time() + manage_catfim_mapping(fim_run_dir, output_flows_dir, output_mapping_dir, attributes_dir, job_number_huc, job_number_inundate, overwrite, depthtif=False) + end = time.time() + elapsed_time = (end-start)/60 + print(f'Finished mapping in {elapsed_time} minutes') + + # Updating mapping status + print('Updating mapping status') + update_mapping_status(str(output_mapping_dir), str(output_flows_dir), nws_sites_layer, stage_based) + + # Create CSV versions of the final geopackages. + print('Creating CSVs. This may take several minutes.') + reformatted_catfim_method = catfim_method.lower().replace('-', '_') + create_csvs(output_mapping_dir, reformatted_catfim_method) + + +def create_csvs(output_mapping_dir, reformatted_catfim_method): + ''' + Produces CSV versions of desired geopackage in the output_mapping_dir. + + Parameters + ---------- + output_mapping_dir : STR + Path to the output directory of all inundation maps. + reformatted_catfim_method : STR + Text to append to CSV to communicate the type of CatFIM. + + Returns + ------- + None. + + ''' + + # Convert any geopackage in the root level of output_mapping_dir to CSV and rename. 
+ gpkg_list = glob.glob(os.path.join(output_mapping_dir, '*.gpkg')) + for gpkg in gpkg_list: + print(f"creating CSV for {gpkg}") + gdf = gpd.read_file(gpkg) + parent_directory = os.path.split(gpkg)[0] + if 'catfim_library' in gpkg: + file_name = reformatted_catfim_method + '_catfim.csv' + if 'nws_lid_sites' in gpkg: + file_name = reformatted_catfim_method + '_catfim_sites.csv' + + csv_output_path = os.path.join(parent_directory, file_name) + gdf.to_csv(csv_output_path) + + +def update_mapping_status(output_mapping_dir, output_flows_dir, nws_sites_layer, stage_based): ''' Updates the status for nws_lids from the flows subdirectory. Status is updated for sites where the inundation.py routine was not able to @@ -38,9 +171,8 @@ def update_mapping_status(output_mapping_dir, output_flows_dir): mapping_df['did_it_map'] = 'no' mapping_df['map_status'] = ' and all categories failed to map' - # Import shapefile output from flows creation - shapefile = Path(output_flows_dir)/'nws_lid_flows_sites.shp' - flows_df = gpd.read_file(shapefile) + # Import geopackage output from flows creation + flows_df = gpd.read_file(nws_sites_layer) # Join failed sites to flows df flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid') @@ -48,63 +180,535 @@ def update_mapping_status(output_mapping_dir, output_flows_dir): # Switch mapped column to no for failed sites and update status flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no' flows_df.loc[flows_df['did_it_map']=='no','status'] = flows_df['status'] + flows_df['map_status'] - - # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian - flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()] - mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()] - missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs)) - # Update status for nws_lid in missing hucs and change mapped attribute to 'no' - flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information' - flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no' +# # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian +# if stage_based: +# missing_mapping_hucs = +# else: +# flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()] +# mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()] +# missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs)) +# +# # Update status for nws_lid in missing hucs and change mapped attribute to 'no' +# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information' +# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no' # Clean up GeoDataFrame and rename columns for consistency flows_df = flows_df.drop(columns = ['did_it_map','map_status']) flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'}) # Write out to file - nws_lid_path = Path(output_mapping_dir) / 'nws_lid_sites.shp' - flows_df.to_file(nws_lid_path) + flows_df.to_file(nws_sites_layer) + + +def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path, hydroid_list, hand_stage, lid_directory, category, huc, lid, branch): + + # Open rem_path and catchment_path using rasterio. 
+ rem_src = rasterio.open(rem_path) + catchments_src = rasterio.open(catchments_path) + rem_array = rem_src.read(1) + catchments_array = catchments_src.read(1) + + # Use numpy.where operation to reclassify rem_path on the condition that the pixel values are <= to hand_stage and the catchments + # value is in the hydroid_list. + reclass_rem_array = np.where((rem_array<=hand_stage) & (rem_array != rem_src.nodata), 1, 0).astype('uint8') + hydroid_mask = np.isin(catchments_array, hydroid_list) + target_catchments_array = np.where((hydroid_mask == True) & (catchments_array != catchments_src.nodata), 1, 0).astype('uint8') + masked_reclass_rem_array = np.where((reclass_rem_array == 1) & (target_catchments_array == 1), 1, 0).astype('uint8') + + # Save resulting array to new tif with appropriate name. brdc1_record_extent_18060005.tif + is_all_zero = np.all((masked_reclass_rem_array == 0)) + + if not is_all_zero: + output_tif = os.path.join(lid_directory, lid + '_' + category + '_extent_' + huc + '_' + branch + '.tif') + with rasterio.Env(): + profile = rem_src.profile + profile.update(dtype=rasterio.uint8) + profile.update(nodata=10) + + with rasterio.open(output_tif, 'w', **profile) as dst: + dst.write(masked_reclass_rem_array, 1) + + +def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, threshold_url, flood_categories, all_lists, past_major_interval_cap, number_of_jobs, number_of_interval_jobs, attributes_dir): + missing_huc_files = [] + all_messages = [] + stage_based_att_dict = {} + + print(f'Iterating through {huc}...') + # Make output directory for huc. + huc_directory = os.path.join(workspace, huc) + if not os.path.exists(huc_directory): + os.mkdir(huc_directory) + + # Define paths to necessary HAND and HAND-related files. + usgs_elev_table = os.path.join(fim_dir, huc, 'usgs_elev_table.csv') + branch_dir = os.path.join(fim_dir, huc, 'branches') + + # Loop through each lid in nws_lids list + nws_lids = huc_dictionary[huc] + for lid in nws_lids: + lid = lid.lower() # Convert lid to lower case + # -- If necessary files exist, continue -- # + if not os.path.exists(usgs_elev_table): + all_messages.append([f'{lid}:usgs_elev_table missing, likely unacceptable gage datum error--more details to come in future release']) + continue + if not os.path.exists(branch_dir): + all_messages.append([f'{lid}:branch directory missing']) + continue + usgs_elev_df = pd.read_csv(usgs_elev_table) + # Make lid_directory. + + lid_directory = os.path.join(huc_directory, lid) + if not os.path.exists(lid_directory): + os.mkdir(lid_directory) + # Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows. + stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all') + + if stages == None: + all_messages.append([f'{lid}:error getting thresholds from WRDS API']) + continue + # Check if stages are supplied, if not write message and exit. 
+ if all(stages.get(category, None)==None for category in flood_categories): + all_messages.append([f'{lid}:missing threshold stages']) + continue + + # Drop columns that offend acceptance criteria + usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list) + & usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list) + & usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list) + & usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list)) + + usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float}) + usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False) + + acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)] + + # Get the dem_adj_elevation value from usgs_elev_table.csv. Prioritize the value that is not from branch 0. + try: + matching_rows = acceptable_usgs_elev_df.loc[acceptable_usgs_elev_df['nws_lid'] == lid.upper(), 'dem_adj_elevation'] + if len(matching_rows) == 2: # It means there are two level paths, use the one that is not 0 + lid_usgs_elev = acceptable_usgs_elev_df.loc[(acceptable_usgs_elev_df['nws_lid'] == lid.upper()) & ('levpa_id' != 0), 'dem_adj_elevation'].values[0] + else: + lid_usgs_elev = acceptable_usgs_elev_df.loc[acceptable_usgs_elev_df['nws_lid'] == lid.upper(), 'dem_adj_elevation'].values[0] + except IndexError: # Occurs when LID is missing from table + all_messages.append([f'{lid}:likely unacceptable gage datum error or accuracy code(s); please see acceptance criteria']) + continue + # Initialize nested dict for lid attributes + stage_based_att_dict.update({lid:{}}) + + # Find lid metadata from master list of metadata dictionaries. + metadata = next((item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False) + lid_altitude = metadata['usgs_data']['altitude'] + + ### --- Do Datum Offset --- ### + #determine source of interpolated threshold flows, this will be the rating curve that will be used. + rating_curve_source = flows.get('source') + if rating_curve_source is None: + all_messages.append([f'{lid}:No source for rating curve']) + continue + # Get the datum and adjust to NAVD if necessary. + nws_datum_info, usgs_datum_info = get_datum(metadata) + if rating_curve_source == 'USGS Rating Depot': + datum_data = usgs_datum_info + elif rating_curve_source == 'NRLDB': + datum_data = nws_datum_info + + # If datum not supplied, skip to new site + datum = datum_data.get('datum', None) + if datum is None: + all_messages.append([f'{lid}:datum info unavailable']) + continue + # _________________________________________________________________________________________________________# + # SPECIAL CASE: Workaround for "bmbp1" where the only valid datum is from NRLDB (USGS datum is null). + # Modifying rating curve source will influence the rating curve and datum retrieved for benchmark determinations. + if lid == 'bmbp1': + rating_curve_source = 'NRLDB' + # ___________________________________________________________________# + + # SPECIAL CASE: Custom workaround these sites have faulty crs from WRDS. 
CRS needed for NGVD29 conversion to NAVD88 + # USGS info indicates NAD83 for site: bgwn7, fatw3, mnvn4, nhpp1, pinn4, rgln4, rssk1, sign4, smfn7, stkn4, wlln7 + # Assumed to be NAD83 (no info from USGS or NWS data): dlrt2, eagi1, eppt2, jffw3, ldot2, rgdt2 + if lid in ['bgwn7', 'dlrt2','eagi1','eppt2','fatw3','jffw3','ldot2','mnvn4','nhpp1','pinn4','rgdt2','rgln4','rssk1','sign4','smfn7','stkn4','wlln7' ]: + datum_data.update(crs = 'NAD83') + # ___________________________________________________________________# + + # SPECIAL CASE: Workaround for bmbp1; CRS supplied by NRLDB is mis-assigned (NAD29) and is actually NAD27. + # This was verified by converting USGS coordinates (in NAD83) for bmbp1 to NAD27 and it matches NRLDB coordinates. + if lid == 'bmbp1': + datum_data.update(crs = 'NAD27') + # ___________________________________________________________________# + + # SPECIAL CASE: Custom workaround these sites have poorly defined vcs from WRDS. VCS needed to ensure datum reported in NAVD88. + # If NGVD29 it is converted to NAVD88. + # bgwn7, eagi1 vertical datum unknown, assume navd88 + # fatw3 USGS data indicates vcs is NAVD88 (USGS and NWS info agree on datum value). + # wlln7 USGS data indicates vcs is NGVD29 (USGS and NWS info agree on datum value). + if lid in ['bgwn7','eagi1','fatw3']: + datum_data.update(vcs = 'NAVD88') + elif lid == 'wlln7': + datum_data.update(vcs = 'NGVD29') + # _________________________________________________________________________________________________________# + + # Adjust datum to NAVD88 if needed + # Default datum_adj_ft to 0.0 + datum_adj_ft = 0.0 + crs = datum_data.get('crs') + if datum_data.get('vcs') in ['NGVD29', 'NGVD 1929', 'NGVD,1929', 'NGVD OF 1929', 'NGVD']: + # Get the datum adjustment to convert NGVD to NAVD. Sites not in contiguous US are previously removed otherwise the region needs changed. + try: + datum_adj_ft = ngvd_to_navd_ft(datum_info = datum_data, region = 'contiguous') + except Exception as e: + e = str(e) + if crs == None: + all_messages.append([f'{lid}:NOAA VDatum adjustment error, CRS is missing']) + if 'HTTPSConnectionPool' in e: + time.sleep(10) # Maybe the API needs a break, so wait 10 seconds + try: + datum_adj_ft = ngvd_to_navd_ft(datum_info = datum_data, region = 'contiguous') + except Exception: + all_messages.append([f'{lid}:NOAA VDatum adjustment error, possible API issue']) + if 'Invalid projection' in e: + all_messages.append([f'{lid}:NOAA VDatum adjustment error, invalid projection: crs={crs}']) + continue + + ### -- Concluded Datum Offset --- ### + # Get mainstem segments of LID by intersecting LID segments with known mainstem segments. + unfiltered_segments = list(set(get_nwm_segs(metadata))) + + # Filter segments to be of like stream order. + desired_order = metadata['nwm_feature_data']['stream_order'] + segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order) + action_stage = stages['action'] + minor_stage = stages['minor'] + moderate_stage = stages['moderate'] + major_stage = stages['major'] + stage_list = [i for i in [action_stage, minor_stage, moderate_stage, major_stage] if i is not None] + # Create a list of stages, incrementing by 1 ft. 
+ if stage_list == []: + all_messages.append([f'{lid}:no stage values available']) + continue + interval_list = np.arange(min(stage_list), max(stage_list) + past_major_interval_cap, 1.0) # Go an extra 10 ft beyond the max stage, arbitrary + # For each flood category + for category in flood_categories: + + # Pull stage value and confirm it's valid, then process + stage = stages[category] + + if stage != None and datum_adj_ft != None and lid_altitude != None: + + # Call function to execute mapping of the TIFs. + messages, hand_stage, datum_adj_wse, datum_adj_wse_m = produce_stage_based_catfim_tifs(stage, datum_adj_ft, branch_dir, lid_usgs_elev, lid_altitude, fim_dir, segments, lid, huc, lid_directory, category, number_of_jobs) + all_messages += messages + + # Extra metadata for alternative CatFIM technique. TODO Revisit because branches complicate things + stage_based_att_dict[lid].update({category: {'datum_adj_wse_ft': datum_adj_wse, + 'datum_adj_wse_m': datum_adj_wse_m, + 'hand_stage': hand_stage, + 'datum_adj_ft': datum_adj_ft, + 'lid_alt_ft': lid_altitude, + 'lid_alt_m': lid_altitude*0.3048}}) + # If missing HUC file data, write message + if huc in missing_huc_files: + all_messages.append([f'{lid}:missing some HUC data']) + + # Now that the "official" category maps are made, produce the incremental maps. + for interval_stage in interval_list: + try: + with ProcessPoolExecutor(max_workers=number_of_interval_jobs) as executor: + # Determine category the stage value belongs with. + if action_stage <= interval_stage < minor_stage: + category = 'action_' + str(interval_stage).replace('.', 'p') + 'ft' + if minor_stage <= interval_stage < moderate_stage: + category = 'minor_' + str(interval_stage).replace('.', 'p') + 'ft' + if moderate_stage <= interval_stage < major_stage: + category = 'moderate_' + str(interval_stage).replace('.', 'p') + 'ft' + if interval_stage >= major_stage: + category = 'major_' + str(interval_stage).replace('.', 'p') + 'ft' + executor.submit(produce_stage_based_catfim_tifs, stage, datum_adj_ft, branch_dir, lid_usgs_elev, lid_altitude, fim_dir, segments, lid, huc, lid_directory, category, number_of_jobs) + except TypeError: # sometimes the thresholds are Nonetypes + pass + + # Create a csv with same information as geopackage but with each threshold as new record. + # Probably a less verbose way. 
+ csv_df = pd.DataFrame() + for threshold in flood_categories: + try: + line_df = pd.DataFrame({'nws_lid': [lid], + 'name':metadata['nws_data']['name'], + 'WFO': metadata['nws_data']['wfo'], + 'rfc':metadata['nws_data']['rfc'], + 'huc':[huc], + 'state':metadata['nws_data']['state'], + 'county':metadata['nws_data']['county'], + 'magnitude': threshold, + 'q':flows[threshold], + 'q_uni':flows['units'], + 'q_src':flows['source'], + 'stage':stages[threshold], + 'stage_uni':stages['units'], + 's_src':stages['source'], + 'wrds_time':stages['wrds_timestamp'], + 'nrldb_time':metadata['nrldb_timestamp'], + 'nwis_time':metadata['nwis_timestamp'], + 'lat':[float(metadata['nws_preferred']['latitude'])], + 'lon':[float(metadata['nws_preferred']['longitude'])], + 'dtm_adj_ft': stage_based_att_dict[lid][threshold]['datum_adj_ft'], + 'dadj_w_ft': stage_based_att_dict[lid][threshold]['datum_adj_wse_ft'], + 'dadj_w_m': stage_based_att_dict[lid][threshold]['datum_adj_wse_m'], + 'lid_alt_ft': stage_based_att_dict[lid][threshold]['lid_alt_ft'], + 'lid_alt_m': stage_based_att_dict[lid][threshold]['lid_alt_m']}) + csv_df = csv_df.append(line_df) + + except Exception as e: + print("Exception") + print(e) + + # Round flow and stage columns to 2 decimal places. + csv_df = csv_df.round({'q':2,'stage':2}) + # If a site folder exists (ie a flow file was written) save files containing site attributes. + output_dir = os.path.join(workspace, huc, lid) + if os.path.exists(output_dir): + # Export DataFrame to csv containing attributes + csv_df.to_csv(os.path.join(attributes_dir, f'{lid}_attributes.csv'), index = False) + else: + all_messages.append([f'{lid}:missing all calculated flows']) + + # If it made it to this point (i.e. no continues), there were no major preventers of mapping + all_messages.append([f'{lid}:OK']) + # Write all_messages by HUC to be scraped later. + messages_dir = os.path.join(workspace, 'messages') + if not os.path.exists(messages_dir): + os.mkdir(messages_dir) + huc_messages_csv = os.path.join(messages_dir, huc + '_messages.csv') + with open(huc_messages_csv, 'w') as output_csv: + writer = csv.writer(output_csv) + writer.writerows(all_messages) + + +def generate_stage_based_categorical_fim(workspace, fim_version, fim_dir, nwm_us_search, nwm_ds_search, number_of_jobs, lid_to_run, attributes_dir, number_of_interval_jobs, past_major_interval_cap, job_number_huc): + + flood_categories = ['action', 'minor', 'moderate', 'major', 'record'] + + huc_dictionary, out_gdf, metadata_url, threshold_url, all_lists = generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search, stage_based=True, fim_dir=fim_dir, lid_to_run=lid_to_run) + + with ProcessPoolExecutor(max_workers=job_number_huc) as executor: + for huc in huc_dictionary: + executor.submit(iterate_through_huc_stage_based, workspace, huc, + fim_dir, huc_dictionary, threshold_url, flood_categories, + all_lists, past_major_interval_cap, number_of_jobs, + number_of_interval_jobs, attributes_dir) + + print('Wrapping up Stage-Based CatFIM...') + csv_files = os.listdir(attributes_dir) + all_csv_df = pd.DataFrame() + refined_csv_files_list = [] + for csv_file in csv_files: + full_csv_path = os.path.join(attributes_dir, csv_file) + # HUC has to be read in as string to preserve leading zeros. + try: + temp_df = pd.read_csv(full_csv_path, dtype={'huc':str}) + all_csv_df = all_csv_df.append(temp_df, ignore_index = True) + refined_csv_files_list.append(csv_file) + except Exception: # Happens if a file is empty (i.e. 
no mapping) + pass + # Write to file + all_csv_df.to_csv(os.path.join(workspace, 'nws_lid_attributes.csv'), index = False) + + # This section populates a geopackage of all potential sites and details + # whether it was mapped or not (mapped field) and if not, why (status field). + + # Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields. + viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION) + viz_out_gdf.rename(columns = {'identifiers_nwm_feature_id': 'nwm_seg', 'identifiers_nws_lid':'nws_lid', 'identifiers_usgs_site_code':'usgs_gage'}, inplace = True) + viz_out_gdf['nws_lid'] = viz_out_gdf['nws_lid'].str.lower() + + # Using list of csv_files, populate DataFrame of all nws_lids that had + # a flow file produced and denote with "mapped" column. + nws_lids = [] + for csv_file in csv_files: + nws_lids.append(csv_file.split('_attributes')[0]) + lids_df = pd.DataFrame(nws_lids, columns = ['nws_lid']) + lids_df['mapped'] = 'yes' + + # Identify what lids were mapped by merging with lids_df. Populate + # 'mapped' column with 'No' if sites did not map. + viz_out_gdf = viz_out_gdf.merge(lids_df, how = 'left', on = 'nws_lid') + viz_out_gdf['mapped'] = viz_out_gdf['mapped'].fillna('no') + + # Create list from all messages in messages dir. + messages_dir = os.path.join(workspace, 'messages') + all_messages = [] + all_message_csvs = os.listdir(messages_dir) + for message_csv in all_message_csvs: + full_message_csv_path = os.path.join(messages_dir, message_csv) + with open(full_message_csv_path, newline='') as message_file: + reader = csv.reader(message_file) + for row in reader: + all_messages.append(row) + + # Write messages to DataFrame, split into columns, aggregate messages. + messages_df = pd.DataFrame(all_messages, columns = ['message']) + + messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'}) + status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index() + + # Join messages to populate status field to candidate sites. Assign + # status for null fields. + viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid') + +# viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK') + + # Filter out columns and write out to file + nws_sites_layer = os.path.join(workspace, 'nws_lid_sites.gpkg') + + # Add acceptance criteria to viz_out_gdf before writing + viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list) + viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list) + viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh) + viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list) + viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list) + + viz_out_gdf.to_file(nws_sites_layer, driver='GPKG') + + return nws_sites_layer + + +def produce_stage_based_catfim_tifs(stage, datum_adj_ft, branch_dir, lid_usgs_elev, lid_altitude, fim_dir, segments, lid, huc, lid_directory, category, number_of_jobs): + messages = [] + + # Determine datum-offset water surface elevation (from above). + datum_adj_wse = stage + datum_adj_ft + lid_altitude + datum_adj_wse_m = datum_adj_wse*0.3048 # Convert ft to m + + # Subtract HAND gage elevation from HAND WSE to get HAND stage. + hand_stage = datum_adj_wse_m - lid_usgs_elev + + # Produce extent tif hand_stage. Multiprocess across branches. 
+ branches = os.listdir(branch_dir) + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: + for branch in branches: + # Define paths to necessary files to produce inundation grids. + full_branch_path = os.path.join(branch_dir, branch) + rem_path = os.path.join(fim_dir, huc, full_branch_path, 'rem_zeroed_masked_' + branch + '.tif') + catchments_path = os.path.join(fim_dir, huc, full_branch_path, 'gw_catchments_reaches_filtered_addedAttributes_' + branch + '.tif') + hydrotable_path = os.path.join(fim_dir, huc, full_branch_path, 'hydroTable_' + branch + '.csv') + + if not os.path.exists(rem_path): + messages.append([f"{lid}:rem doesn't exist"]) + continue + if not os.path.exists(catchments_path): + messages.append([f"{lid}:catchments files don't exist"]) + continue + if not os.path.exists(hydrotable_path): + messages.append([f"{lid}:hydrotable doesn't exist"]) + continue + + # Use hydroTable to determine hydroid_list from site_ms_segments. + hydrotable_df = pd.read_csv(hydrotable_path) + hydroid_list = [] + + # Determine hydroids at which to perform inundation + for feature_id in segments: + try: + subset_hydrotable_df = hydrotable_df[hydrotable_df['feature_id'] == int(feature_id)] + hydroid_list += list(subset_hydrotable_df.HydroID.unique()) + except IndexError: + pass + + if len(hydroid_list) == 0: +# messages.append(f"{lid}:no matching hydroids") # Some branches don't have matching hydroids + continue + + # If no segments, write message and exit out + if not segments: + messages.append([f'{lid}:missing nwm segments']) + continue + + # Create inundation maps with branch and stage data + try: + print("Running inundation for " + huc + " and branch " + branch) + executor.submit(produce_inundation_map_with_stage_and_feature_ids, rem_path, catchments_path, hydroid_list, hand_stage, lid_directory, category, huc, lid, branch) + except Exception: + messages.append([f'{lid}:inundation failed at {category}']) + + # -- MOSAIC -- # + # Merge all rasters in lid_directory that have the same magnitude/category. 
+ path_list = [] + lid_dir_list = os.listdir(lid_directory) + print("Merging " + category) + for f in lid_dir_list: + if category in f: + path_list.append(os.path.join(lid_directory, f)) + path_list.sort() # To force branch 0 first in list, sort + + if len(path_list) > 0: + zero_branch_grid = path_list[0] + zero_branch_src = rasterio.open(zero_branch_grid) + zero_branch_array = zero_branch_src.read(1) + summed_array = zero_branch_array # Initialize it as the branch zero array + + # Loop through remaining items in list and sum them with summed_array + for remaining_raster in path_list[1:]: + remaining_raster_src = rasterio.open(remaining_raster) + remaining_raster_array_original = remaining_raster_src.read(1) + + # Reproject non-branch-zero grids so I can sum them with the branch zero grid + remaining_raster_array = np.empty(zero_branch_array.shape, dtype=np.int8) + reproject(remaining_raster_array_original, + destination = remaining_raster_array, + src_transform = remaining_raster_src.transform, + src_crs = remaining_raster_src.crs, + src_nodata = remaining_raster_src.nodata, + dst_transform = zero_branch_src.transform, + dst_crs = zero_branch_src.crs, + dst_nodata = -1, + dst_resolution = zero_branch_src.res, + resampling = Resampling.nearest) + # Sum rasters + summed_array = summed_array + remaining_raster_array + + del zero_branch_array # Clean up + + # Define path to merged file, in same format as expected by post_process_cat_fim_for_viz function + output_tif = os.path.join(lid_directory, lid + '_' + category + '_extent.tif') + profile = zero_branch_src.profile + summed_array = summed_array.astype('uint8') + with rasterio.open(output_tif, 'w', **profile) as dst: + dst.write(summed_array, 1) + del summed_array + + return messages, hand_stage, datum_adj_wse, datum_adj_wse_m if __name__ == '__main__': + # Parse arguments parser = argparse.ArgumentParser(description = 'Run Categorical FIM') - parser.add_argument('-f','--fim_version',help='Name of directory containing outputs of fim_run.sh',required=True) - parser.add_argument('-j','--number_of_jobs',help='Number of processes to use. Default is 1.',required=False, default="1",type=int) + parser.add_argument('-f', '--fim_run_dir', help='Path to directory containing HAND outputs, e.g. /data/previous_fim/fim_4_0_9_2', + required=True) + parser.add_argument('-jh','--job_number_huc',help='Number of processes to use for HUC scale operations.'\ + ' HUC and inundation job numbers should multiply to no more than one less than the CPU count of the'\ + ' machine. CatFIM sites generally only have 2-3 branches overlapping a site, so this number can be kept low (2-4)', required=False, default=1, type=int) + parser.add_argument('-jn','--job_number_inundate', help='Number of processes to use for inundating'\ + ' HUC and inundation job numbers should multiply to no more than one less than the CPU count'\ + ' of the machine.', required=False, default=1, type=int) + parser.add_argument('-a', '--stage_based', help = 'Run stage-based CatFIM instead of flow-based?'\ + ' NOTE: flow-based CatFIM is the default.', required=False, default=False, action='store_true') + parser.add_argument('-t', '--output_folder', help = 'Target: Where the output folder will be', + required = False, default = '/data/catfim/') + parser.add_argument('-o','--overwrite', help='Overwrite files', required=False, action="store_true") + parser.add_argument('-s','--search', help='Upstream and downstream search in miles. 
How far up and downstream do you want to go?', + required=False, default='5') + parser.add_argument('-l','--lid_to_run', help='NWS LID, lowercase, to produce CatFIM for. Currently only accepts one. Default is all sites', + required=False, default='all') + parser.add_argument('-ji','--job_number_intervals', help='Number of processes to use for inundating multiple intervals in stage-based'\ + ' inundation and interval job numbers should multiply to no more than one less than the CPU count'\ + ' of the machine.', required=False, default=1, type=int) + parser.add_argument('-mc','--past_major_interval_cap', help='Stage-Based Only. How many feet past major do you want to go for the interval FIMs?'\ + ' of the machine.', required=False, default=5.0, type=float) + args = vars(parser.parse_args()) + process_generate_categorical_fim(**args) - # Get arguments - fim_version = args['fim_version'] - number_of_jobs = args['number_of_jobs'] - - # Define default arguments. Modify these if necessary - fim_run_dir = Path(f'{fim_version}') - fim_version_folder = os.path.basename(fim_version) - output_flows_dir = Path(f'/data/catfim/{fim_version_folder}/flows') - output_mapping_dir = Path(f'/data/catfim/{fim_version_folder}/mapping') - nwm_us_search = '5' - nwm_ds_search = '5' - write_depth_tiff = False - - ## Run CatFIM scripts in sequence - # Generate CatFIM flow files - print('Creating flow files') - start = time.time() - subprocess.call(['python3','/foss_fim/tools/generate_categorical_fim_flows.py', '-w' , str(output_flows_dir), '-u', nwm_us_search, '-d', nwm_ds_search]) - end = time.time() - elapsed_time = (end-start)/60 - print(f'Finished creating flow files in {elapsed_time} minutes') - - # Generate CatFIM mapping - print('Begin mapping') - start = time.time() - subprocess.call(['python3','/foss_fim/tools/generate_categorical_fim_mapping.py', '-r' , str(fim_run_dir), '-s', str(output_flows_dir), '-o', str(output_mapping_dir), '-j', str(number_of_jobs)]) - end = time.time() - elapsed_time = (end-start)/60 - print(f'Finished mapping in {elapsed_time} minutes') - - # Updating mapping status - print('Updating mapping status') - update_mapping_status(str(output_mapping_dir), str(output_flows_dir)) diff --git a/tools/generate_categorical_fim_flows.py b/tools/generate_categorical_fim_flows.py index d2f5f0501..667b3ec9d 100755 --- a/tools/generate_categorical_fim_flows.py +++ b/tools/generate_categorical_fim_flows.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 from pathlib import Path import pandas as pd +import numpy as np +import geopandas as gpd import time -from tools_shared_functions import aggregate_wbd_hucs, mainstem_nwm_segs, get_thresholds, flow_data, get_metadata, get_nwm_segs +from tools_shared_functions import aggregate_wbd_hucs, mainstem_nwm_segs, get_thresholds, flow_data, get_metadata, get_nwm_segs, get_datum, ngvd_to_navd_ft, filter_nwm_segments_by_stream_order import argparse from dotenv import load_dotenv import os @@ -10,8 +12,6 @@ sys.path.append('/foss_fim/src') from utils.shared_variables import VIZ_PROJECTION -EVALUATED_SITES_CSV = r'/data/inputs/ahps_sites/evaluated_ahps_sites.csv' - def get_env_paths(): load_dotenv() @@ -21,7 +21,7 @@ def get_env_paths(): return API_BASE_URL, WBD_LAYER -def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): +def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search, stage_based, fim_dir, lid_to_run, attributes_dir=""): ''' This will create static flow files for all nws_lids and save to the workspace directory with the following format: 
@@ -33,7 +33,6 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): This will use the WRDS API to get the nwm segments as well as the flow values for each threshold at each nws_lid and then create the necessary flow file to use for inundation mapping. - Parameters ---------- workspace : STR @@ -48,10 +47,10 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): Returns ------- None. - ''' - + all_start = time.time() + API_BASE_URL, WBD_LAYER = get_env_paths() #Define workspace and wbd_path as a pathlib Path. Convert search distances to integer. workspace = Path(workspace) nwm_us_search = int(nwm_us_search) @@ -63,44 +62,49 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): #Create workspace workspace.mkdir(parents=True,exist_ok = True) - print('Retrieving metadata...') + print(f'Retrieving metadata for site(s): {lid_to_run}...') #Get metadata for 'CONUS' - conus_list, conus_dataframe = get_metadata(metadata_url, select_by = 'nws_lid', selector = ['all'], must_include = 'nws_data.rfc_forecast_point', upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search ) - - #Get metadata for Islands - islands_list, islands_dataframe = get_metadata(metadata_url, select_by = 'state', selector = ['HI','PR'] , must_include = None, upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search) - - #Append the dataframes and lists - all_lists = conus_list + islands_list - + print(metadata_url) + if lid_to_run != 'all': + all_lists, conus_dataframe = get_metadata(metadata_url, select_by = 'nws_lid', selector = [lid_to_run], must_include = 'nws_data.rfc_forecast_point', upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search) + else: + # Get CONUS metadata + conus_list, conus_dataframe = get_metadata(metadata_url, select_by = 'nws_lid', selector = ['all'], must_include = 'nws_data.rfc_forecast_point', upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search) + # Get metadata for Islands + islands_list, islands_dataframe = get_metadata(metadata_url, select_by = 'state', selector = ['HI','PR'] , must_include = None, upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search) + # Append the dataframes and lists + all_lists = conus_list + islands_list + print(len(all_lists)) print('Determining HUC using WBD layer...') - #Assign HUCs to all sites using a spatial join of the FIM 3 HUC layer. - #Get a dictionary of hucs (key) and sites (values) as well as a GeoDataFrame - #of all sites used later in script. - huc_dictionary, out_gdf = aggregate_wbd_hucs(metadata_list = all_lists, wbd_huc8_path = WBD_LAYER) + # Assign HUCs to all sites using a spatial join of the FIM 3 HUC layer. + # Get a dictionary of hucs (key) and sites (values) as well as a GeoDataFrame + # of all sites used later in script. 
+ huc_dictionary, out_gdf = aggregate_wbd_hucs(metadata_list = all_lists, wbd_huc8_path = WBD_LAYER, retain_attributes=True) + # Drop list fields if invalid + out_gdf = out_gdf.drop(['downstream_nwm_features'], axis=1, errors='ignore') + out_gdf = out_gdf.drop(['upstream_nwm_features'], axis=1, errors='ignore') + out_gdf = out_gdf.astype({'metadata_sources': str}) - #Get all possible mainstem segments - print('Getting list of mainstem segments') - #Import list of evaluated sites - print(EVALUATED_SITES_CSV) - print(os.path.exists(EVALUATED_SITES_CSV)) - list_of_sites = pd.read_csv(EVALUATED_SITES_CSV)['Total_List'].to_list() - #The entire routine to get mainstems is hardcoded in this function. - ms_segs = mainstem_nwm_segs(metadata_url, list_of_sites) - - #Loop through each huc unit, first define message variable and flood categories. + # Loop through each huc unit, first define message variable and flood categories. all_messages = [] flood_categories = ['action', 'minor', 'moderate', 'major', 'record'] + + if stage_based: + return huc_dictionary, out_gdf, metadata_url, threshold_url, all_lists + for huc in huc_dictionary: print(f'Iterating through {huc}') #Get list of nws_lids nws_lids = huc_dictionary[huc] #Loop through each lid in list to create flow file - for lid in nws_lids: + for lid in nws_lids: #Convert lid to lower case lid = lid.lower() #Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows. stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all') + if stages == None or flows == None: + print("Likely WRDS error") + continue #Check if stages are supplied, if not write message and exit. if all(stages.get(category, None)==None for category in flood_categories): message = f'{lid}:missing threshold stages' @@ -111,14 +115,15 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): message = f'{lid}:missing calculated flows' all_messages.append(message) continue - #find lid metadata from master list of metadata dictionaries (line 66). metadata = next((item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False) - + #Get mainstem segments of LID by intersecting LID segments with known mainstem segments. - segments = get_nwm_segs(metadata) - site_ms_segs = set(segments).intersection(ms_segs) - segments = list(site_ms_segs) + unfiltered_segments = list(set(get_nwm_segs(metadata))) + + desired_order = metadata['nwm_feature_data']['stream_order'] + # Filter segments to be of like stream order. + segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order) #if no segments, write message and exit out if not segments: print(f'{lid} no segments') @@ -143,10 +148,11 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): else: message = f'{lid}:{category} is missing calculated flow' all_messages.append(message) - #Get various attributes of the site. 
- lat = float(metadata['usgs_preferred']['latitude']) - lon = float(metadata['usgs_preferred']['longitude']) +# lat = float(metadata['usgs_preferred']['latitude']) +# lon = float(metadata['usgs_preferred']['longitude']) + lat = float(metadata['nws_preferred']['latitude']) + lon = float(metadata['nws_preferred']['longitude']) wfo = metadata['nws_data']['wfo'] rfc = metadata['nws_data']['rfc'] state = metadata['nws_data']['state'] @@ -159,7 +165,7 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): wrds_timestamp = stages['wrds_timestamp'] nrldb_timestamp = metadata['nrldb_timestamp'] nwis_timestamp = metadata['nwis_timestamp'] - + #Create a csv with same information as shapefile but with each threshold as new record. csv_df = pd.DataFrame() for threshold in flood_categories: @@ -172,41 +178,44 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): output_dir = workspace / huc / lid if output_dir.exists(): #Export DataFrame to csv containing attributes - csv_df.to_csv(output_dir / f'{lid}_attributes.csv', index = False) + csv_df.to_csv(os.path.join(attributes_dir, f'{lid}_attributes.csv'), index = False) + message = f'{lid}:flows available' + all_messages.append(message) else: message = f'{lid}:missing all calculated flows' all_messages.append(message) - - print('wrapping up...') + print('Wrapping up...') #Recursively find all *_attributes csv files and append - csv_files = list(workspace.rglob('*_attributes.csv')) + csv_files = os.listdir(attributes_dir) all_csv_df = pd.DataFrame() for csv in csv_files: - #Huc has to be read in as string to preserve leading zeros. - temp_df = pd.read_csv(csv, dtype={'huc':str}) + full_csv_path = os.path.join(attributes_dir, csv) + # Huc has to be read in as string to preserve leading zeros. + temp_df = pd.read_csv(full_csv_path, dtype={'huc':str}) all_csv_df = all_csv_df.append(temp_df, ignore_index = True) - #Write to file - all_csv_df.to_csv(workspace / 'nws_lid_attributes.csv', index = False) + # Write to file + all_csv_df.to_csv(os.path.join(workspace, 'nws_lid_attributes.csv'), index = False) - #This section populates a shapefile of all potential sites and details - #whether it was mapped or not (mapped field) and if not, why (status field). + # This section populates a shapefile of all potential sites and details + # whether it was mapped or not (mapped field) and if not, why (status field). - #Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields. + # Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields. viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION) viz_out_gdf.rename(columns = {'identifiers_nwm_feature_id': 'nwm_seg', 'identifiers_nws_lid':'nws_lid', 'identifiers_usgs_site_code':'usgs_gage'}, inplace = True) viz_out_gdf['nws_lid'] = viz_out_gdf['nws_lid'].str.lower() - #Using list of csv_files, populate DataFrame of all nws_lids that had - #a flow file produced and denote with "mapped" column. - nws_lids = [file.stem.split('_attributes')[0] for file in csv_files] + # Using list of csv_files, populate DataFrame of all nws_lids that had + # a flow file produced and denote with "mapped" column. + nws_lids = [] + for csv_file in csv_files: + nws_lids.append(csv_file.split('_attributes')[0]) lids_df = pd.DataFrame(nws_lids, columns = ['nws_lid']) lids_df['mapped'] = 'yes' - #Identify what lids were mapped by merging with lids_df. Populate - #'mapped' column with 'No' if sites did not map. + # Identify what lids were mapped by merging with lids_df. 
Populate + # 'mapped' column with 'No' if sites did not map. viz_out_gdf = viz_out_gdf.merge(lids_df, how = 'left', on = 'nws_lid') viz_out_gdf['mapped'] = viz_out_gdf['mapped'].fillna('no') - #Write messages to DataFrame, split into columns, aggregate messages. messages_df = pd.DataFrame(all_messages, columns = ['message']) messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'}) @@ -218,13 +227,17 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): viz_out_gdf['status'] = viz_out_gdf['status'].fillna('all calculated flows available') #Filter out columns and write out to file - viz_out_gdf = viz_out_gdf.filter(['nws_lid','usgs_gage','nwm_seg','HUC8','mapped','status','geometry']) - viz_out_gdf.to_file(workspace /'nws_lid_flows_sites.shp') +# viz_out_gdf = viz_out_gdf.filter(['nws_lid','usgs_gage','nwm_seg','HUC8','mapped','status','geometry']) + nws_lid_layer = os.path.join(workspace, 'nws_lid_sites.gpkg').replace('flows', 'mapping') + + viz_out_gdf.to_file(nws_lid_layer, driver='GPKG') #time operation all_end = time.time() print(f'total time is {round((all_end - all_start)/60),1} minutes') + return nws_lid_layer + if __name__ == '__main__': #Parse arguments @@ -232,8 +245,9 @@ def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): parser.add_argument('-w', '--workspace', help = 'Workspace where all data will be stored.', required = True) parser.add_argument('-u', '--nwm_us_search', help = 'Walk upstream on NWM network this many miles', required = True) parser.add_argument('-d', '--nwm_ds_search', help = 'Walk downstream on NWM network this many miles', required = True) + parser.add_argument('-a', '--stage_based', help = 'Run stage-based CatFIM instead of flow-based? NOTE: flow-based CatFIM is the default.', required=False, default=False, action='store_true') + parser.add_argument('-f', '--fim-dir', help='Path to FIM outputs directory. 
Only use this option if you are running in alt-catfim mode.',required=False,default="") args = vars(parser.parse_args()) #Run get_env_paths and static_flow_lids - API_BASE_URL, WBD_LAYER = get_env_paths() generate_catfim_flows(**args) diff --git a/tools/generate_categorical_fim_mapping.py b/tools/generate_categorical_fim_mapping.py index 0827d3f08..743552664 100755 --- a/tools/generate_categorical_fim_mapping.py +++ b/tools/generate_categorical_fim_mapping.py @@ -2,33 +2,24 @@ import sys import os -from multiprocessing import Pool +from concurrent.futures import ProcessPoolExecutor, as_completed, wait import argparse import traceback import rasterio import geopandas as gpd import pandas as pd -import shutil from rasterio.features import shapes from shapely.geometry.polygon import Polygon from shapely.geometry.multipolygon import MultiPolygon -from inundation import inundate sys.path.append('/foss_fim/src') from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION from utils.shared_functions import getDriver +from gms_tools.mosaic_inundation import Mosaic_inundation +from gms_tools.inundate_gms import Inundate_gms -INPUTS_DIR = r'/data/inputs' -magnitude_list = ['action', 'minor', 'moderate','major', 'record'] -# Define necessary variables for inundation() -hucs, hucs_layerName = os.path.join(INPUTS_DIR, 'wbd', 'WBD_National.gpkg'), 'WBDHU8' -mask_type, catchment_poly = 'huc', '' - - -def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif, log_file): - - no_data_list = [] - procs_list = [] +def generate_categorical_fim(fim_run_dir, source_flow_dir, output_catfim_dir, + job_number_huc, job_number_inundate, depthtif, log_file): source_flow_dir_list = os.listdir(source_flow_dir) output_flow_dir_list = os.listdir(fim_run_dir) @@ -43,36 +34,18 @@ def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, n # Loop through matching huc directories in the source_flow directory matching_hucs = list(set(output_flow_dir_list) & set(source_flow_dir_list)) - for huc in matching_hucs: - - if "." not in huc: + + with ProcessPoolExecutor(max_workers=job_number_huc) as executor: + for huc in matching_hucs: + if "." in huc: + continue # Get list of AHPS site directories ahps_site_dir = os.path.join(source_flow_dir, huc) ahps_site_dir_list = os.listdir(ahps_site_dir) - # Map paths to HAND files needed for inundation() - fim_run_huc_dir = os.path.join(fim_run_dir, huc) - rem = os.path.join(fim_run_huc_dir, 'rem_zeroed_masked.tif') - catchments = os.path.join(fim_run_huc_dir, 'gw_catchments_reaches_filtered_addedAttributes.tif') - hydroTable = os.path.join(fim_run_huc_dir, 'hydroTable.csv') - - exit_flag = False # Default to False. 
- - # Check if necessary data exist; set exit_flag to True if they don't exist - for f in [rem, catchments, hydroTable]: - if not os.path.exists(f): - no_data_list.append(f) - exit_flag = True - - # Log missing data - if exit_flag == True: - f = open(log_file, 'a+') - f.write(f"Missing data for: {fim_run_huc_dir}\n") - f.close() - - # Map path to huc directory inside out output_cat_fim_dir - cat_fim_huc_dir = os.path.join(output_cat_fim_dir, huc) + # Map path to huc directory inside out output_catfim_dir + cat_fim_huc_dir = os.path.join(output_catfim_dir, huc) if not os.path.exists(cat_fim_huc_dir): os.mkdir(cat_fim_huc_dir) @@ -89,49 +62,46 @@ def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, n # Loop through thresholds/magnitudes and define inundation output files paths for magnitude in thresholds_dir_list: - - if "." not in magnitude: - - magnitude_flows_csv = os.path.join(ahps_site_parent, magnitude, 'ahps_' + ahps_site + '_huc_' + huc + '_flows_' + magnitude + '.csv') - - if os.path.exists(magnitude_flows_csv): - - output_extent_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_extent.tif') - - if depthtif: - output_depth_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_depth.tif') - else: - output_depth_grid = None - - # Append necessary variables to list for multiprocessing. - procs_list.append([rem, catchments, magnitude_flows_csv, huc, hydroTable, output_extent_grid, output_depth_grid, ahps_site, magnitude, log_file]) - - # Initiate multiprocessing - print(f"Running inundation for {len(procs_list)} sites using {number_of_jobs} jobs") - with Pool(processes=number_of_jobs) as pool: - pool.map(run_inundation, procs_list) - - -def run_inundation(args): - - rem = args[0] - catchments = args[1] - magnitude_flows_csv = args[2] - huc = args[3] - hydroTable = args[4] - output_extent_grid = args[5] - output_depth_grid = args[6] - ahps_site = args[7] - magnitude = args[8] - log_file = args[9] - + if "." 
in magnitude: + continue + magnitude_flows_csv = os.path.join(ahps_site_parent, magnitude, 'ahps_' + ahps_site + '_huc_' + huc + '_flows_' + magnitude + '.csv') + if os.path.exists(magnitude_flows_csv): + output_extent_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_extent.tif') + try: + executor.submit(run_inundation, magnitude_flows_csv, huc, output_extent_grid, ahps_site, magnitude, log_file, fim_run_dir, job_number_inundate) + except Exception: + traceback.print_exc() + sys.exit(1) + + +def run_inundation(magnitude_flows_csv, huc, output_extent_grid, ahps_site, magnitude, log_file, fim_run_dir, job_number_inundate): + + huc_dir = os.path.join(fim_run_dir, huc) try: - inundate(rem,catchments,catchment_poly,hydroTable,magnitude_flows_csv,mask_type,hucs=hucs,hucs_layerName=hucs_layerName, - subset_hucs=huc,num_workers=1,aggregate=False,inundation_raster=output_extent_grid,inundation_polygon=None, - depths=output_depth_grid,out_raster_profile=None,out_vector_profile=None,quiet=True - ) - - except: + print("Running Inundate_gms for " + huc) + map_file = Inundate_gms( hydrofabric_dir = fim_run_dir, + forecast = magnitude_flows_csv, + num_workers = job_number_inundate, + hucs = huc, + inundation_raster = output_extent_grid, + inundation_polygon = None, + depths_raster = None, + verbose = False, + log_file = None, + output_fileNames = None ) + print("Mosaicking for " + huc) + Mosaic_inundation( map_file, + mosaic_attribute = 'inundation_rasters', + mosaic_output = output_extent_grid, + mask = os.path.join(huc_dir,'wbd.gpkg'), + unit_attribute_name = 'huc8', + nodata = -9999, + workers = 1, + remove_inputs = False, + subset = None, + verbose = False ) + + except Exception: # Log errors and their tracebacks f = open(log_file, 'a+') f.write(f"{output_extent_grid} - inundation error: {traceback.format_exc()}\n") @@ -147,143 +117,172 @@ def run_inundation(args): f.write('FAILURE_huc_{}:{}:{} map failed to create\n'.format(huc,ahps_site,magnitude)) -def post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir, nws_lid_attributes_filename, log_file): - +def post_process_huc_level(job_number_tif, ahps_dir_list, huc_dir, attributes_dir, gpkg_dir, fim_version, huc): + + tifs_to_reformat_list = [] + + # Loop through ahps sites + for ahps_lid in ahps_dir_list: + ahps_lid_dir = os.path.join(huc_dir, ahps_lid) + + # Append desired filenames to list. + tif_list = os.listdir(ahps_lid_dir) + for tif in tif_list: + if 'extent.tif' in tif: + tifs_to_reformat_list.append(os.path.join(ahps_lid_dir, tif)) + + # Stage-Based CatFIM uses attributes from individual CSVs instead of the master CSV. 
+ nws_lid_attributes_filename = os.path.join(attributes_dir, ahps_lid + '_attributes.csv') + + print(f"Reformatting TIFs {ahps_lid} for {huc_dir}") + with ProcessPoolExecutor(max_workers=job_number_tif) as executor: + for tif_to_process in tifs_to_reformat_list: + if not os.path.exists(tif_to_process): + continue + try: + magnitude = os.path.split(tif_to_process)[1].split('_')[1] + try: + interval_stage = float((os.path.split(tif_to_process)[1].split('_')[2]).replace('p', '.').replace("ft", "")) + if interval_stage == 'extent': + interval_stage = None + except ValueError: + interval_stage = None + executor.submit(reformat_inundation_maps, ahps_lid, tif_to_process, gpkg_dir, fim_version, huc, magnitude, nws_lid_attributes_filename, interval_stage) + except Exception as ex: + print(f"*** {ex}") + traceback.print_exc() + + +def post_process_cat_fim_for_viz(job_number_huc, job_number_tif, output_catfim_dir, attributes_dir, log_file="", fim_version=""): + + print("In post processing...") # Create workspace - gpkg_dir = os.path.join(output_cat_fim_dir, 'gpkg') + gpkg_dir = os.path.join(output_catfim_dir, 'gpkg') if not os.path.exists(gpkg_dir): os.mkdir(gpkg_dir) # Find the FIM version - fim_version = os.path.basename(output_cat_fim_dir) - merged_layer = os.path.join(output_cat_fim_dir, 'catfim_library.shp') - - if not os.path.exists(merged_layer): # prevents appending to existing output - - huc_ahps_dir_list = os.listdir(output_cat_fim_dir) - skip_list=['errors','logs','gpkg',merged_layer] - - for magnitude in magnitude_list: - - procs_list = [] - - # Loop through all categories + merged_layer = os.path.join(output_catfim_dir, 'catfim_library.gpkg') + if not os.path.exists(merged_layer): # prevents appending to existing output + huc_ahps_dir_list = os.listdir(output_catfim_dir) + skip_list=['errors','logs','gpkg','missing_files.txt','messages',merged_layer] + + # Loop through all categories + print("Building list of TIFs to reformat...") + with ProcessPoolExecutor(max_workers=job_number_huc) as huc_exector: + for huc in huc_ahps_dir_list: - if huc not in skip_list: - - huc_dir = os.path.join(output_cat_fim_dir, huc) + if huc in skip_list: + continue + huc_dir = os.path.join(output_catfim_dir, huc) + try: ahps_dir_list = os.listdir(huc_dir) - - # Loop through ahps sites - for ahps_lid in ahps_dir_list: - ahps_lid_dir = os.path.join(huc_dir, ahps_lid) - - extent_grid = os.path.join(ahps_lid_dir, ahps_lid + '_' + magnitude + '_extent_' + huc + '.tif') - - if os.path.exists(extent_grid): - procs_list.append([ahps_lid, extent_grid, gpkg_dir, fim_version, huc, magnitude, nws_lid_attributes_filename]) - - else: - try: - f = open(log_file, 'a+') - f.write(f"Missing layers: {extent_gpkg}\n") - f.close() - except: - pass - - # Multiprocess with instructions - with Pool(processes=number_of_jobs) as pool: - pool.map(reformat_inundation_maps, procs_list) - + except NotADirectoryError: + continue + # If there's no mapping for a HUC, delete the HUC directory. 
+ if ahps_dir_list == []: + os.rmdir(huc_dir) + continue + + huc_exector.submit(post_process_huc_level, job_number_tif, ahps_dir_list, huc_dir, attributes_dir, gpkg_dir, fim_version, huc) + # Merge all layers print(f"Merging {len(os.listdir(gpkg_dir))} layers...") - for layer in os.listdir(gpkg_dir): - + # Open dissolved extent layers diss_extent_filename = os.path.join(gpkg_dir, layer) - - # Open diss_extent diss_extent = gpd.read_file(diss_extent_filename) diss_extent['viz'] = 'yes' - + # Write/append aggregate diss_extent + print(f"Merging layer: {layer}") if os.path.isfile(merged_layer): diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False, mode='a') else: diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False) - del diss_extent - - shutil.rmtree(gpkg_dir) + + #shutil.rmtree(gpkg_dir) # TODO else: print(f"{merged_layer} already exists.") -def reformat_inundation_maps(args): +def reformat_inundation_maps(ahps_lid, extent_grid, gpkg_dir, fim_version, huc, magnitude, nws_lid_attributes_filename, interval_stage=None): try: - lid = args[0] - grid_path = args[1] - gpkg_dir = args[2] - fim_version = args[3] - huc = args[4] - magnitude = args[5] - nws_lid_attributes_filename = args[6] - # Convert raster to to shapes - with rasterio.open(grid_path) as src: + with rasterio.open(extent_grid) as src: image = src.read(1) mask = image > 0 - + # Aggregate shapes results = ({'properties': {'extent': 1}, 'geometry': s} for i, (s, v) in enumerate(shapes(image, mask=mask,transform=src.transform))) # Convert list of shapes to polygon - extent_poly = gpd.GeoDataFrame.from_features(list(results), crs=PREP_PROJECTION) - + extent_poly = gpd.GeoDataFrame.from_features(list(results), crs=PREP_PROJECTION) # Dissolve polygons extent_poly_diss = extent_poly.dissolve(by='extent') # Update attributes extent_poly_diss = extent_poly_diss.reset_index(drop=True) - extent_poly_diss['ahps_lid'] = lid + extent_poly_diss['ahps_lid'] = ahps_lid extent_poly_diss['magnitude'] = magnitude extent_poly_diss['version'] = fim_version extent_poly_diss['huc'] = huc - + extent_poly_diss['interval_stage'] = interval_stage # Project to Web Mercator extent_poly_diss = extent_poly_diss.to_crs(VIZ_PROJECTION) # Join attributes nws_lid_attributes_table = pd.read_csv(nws_lid_attributes_filename, dtype={'huc':str}) - nws_lid_attributes_table = nws_lid_attributes_table.loc[(nws_lid_attributes_table.magnitude==magnitude) & (nws_lid_attributes_table.nws_lid==lid)] - - + nws_lid_attributes_table = nws_lid_attributes_table.loc[(nws_lid_attributes_table.magnitude==magnitude) & (nws_lid_attributes_table.nws_lid==ahps_lid)] extent_poly_diss = extent_poly_diss.merge(nws_lid_attributes_table, left_on=['ahps_lid','magnitude','huc'], right_on=['nws_lid','magnitude','huc']) - extent_poly_diss = extent_poly_diss.drop(columns='nws_lid') - # Save dissolved multipolygon - handle = os.path.split(grid_path)[1].replace('.tif', '') - - diss_extent_filename = os.path.join(gpkg_dir, handle + "_dissolved.gpkg") - + handle = os.path.split(extent_grid)[1].replace('.tif', '') + diss_extent_filename = os.path.join(gpkg_dir, f"{handle}_{huc}_dissolved.gpkg") extent_poly_diss["geometry"] = [MultiPolygon([feature]) if type(feature) == Polygon else feature for feature in extent_poly_diss["geometry"]] - + if not extent_poly_diss.empty: - extent_poly_diss.to_file(diss_extent_filename,driver=getDriver(diss_extent_filename),index=False) - except Exception as e: + except Exception: + pass # Log and clean out the gdb so it's not merged 
in later - try: - f = open(log_file, 'a+') - f.write(str(diss_extent_filename) + " - dissolve error: " + str(e)) - f.close() - except: - pass +# try: +# print(e) +## f = open(log_file, 'a+') +## f.write(str(diss_extent_filename) + " - dissolve error: " + str(e)) +## f.close() +# except: +# pass + + +def manage_catfim_mapping(fim_run_dir, source_flow_dir, output_catfim_dir, attributes_dir, job_number_huc, job_number_inundate, overwrite, depthtif): + + # Create output directory + if not os.path.exists(output_catfim_dir): + os.mkdir(output_catfim_dir) + + # Create log directory + log_dir = os.path.join(output_catfim_dir, 'logs') + if not os.path.exists(log_dir): + os.mkdir(log_dir) + + # Create error log path + log_file = os.path.join(log_dir, 'errors.log') + + job_number_tif = job_number_inundate + + print("Generating Categorical FIM") + generate_categorical_fim(fim_run_dir, source_flow_dir, output_catfim_dir, job_number_huc, job_number_inundate, depthtif, log_file) + + print("Aggregating Categorical FIM") + # Get fim_version. + fim_version = os.path.basename(os.path.normpath(fim_run_dir)).replace('fim_','').replace('_ms_c', '').replace('_', '.') + post_process_cat_fim_for_viz(job_number_huc, job_number_tif, output_catfim_dir, attributes_dir, log_file, fim_version) if __name__ == '__main__': @@ -292,7 +291,7 @@ def reformat_inundation_maps(args): parser = argparse.ArgumentParser(description='Categorical inundation mapping for FOSS FIM.') parser.add_argument('-r','--fim-run-dir',help='Name of directory containing outputs of fim_run.sh',required=True) parser.add_argument('-s', '--source-flow-dir',help='Path to directory containing flow CSVs to use to generate categorical FIM.',required=True, default="") - parser.add_argument('-o', '--output-cat-fim-dir',help='Path to directory where categorical FIM outputs will be written.',required=True, default="") + parser.add_argument('-o', '--output-catfim-dir',help='Path to directory where categorical FIM outputs will be written.',required=True, default="") parser.add_argument('-j','--number-of-jobs',help='Number of processes to use. 
Default is 1.',required=False, default="1",type=int) parser.add_argument('-depthtif','--write-depth-tiff',help='Using this option will write depth TIFFs.',required=False, action='store_true') @@ -300,27 +299,8 @@ def reformat_inundation_maps(args): fim_run_dir = args['fim_run_dir'] source_flow_dir = args['source_flow_dir'] - output_cat_fim_dir = args['output_cat_fim_dir'] + output_catfim_dir = args['output_catfim_dir'] number_of_jobs = int(args['number_of_jobs']) depthtif = args['write_depth_tiff'] - - # Create output directory - if not os.path.exists(output_cat_fim_dir): - os.mkdir(output_cat_fim_dir) - - # Create log directory - log_dir = os.path.join(output_cat_fim_dir, 'logs') - if not os.path.exists(log_dir): - os.mkdir(log_dir) - - # Create error log path - log_file = os.path.join(log_dir, 'errors.log') - - # Map path to points with attributes - nws_lid_attributes_filename = os.path.join(source_flow_dir, 'nws_lid_attributes.csv') - - print("Generating Categorical FIM") - generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif,log_file) - - print("Aggregating Categorical FIM") - post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir,nws_lid_attributes_filename,log_file) + + manage_catfim_mapping(fim_run_dir, source_flow_dir, output_catfim_dir, number_of_jobs, depthtif) diff --git a/tools/gms_tools/inundate_gms.py b/tools/gms_tools/inundate_gms.py index 4437fadab..7b481dd70 100755 --- a/tools/gms_tools/inundate_gms.py +++ b/tools/gms_tools/inundate_gms.py @@ -77,11 +77,9 @@ def Inundate_gms( hydrofabric_dir, forecast, num_workers = 1, hucCodes = [None] * number_of_branches branch_ids = [None] * number_of_branches - executor_generator = { executor.submit(inundate,**inp) : ids for inp,ids in inundate_input_generator - } - + } idx = 0 for future in tqdm(as_completed(executor_generator), total=len(executor_generator), diff --git a/tools/pixel_counter.py b/tools/pixel_counter.py index b7ca06602..e4a96dce9 100644 --- a/tools/pixel_counter.py +++ b/tools/pixel_counter.py @@ -96,6 +96,8 @@ def make_flood_extent_polygon(flood_extent): print(type(fullpath)) return fullpath + + # Function that transforms vector dataset to raster def bbox_to_pixel_offsets(gt, bbox): originX = gt[0] @@ -239,7 +241,10 @@ def zonal_stats(vector_path, raster_path_dict, nodata_value=None, global_src_ext vds = None rds = None - return stats + if stats != []: + return stats + else: + return [] # Creates and prints dataframe containing desired statistics diff --git a/tools/rating_curve_get_usgs_curves.py b/tools/rating_curve_get_usgs_curves.py index cb8a33f56..2c152850b 100644 --- a/tools/rating_curve_get_usgs_curves.py +++ b/tools/rating_curve_get_usgs_curves.py @@ -6,10 +6,16 @@ from tools_shared_functions import get_metadata, get_datum, ngvd_to_navd_ft, get_rating_curve, aggregate_wbd_hucs, get_thresholds, flow_data from dotenv import load_dotenv import os +import numpy as np import argparse import sys sys.path.append('/foss_fim/src') from utils.shared_variables import PREP_PROJECTION +from tools_shared_variables import (acceptable_coord_acc_code_list, + acceptable_coord_method_code_list, + acceptable_alt_acc_thresh, + acceptable_alt_meth_code_list, + acceptable_site_type_list) ''' This script calls the NOAA Tidal API for datum conversions. 
Experience shows that @@ -26,9 +32,10 @@ EVALUATED_SITES_CSV = os.getenv("EVALUATED_SITES_CSV") NWM_FLOWS_MS = os.getenv("NWM_FLOWS_MS") + def get_all_active_usgs_sites(): ''' - Compile a list of all active usgs gage sites that meet certain criteria. + Compile a list of all active usgs gage sites. Return a GeoDataFrame of all sites. Returns @@ -43,61 +50,15 @@ def get_all_active_usgs_sites(): selector = ['all'] must_include = 'usgs_data.active' metadata_list, metadata_df = get_metadata(metadata_url, select_by, selector, must_include = must_include, upstream_trace_distance = None, downstream_trace_distance = None ) - - #Filter out sites based quality of site. These acceptable codes were initially - #decided upon and may need fine tuning. A link where more information - #regarding the USGS attributes is provided. - - #https://help.waterdata.usgs.gov/code/coord_acy_cd_query?fmt=html - acceptable_coord_acc_code = ['H','1','5','S','R','B','C','D','E'] - #https://help.waterdata.usgs.gov/code/coord_meth_cd_query?fmt=html - acceptable_coord_method_code = ['C','D','W','X','Y','Z','N','M','L','G','R','F','S'] - #https://help.waterdata.usgs.gov/codes-and-parameters/codes#SI - acceptable_alt_acc_thresh = 1 - #https://help.waterdata.usgs.gov/code/alt_meth_cd_query?fmt=html - acceptable_alt_meth_code = ['A','D','F','I','J','L','N','R','W','X','Y','Z'] - #https://help.waterdata.usgs.gov/code/site_tp_query?fmt=html - acceptable_site_type = ['ST'] - - #Cycle through each site and filter out if site doesn't meet criteria. - acceptable_sites_metadata = [] - for metadata in metadata_list: - #Get the usgs info from each site - usgs_data = metadata['usgs_data'] - - #Get site quality attributes - coord_accuracy_code = usgs_data.get('coord_accuracy_code') - coord_method_code = usgs_data.get('coord_method_code') - alt_accuracy_code = usgs_data.get('alt_accuracy_code') - alt_method_code = usgs_data.get('alt_method_code') - site_type = usgs_data.get('site_type') - - #Check to make sure that none of the codes were null, if null values are found, skip to next. - if not all([coord_accuracy_code, coord_method_code, alt_accuracy_code, alt_method_code, site_type]): - continue - - #Test if site meets criteria. - if (coord_accuracy_code in acceptable_coord_acc_code and - coord_method_code in acceptable_coord_method_code and - alt_accuracy_code <= acceptable_alt_acc_thresh and - alt_method_code in acceptable_alt_meth_code and - site_type in acceptable_site_type): - - #If nws_lid is not populated then add a dummy ID so that 'aggregate_wbd_hucs' works correctly. - if not metadata.get('identifiers').get('nws_lid'): - metadata['identifiers']['nws_lid'] = 'Bogus_ID' - - #Append metadata of acceptable site to acceptable_sites list. - acceptable_sites_metadata.append(metadata) - #Get a geospatial layer (gdf) for all acceptable sites - dictionary, gdf = aggregate_wbd_hucs(acceptable_sites_metadata, Path(WBD_LAYER), retain_attributes = False) + print("Aggregating WBD HUCs...") + dictionary, gdf = aggregate_wbd_hucs(metadata_list, Path(WBD_LAYER), retain_attributes=True) #Get a list of all sites in gdf list_of_sites = gdf['identifiers_usgs_site_code'].to_list() #Rename gdf fields gdf.columns = gdf.columns.str.replace('identifiers_','') - return gdf, list_of_sites, acceptable_sites_metadata + return gdf, list_of_sites, metadata_list ############################################################################## #Generate categorical flows for each category across all sites. 
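The up-front site filtering removed from `get_all_active_usgs_sites()` above is replaced later in this diff by boolean flag columns (`acceptable_codes`, `acceptable_alt_error`) built from the criteria lists now kept in `tools_shared_variables.py`; only sites that pass both flags (and have a rating curve) are kept for the rating curve CSV. A minimal sketch of that flag-then-filter pattern, assuming it is run from `tools/` so the import resolves; the two-row DataFrame and every value in it are invented for illustration:

```python
import numpy as np
import pandas as pd

# These lists are defined in tools_shared_variables.py (added at the end of this diff).
from tools_shared_variables import (acceptable_coord_acc_code_list,
                                    acceptable_coord_method_code_list,
                                    acceptable_alt_acc_thresh,
                                    acceptable_alt_meth_code_list,
                                    acceptable_site_type_list)

# Invented two-site stand-in for sites_gdf; in the real script the usgs_data_* columns
# come from aggregate_wbd_hucs(..., retain_attributes=True).
sites = pd.DataFrame({
    'location_id':                   ['07010000', '07019000'],
    'curve':                         ['yes', 'yes'],
    'usgs_data_coord_accuracy_code': ['H', 'U'],
    'usgs_data_coord_method_code':   ['C', 'C'],
    'usgs_data_alt_accuracy_code':   [0.01, 5.0],
    'usgs_data_alt_method_code':     ['D', 'D'],
    'usgs_data_site_type':           ['ST', 'ST'],
})

# Flag every site instead of dropping failures, so the flags can travel with the gages layer.
sites['acceptable_codes'] = (sites['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
                             & sites['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
                             & sites['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
                             & sites['usgs_data_site_type'].isin(acceptable_site_type_list))
sites['acceptable_alt_error'] = np.where(sites['usgs_data_alt_accuracy_code'].astype(float)
                                         <= acceptable_alt_acc_thresh, True, False)

# Only sites that pass both flags (and have a curve) feed the rating curve CSV.
acceptable = sites[sites['acceptable_codes'] & sites['acceptable_alt_error'] & (sites['curve'] == 'yes')]
print(acceptable['location_id'].tolist())   # ['07010000'] with these invented values
```

Keeping the flags as columns, rather than dropping rows early, is what allows `usgs_gages.gpkg` to carry the pass/fail attributes for every site while the filtered subset is written out separately.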
@@ -217,14 +178,19 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): all input sites. Additional metadata also contained in DataFrame ''' + #Define URLs for metadata and rating curve metadata_url = f'{API_BASE_URL}/metadata' rating_curve_url = f'{API_BASE_URL}/rating_curve' - #If 'all' option passed to list of gages sites, it retrieves all acceptable sites within CONUS. + # Create workspace directory if it doesn't exist + if not os.path.exists(workspace): + os.mkdir(workspace) + + #If 'all' option passed to list of gages sites, it retrieves all sites within CONUS. print('getting metadata for all sites') if list_of_gage_sites == ['all']: - acceptable_sites_gdf, acceptable_sites_list, metadata_list = get_all_active_usgs_sites() + sites_gdf, sites_list, metadata_list = get_all_active_usgs_sites() #Otherwise, if a list of sites is passed, retrieve sites from WRDS. else: #Define arguments to retrieve metadata and then get metadata from WRDS @@ -254,22 +220,24 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): #For each site in metadata_list for metadata in metadata_list: + print("get_datum") #Get datum information for site (only need usgs_data) nws, usgs = get_datum(metadata) #Filter out sites that are not in contiguous US. If this section is removed be sure to test with datum adjustment section (region will need changed) if usgs['state'] in ['Alaska', 'Puerto Rico', 'Virgin Islands', 'Hawaii']: continue - #Get rating curve for site location_ids = usgs['usgs_site_code'] + if location_ids == None: # Some sites don't have a value for usgs_site_code, skip them + continue curve = get_rating_curve(rating_curve_url, location_ids = [location_ids]) #If no rating curve was returned, skip site. if curve.empty: message = f'{location_ids}: has no rating curve' regular_messages.append(message) continue - + #Adjust datum to NAVD88 if needed. If datum unknown, skip site. if usgs['vcs'] == 'NGVD29': #To prevent time-out errors @@ -283,7 +251,6 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): api_failure_messages.append(api_message) print(api_message) continue - #If datum adjustment succeeded, calculate datum in NAVD88 navd88_datum = round(usgs['datum'] + datum_adj_ft, 2) message = f'{location_ids}:succesfully converted NGVD29 to NAVD88' @@ -298,7 +265,8 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): message = f"{location_ids}: datum unknown" regular_messages.append(message) continue - + + print("Populating..") #Populate rating curve with metadata and use navd88 datum to convert stage to elevation. 
curve['active'] = usgs['active'] curve['datum'] = usgs['datum'] @@ -306,23 +274,53 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): curve['navd88_datum'] = navd88_datum curve['elevation_navd88'] = curve['stage'] + navd88_datum #Append all rating curves to a dataframe - all_rating_curves = all_rating_curves.append(curve) + all_rating_curves = all_rating_curves.append(curve) #Rename columns and add attribute indicating if rating curve exists - acceptable_sites_gdf.rename(columns = {'nwm_feature_id':'feature_id','usgs_site_code':'location_id'}, inplace = True) + sites_gdf.rename(columns = {'nwm_feature_id':'feature_id','usgs_site_code':'location_id'}, inplace = True) sites_with_data = pd.DataFrame({'location_id':all_rating_curves['location_id'].unique(),'curve':'yes'}) - acceptable_sites_gdf = acceptable_sites_gdf.merge(sites_with_data, on = 'location_id', how = 'left') - acceptable_sites_gdf.fillna({'curve':'no'},inplace = True) + sites_gdf = sites_gdf.merge(sites_with_data, on = 'location_id', how = 'left') + sites_gdf.fillna({'curve':'no'},inplace = True) #Add mainstems attribute to acceptable sites print('Attributing mainstems sites') #Import mainstems segments used in run_by_unit.sh ms_df = gpd.read_file(NWM_FLOWS_MS) ms_segs = ms_df.ID.astype(str).to_list() #Populate mainstems attribute field - acceptable_sites_gdf['mainstem'] = 'no' - acceptable_sites_gdf.loc[acceptable_sites_gdf.eval('feature_id in @ms_segs'),'mainstem'] = 'yes' + sites_gdf['mainstem'] = 'no' + sites_gdf.loc[sites_gdf.eval('feature_id in @ms_segs'),'mainstem'] = 'yes' + sites_gdf.to_csv(os.path.join(workspace, 'acceptable_sites_pre.csv')) + + sites_gdf = sites_gdf.drop(['upstream_nwm_features'], axis=1, errors='ignore') + sites_gdf = sites_gdf.drop(['downstream_nwm_features'], axis=1, errors='ignore') + print("Recasting...") + sites_gdf = sites_gdf.astype({'metadata_sources': str}) + # -- Filter all_rating_curves according to acceptance criteria -- # + # -- We only want acceptable gages in the rating curve CSV -- # + sites_gdf['acceptable_codes'] = (sites_gdf['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list) + & sites_gdf['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list) + & sites_gdf['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list) + & sites_gdf['usgs_data_site_type'].isin(acceptable_site_type_list)) + + sites_gdf = sites_gdf.astype({'usgs_data_alt_accuracy_code': float}) + sites_gdf['acceptable_alt_error'] = np.where(sites_gdf['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False) + + sites_gdf.to_file(os.path.join(workspace, 'sites_bool_flags.gpkg'), driver='GPKG') + + # Filter and save filtered file for viewing + acceptable_sites_gdf = sites_gdf[(sites_gdf['acceptable_codes'] == True) & (sites_gdf['acceptable_alt_error'] == True)] + acceptable_sites_gdf = acceptable_sites_gdf[acceptable_sites_gdf['curve'] == 'yes'] + acceptable_sites_gdf.to_csv(os.path.join(workspace, 'acceptable_sites_for_rating_curves.csv')) + acceptable_sites_gdf.to_file(os.path.join(workspace, 'acceptable_sites_for_rating_curves.gpkg'),driver='GPKG') + + # Make list of acceptable sites + acceptable_sites_list = acceptable_sites_gdf['location_id'].tolist() + + # Filter out all_rating_curves by list + all_rating_curves = all_rating_curves[all_rating_curves['location_id'].isin(acceptable_sites_list)] + #If workspace is specified, write data to file. 
if workspace: #Write rating curve dataframe to file @@ -334,10 +332,10 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): regular_messages = api_failure_messages + regular_messages all_messages = pd.DataFrame({'Messages':regular_messages}) all_messages.to_csv(Path(workspace) / 'log.csv', index = False) - #If 'all' option specified, reproject then write out shapefile of acceptable sites. + # If 'all' option specified, reproject then write out shapefile of acceptable sites. if list_of_gage_sites == ['all']: - acceptable_sites_gdf = acceptable_sites_gdf.to_crs(PREP_PROJECTION) - acceptable_sites_gdf.to_file(Path(workspace) / 'usgs_gages.gpkg', layer = 'usgs_gages', driver = 'GPKG') + sites_gdf = sites_gdf.to_crs(PREP_PROJECTION) + sites_gdf.to_file(Path(workspace) / 'usgs_gages.gpkg', layer = 'usgs_gages', driver = 'GPKG') #Write out flow files for each threshold across all sites all_data = write_categorical_flow_files(metadata_list, workspace) @@ -366,5 +364,6 @@ def usgs_rating_to_elev(list_of_gage_sites, workspace=False, sleep_time = 1.0): t = float(args['sleep_timer']) #Generate USGS rating curves + print("Executing...") usgs_rating_to_elev(list_of_gage_sites = l, workspace=w, sleep_time = t) \ No newline at end of file diff --git a/tools/test_case_by_hydro_id.py b/tools/test_case_by_hydro_id.py index 3db6c7d6c..4d7657ff6 100644 --- a/tools/test_case_by_hydro_id.py +++ b/tools/test_case_by_hydro_id.py @@ -21,6 +21,7 @@ def perform_zonal_stats(huc_gpkg,agree_rast): stats = zonal_stats(huc_gpkg,{"agreement_raster":agree_rast}, nodata_value=10) return stats + ##################################################### # Creates a pandas df containing Alpha stats by hydroid. # Stats input is the output of zonal_stats function. 
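In the hunk that follows, the per-HUC loop in `test_case_by_hydro_id.py` gains a guard that skips agreement rasters whose zonal statistics come back empty (the `pixel_counter.py` change earlier in this diff makes an empty list an expected return) before the Alpha stats are assembled and joined to the branch-0 catchments on `HydroID`. A rough, self-contained sketch of that skip-then-merge flow; the stub function and all sample values are invented stand-ins for the real `perform_zonal_stats()` and `assemble_hydro_alpha_for_single_huc()` outputs:

```python
import pandas as pd

def stub_zonal_stats(agree_rast):
    # Invented stand-in: pretend one raster yields no statistics at all.
    if agree_rast == 'no_overlap_agreement.tif':
        return []
    return [{'HydroID': 101, 'TP_count': 1200, 'FP_count': 35}]

# Stand-in for the branch-0 catchment layer (geometry column elided here).
hydro_geom_df = pd.DataFrame({'HydroID': [101, 102], 'geometry': [None, None]})

csv_output = pd.DataFrame()
for agree_rast in ['no_overlap_agreement.tif', 'minor_agreement.tif']:
    stats = stub_zonal_stats(agree_rast)
    if stats == []:
        continue                     # nothing to tabulate for this raster, move on
    in_mem_df = pd.DataFrame(stats)  # stand-in for assemble_hydro_alpha_for_single_huc()
    geom_output = hydro_geom_df.merge(in_mem_df, on='HydroID', how='inner')
    csv_output = pd.concat([geom_output, csv_output], sort=False)

print(csv_output)
```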
@@ -154,48 +155,49 @@ def assemble_hydro_alpha_for_single_huc(stats,huc8,mag,bench): if not os.path.exists(test_case_class.fim_dir): print(f'{test_case_class.fim_dir} does not exist') continue - else: - print(test_case_class.fim_dir, end='\r') - - agreement_dict = test_case_class.get_current_agreements() - - for agree_rast in agreement_dict: - - print('performing_zonal_stats') - - branches_dir = os.path.join(test_case_class.fim_dir,'branches') - for branches in os.listdir(branches_dir): - if branches != "0": - continue - huc_gpkg = os.path.join(branches_dir,branches,) - - string_manip = "gw_catchments_reaches_filtered_addedAttributes_crosswalked_" + branches + ".gpkg" - - huc_gpkg = os.path.join(huc_gpkg, string_manip) - - define_mag = agree_rast.split(version) - - define_mag_1 = define_mag[1].split('/') - - mag = define_mag_1[1] + print(test_case_class.fim_dir, end='\r') - stats = perform_zonal_stats(huc_gpkg,agree_rast) - - print('assembling_hydroalpha_for_single_huc') - get_geom = gpd.read_file(huc_gpkg) - - get_geom['geometry'] = get_geom.apply(lambda row: make_valid(row.geometry), axis=1) + agreement_dict = test_case_class.get_current_agreements() + + for agree_rast in agreement_dict: + + print('performing_zonal_stats') + + branches_dir = os.path.join(test_case_class.fim_dir,'branches') + for branches in os.listdir(branches_dir): + if branches != "0": + continue + huc_gpkg = os.path.join(branches_dir,branches,) - in_mem_df = assemble_hydro_alpha_for_single_huc(stats, test_case_class.huc, mag, test_case_class.benchmark_cat) - - hydro_geom_df = get_geom[["HydroID", "geometry"]] - - geom_output = hydro_geom_df.merge(in_mem_df, on='HydroID', how ='inner') - - concat_df_list = [geom_output, csv_output] - - csv_output = pd.concat(concat_df_list, sort=False) + string_manip = "gw_catchments_reaches_filtered_addedAttributes_crosswalked_" + branches + ".gpkg" + + huc_gpkg = os.path.join(huc_gpkg, string_manip) + + define_mag = agree_rast.split(version) + + define_mag_1 = define_mag[1].split('/') + + mag = define_mag_1[1] + + stats = perform_zonal_stats(huc_gpkg,agree_rast) + if stats == []: + continue + + print('assembling_hydroalpha_for_single_huc') + get_geom = gpd.read_file(huc_gpkg) + + get_geom['geometry'] = get_geom.apply(lambda row: make_valid(row.geometry), axis=1) + + in_mem_df = assemble_hydro_alpha_for_single_huc(stats, test_case_class.huc, mag, test_case_class.benchmark_cat) + + hydro_geom_df = get_geom[["HydroID", "geometry"]] + + geom_output = hydro_geom_df.merge(in_mem_df, on='HydroID', how ='inner') + + concat_df_list = [geom_output, csv_output] + + csv_output = pd.concat(concat_df_list, sort=False) print('projecting to 3857') csv_output = csv_output.to_crs('EPSG:3857') diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index 119aee6c5..cc317e867 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -16,6 +16,42 @@ from shapely.geometry import shape from shapely.geometry import Polygon from shapely.geometry import MultiPolygon +from dotenv import load_dotenv + + +def get_env_paths(): + load_dotenv() + #import variables from .env file + API_BASE_URL = os.getenv("API_BASE_URL") + WBD_LAYER = os.getenv("WBD_LAYER") + return API_BASE_URL, WBD_LAYER + + +def filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order): + """ + This function uses the WRDS API to filter out NWM segments from a list if their stream order is different than + the target stream order. 
+ + Args: + unfiltered_segments (list): A list of NWM feature_id strings. + desired_order (str): The desired stream order. + Returns: + filtered_segments (list): A list of NWM feature_id strings, paired down to only those that share the target order. + + """ + API_BASE_URL, WBD_LAYER = get_env_paths() + #Define workspace and wbd_path as a pathlib Path. Convert search distances to integer. + metadata_url = f'{API_BASE_URL}/metadata' + + filtered_segments = [] + for feature_id in unfiltered_segments: + all_lists = get_metadata(metadata_url, select_by = 'nwm_feature_id', selector = [feature_id]) + feature_id_metadata = next(item for item in all_lists)[0] + stream_order = feature_id_metadata['nwm_feature_data']['stream_order'] + if stream_order == desired_order: + filtered_segments.append(feature_id) + + return filtered_segments def check_for_regression(stats_json_to_test, previous_version, previous_version_stats_json_path, regression_test_csv=None): @@ -613,6 +649,8 @@ def get_metadata(metadata_url, select_by, selector, must_include = None, upstrea params['downstream_trace_distance'] = downstream_trace_distance #Request data from url response = requests.get(url, params = params) +# print(response) +# print(url) if response.ok: #Convert data response to a json metadata_json = response.json() @@ -685,13 +723,16 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): ''' #Import huc8 layer as geodataframe and retain necessary columns + print("Reading WBD...") huc8 = gpd.read_file(wbd_huc8_path, layer = 'WBDHU8') + print("WBD read.") huc8 = huc8[['HUC8','name','states', 'geometry']] #Define EPSG codes for possible latlon datum names (default of NAD83 if unassigned) crs_lookup ={'NAD27':'EPSG:4267', 'NAD83':'EPSG:4269', 'WGS84': 'EPSG:4326'} #Create empty geodataframe and define CRS for potential horizontal datums metadata_gdf = gpd.GeoDataFrame() #Iterate through each site + print("Iterating through metadata list...") for metadata in metadata_list: #Convert metadata to json df = pd.json_normalize(metadata) @@ -701,6 +742,7 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): df.dropna(subset = ['identifiers_nws_lid','usgs_preferred_latitude', 'usgs_preferred_longitude'], inplace = True) #If dataframe still has data if not df.empty: +# print(df[:5]) #Get horizontal datum h_datum = df['usgs_preferred_latlon_datum_name'].item() #Look up EPSG code, if not returned Assume NAD83 as default. @@ -716,15 +758,16 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): site_gdf = site_gdf.to_crs(huc8.crs) #Append site geodataframe to metadata geodataframe metadata_gdf = metadata_gdf.append(site_gdf, ignore_index = True) - + #Trim metadata to only have certain fields. if not retain_attributes: metadata_gdf = metadata_gdf[['identifiers_nwm_feature_id', 'identifiers_nws_lid', 'identifiers_usgs_site_code', 'geometry']] #If a list of attributes is supplied then use that list. 
- elif isinstance(retain_attributes,list): - metadata_gdf = metadata_gdf[retain_attributes] - +# elif isinstance(retain_attributes,list): +# metadata_gdf = metadata_gdf[retain_attributes] + print("Performing spatial and tabular operations on geodataframe...") #Perform a spatial join to get the WBD HUC 8 assigned to each AHPS + print(metadata_gdf) joined_gdf = gpd.sjoin(metadata_gdf, huc8, how = 'inner', op = 'intersects', lsuffix = 'ahps', rsuffix = 'wbd') joined_gdf = joined_gdf.drop(columns = 'index_wbd') @@ -913,7 +956,10 @@ def get_thresholds(threshold_url, select_by, selector, threshold = 'all'): flows['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') stages['units'] = threshold_data.get('metadata').get('stage_units') flows['units'] = threshold_data.get('metadata').get('calc_flow_units') - return stages, flows + return stages, flows + else: + print("WRDS response error: ") +# print(response) ######################################################################## # Function to write flow file @@ -1071,7 +1117,7 @@ def ngvd_to_navd_ft(datum_info, region = 'contiguous'): lon = datum_info['lon'] #Define url for datum API - datum_url = 'https://vdatum.noaa.gov/vdatumweb/api/tidal' + datum_url = 'https://vdatum.noaa.gov/vdatumweb/api/convert' #Define parameters. Hard code most parameters to convert NGVD to NAVD. params = {} @@ -1087,16 +1133,18 @@ def ngvd_to_navd_ft(datum_info, region = 'contiguous'): #Call the API response = requests.get(datum_url, params = params) - #If succesful get the navd adjustment + + #If successful get the navd adjustment if response: results = response.json() #Get adjustment in meters (NGVD29 to NAVD88) - adjustment = results['tar_height'] + adjustment = results['t_z'] #convert meters to feet adjustment_ft = round(float(adjustment) * 3.28084,2) else: adjustment_ft = None - return adjustment_ft + return adjustment_ft + ####################################################################### #Function to download rating curve from API ####################################################################### @@ -1121,6 +1169,7 @@ def get_rating_curve(rating_curve_url, location_ids): #Define DataFrame to contain all returned curves. all_curves = pd.DataFrame() + print(location_ids) #Define call to retrieve all rating curve information from WRDS. joined_location_ids = '%2C'.join(location_ids) url = f'{rating_curve_url}/{joined_location_ids}' diff --git a/tools/tools_shared_variables.py b/tools/tools_shared_variables.py index 82d63d0b0..e63d1cf80 100755 --- a/tools/tools_shared_variables.py +++ b/tools/tools_shared_variables.py @@ -61,3 +61,16 @@ TWHITE = '\033[37m' WHITE_BOLD = '\033[37;1m' CYAN_BOLD = '\033[36;1m' + +# USGS gages acceptance criteria. Likely not constants, so not using all caps. +# ANY CHANGE TO THESE VALUES SHOULD WARRANT A CODE VERSION CHANGE +# https://help.waterdata.usgs.gov/code/coord_acy_cd_query?fmt=html +acceptable_coord_acc_code_list = ['H','1','5','S','R','B','C','D','E'] +# https://help.waterdata.usgs.gov/code/coord_meth_cd_query?fmt=html +acceptable_coord_method_code_list = ['C','D','W','X','Y','Z','N','M','L','G','R','F','S'] +# https://help.waterdata.usgs.gov/codes-and-parameters/codes#SI +acceptable_alt_acc_thresh = 1.0 +# https://help.waterdata.usgs.gov/code/alt_meth_cd_query?fmt=html +acceptable_alt_meth_code_list = ['A','D','F','I','J','L','N','R','W','X','Y','Z'] +# https://help.waterdata.usgs.gov/code/site_tp_query?fmt=html +acceptable_site_type_list = ['ST']
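The new `filter_nwm_segments_by_stream_order()` helper defined earlier in this diff takes a list of NWM `feature_id` strings plus a target stream order and queries the WRDS metadata endpoint once per feature_id, so it assumes `API_BASE_URL` is set in the `.env` file read by `get_env_paths()`. A hypothetical call, shown only as a sketch; the feature_ids and the target order below are made up:

```python
from tools_shared_functions import filter_nwm_segments_by_stream_order

# Made-up NWM feature_ids for one AHPS site, and the stream order to keep.
unfiltered_segments = ['5791828', '5791830', '5791832']
desired_order = '5'   # passed as a string, following the function's docstring

filtered_segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order)
print(f'{len(filtered_segments)} of {len(unfiltered_segments)} segments match stream order {desired_order}')
```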