From ad2eb622f247437b987ab4a116d8233fe6ca7f63 Mon Sep 17 00:00:00 2001
From: Rob Hanna
Date: Wed, 3 Jul 2024 17:07:28 +0000
Subject: [PATCH] more updates

---
 tools/generate_categorical_fim.py       |  48 ++++++-----
 tools/generate_categorical_fim_flows.py | 106 +++++++++++++++---------
 2 files changed, 95 insertions(+), 59 deletions(-)

diff --git a/tools/generate_categorical_fim.py b/tools/generate_categorical_fim.py
index 2de6641d..8838a17f 100755
--- a/tools/generate_categorical_fim.py
+++ b/tools/generate_categorical_fim.py
@@ -140,15 +140,26 @@ def process_generate_categorical_fim(
     # we are getting too many folders and files. We want just huc folders.
     # output_flow_dir_list = os.listdir(fim_run_dir)

     # looking for folders only starting with 0, 1, or 2
-    lst_hucs = [
+
+    # for now, we are dropping all Alaska HUCS
+
+    # DEBUG: July 2, 2024
+    # Override to keep just the one huc.
+    lst_hucs = ['19020301']
+
+
+    valid_ahps_hucs = [
         x
         for x in os.listdir(fim_run_dir)
-        if os.path.isdir(os.path.join(fim_run_dir, x)) and x[0] in ['0', '1', '2']
+        if os.path.isdir(os.path.join(fim_run_dir, x)) and
+        x[0] in ['0', '1', '2'] and
+        x[:2] != "19"
     ]
-    # print(lst_hucs)
-    lst_hucs.sort()
+    # print(valid_ahps_hucs)
+
+    valid_ahps_hucs.sort()

-    num_hucs = len(lst_hucs)
+    num_hucs = len(valid_ahps_hucs)
     if num_hucs == 0:
         raise ValueError(
             f'Output directory {fim_run_dir} is empty. Verify that you have the correct input folder.'
         )
@@ -172,7 +183,7 @@ def process_generate_categorical_fim(
     FLOG.lprint(f"Start generate categorical fim for {catfim_method} - (UTC): {dt_string}")
     FLOG.lprint("")
-    FLOG.lprint(f"Processing {num_hucs} huc(s)")
+    FLOG.lprint(f"Processing {num_hucs} huc(s) with Alaska removed")

     load_dotenv(env_file)
     API_BASE_URL = os.getenv('API_BASE_URL')
@@ -191,16 +202,16 @@ def process_generate_categorical_fim(
     # if not os.path.exists(fim_inputs_csv_path):
     #     raise ValueError(f'{fim_inputs_csv_path} not found. Verify that you have the correct input files.')

-    print()
+    # print()

-    FLOG.lprint("Filtering out HUCs that do not have related ahps site in them.")
-    valid_ahps_hucs = __filter_hucs_to_ahps(lst_hucs)
+    # FLOG.lprint("Filtering out HUCs that do not have related ahps site in them.")
+    # valid_ahps_hucs = __filter_hucs_to_ahps(lst_hucs)

-    num_valid_hucs = len(valid_ahps_hucs)
-    if num_valid_hucs == 0:
-        raise Exception("None of the HUCs supplied have ahps sites in them. Check your fim output folder")
-    else:
-        FLOG.lprint(f"Processing {num_valid_hucs} huc(s) with AHPS sites")
+    # num_valid_hucs = len(valid_ahps_hucs)
+    # if num_valid_hucs == 0:
+    #     raise Exception("None of the HUCs supplied have ahps sites in them. Check your fim output folder")
+    # else:
+    #     FLOG.lprint(f"Processing {num_valid_hucs} huc(s) with AHPS sites")

     # Define upstream and downstream search in miles
     nwm_us_search, nwm_ds_search = search, search
@@ -354,8 +365,8 @@ def update_mapping_status(output_mapping_dir, nws_sites_layer):
     ----------
     output_mapping_dir : STR
         Path to the output directory of all inundation maps.
-    output_flows_dir : STR
-        Path to the directory containing all flows.
+    nws_sites_layer : STR
+

     Returns
     -------
@@ -1128,7 +1139,7 @@ def generate_stage_based_categorical_fim(
         with open(full_message_csv_path, newline='') as message_file:
             reader = csv.reader(message_file)
             for row in reader:
-                all_messages.append(row)
+                all_messages.append(row.strip())

     # Filter out columns and write out to file
     nws_sites_layer = os.path.join(output_mapping_dir, 'nws_lid_sites.gpkg')
@@ -1140,8 +1151,6 @@ def generate_stage_based_categorical_fim(
         FLOG.lprint(f"nws_sites_layer does not exist")

-    # FIX: (DO WE NEED IT?)
-
     # Write messages to DataFrame, split into columns, aggregate messages.
     if len(all_messages) > 0:
@@ -1493,3 +1502,4 @@ def produce_stage_based_catfim_tifs(

     except Exception:
         FLOG.critical(traceback.format_exc())
+

diff --git a/tools/generate_categorical_fim_flows.py b/tools/generate_categorical_fim_flows.py
index df97d0a6..369ef09d 100755
--- a/tools/generate_categorical_fim_flows.py
+++ b/tools/generate_categorical_fim_flows.py
@@ -87,34 +87,36 @@ def generate_flows_for_huc(
         # Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated
         # flows.
+        # TODO: Jun 17, 2024 - This gets recalled for every huc but only uses the nws_list.
         # Move this somewhere outside the huc list so it doesn't need to be called over and over again
+
+        # Careful, for "all_messages.append" the syntax must be f'{lid}: (whatever message)'
+        # as this gets parsed and logic is used against it.
+
+        MP_LOG.trace(f'Getting thresholds for {lid}')
         stages, flows = get_thresholds(
             threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all'
         )

         if len(stages) == 0 or len(flows) == 0:
-            message = f'{huc} - {lid}: stages or flows is none, likely WRDS error'
+            message = f'{lid}: no stages or flows exist, likely WRDS error'
             all_messages.append(message)
-            MP_LOG.warning(message)
-            # print(message) # TODO: Make verbose option
+            MP_LOG.warning(f"{huc} - {message}")
             continue

         # Check if stages are supplied, if not write message and exit.
         if all(stages.get(category, None) is None for category in flood_categories):
-            message = f'{huc} - {lid}: missing threshold stages'
+            message = f'{lid}: missing threshold stages'
             all_messages.append(message)
-            MP_LOG.warning(message)
-            # print(message) # TODO: Make verbose option
+            MP_LOG.warning(f"{huc} - {message}")
             continue

         # Check if calculated flows are supplied, if not write message and exit.
         if all(flows.get(category, None) is None for category in flood_categories):
-            message = f'{huc} - {lid}: missing calculated flows'
+            message = f'{lid}: missing calculated flows'
             all_messages.append(message)
-            MP_LOG.warning(message)
-            # print(message) # TODO: Make verbose option
+            MP_LOG.warning(f"{huc} - {message}")
             continue

         # Find lid metadata from master list of metadata dictionaries (line 66).
@@ -134,7 +136,7 @@ def generate_flows_for_huc(
         if not segments or len(segments) == 0:
             message = f'{lid}: missing nwm segments'
             all_messages.append(message)
-            MP_LOG.warning(message)
+            MP_LOG.warning(f"{huc} - {message}")
             continue

         # For each flood category
@@ -161,9 +163,9 @@ def generate_flows_for_huc(
                 flow_info.to_csv(output_file, index=False)

             else:
-                message = f'{huc} - {lid}: magnitude : {category} is missing calculated flow'
+                message = f'{lid}: {category} is missing calculated flow'
                 all_messages.append(message)
-                MP_LOG.warning(message)
+                MP_LOG.warning(f"{huc} - {message}")

         # Get various attributes of the site.
         lat = float(metadata['nws_preferred']['latitude'])
@@ -228,6 +230,7 @@ def generate_flows_for_huc(
     huc_messages_txt_file = os.path.join(huc_messages_dir, str(huc) + '_messages.txt')
     with open(huc_messages_txt_file, 'w') as f:
         for item in all_messages:
+            item = item.strip()
             f.write("%s\n" % item)

     MP_LOG.lprint(f'--- generate_flow_for_huc done for {huc}')
@@ -307,8 +310,9 @@ def generate_flows(
     nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg'
     nwm_flows_df = gpd.read_file(nwm_flows_gpkg)

-    nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg'
-    nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg)
+    # Jul 3, 2024 - we are skipping Alaska for now.
+    # nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg'
+    # nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg)

     all_meta_lists = __load_nwm_metadata(
         output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile
     )
@@ -325,31 +329,14 @@ def generate_flows(
     # Get a dictionary of hucs (key) and sites (values) as well as a GeoDataFrame
     # of all sites used later in script.
-    FLOG.lprint("\nStart aggregate_wbd_hucs")
+    FLOG.lprint("Start aggregate_wbd_hucs")
     start_dt = datetime.now(timezone.utc)
-
-    # ++++++++++++++++++++++++++++
-    # TEMP DEBUGGING
-    is_debug_override = False  # If true, it will skip loading the agg from code and load the gpkg in stead
-    if is_debug_override:
-        # Ensured I had a 12040101 fim output dir to match this.
-        # but you might have to run this once to get the agg_wbd that matches for the 12040101
-        agg_gpkg = os.path.join(output_catfim_dir, "agg_wbd.gpkg")
-        huc_dictionary = {}
-        huc_dictionary["12040101"] = ['CFKT2', 'FCWT2', 'HMMT2', 'POET2', 'PTET2']
-        out_gdf = gpd.read_file(agg_gpkg)
-    else:
-        huc_dictionary, out_gdf = aggregate_wbd_hucs(all_meta_lists, WBD_LAYER, True, lst_hucs)
-
-    if is_debug_override:
-        out_gdf.to_file(agg_gpkg, driver='GPKG')
-    # ++++++++++++++++++++++++++++
+    huc_dictionary, out_gdf = aggregate_wbd_hucs(all_meta_lists, WBD_LAYER, True, lst_hucs)

     end_dt = datetime.now(timezone.utc)
     time_duration = end_dt - start_dt
     FLOG.lprint(f"End aggregate_wbd_hucs - Duration: {str(time_duration).split('.')[0]}")
-    print("")

     FLOG.lprint("\nStart Flow Generation")
@@ -361,7 +348,6 @@ def generate_flows(
         threshold_url,
         all_meta_lists,
         nwm_flows_df,
-        nwm_flows_alaska_df,
     )

     start_dt = datetime.now(timezone.utc)
@@ -375,7 +361,7 @@ def generate_flows(
     with ProcessPoolExecutor(max_workers=job_number_huc) as executor:
         for huc in huc_dictionary:
-            flows_df = nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df
+            # flows_df = nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df
             executor.submit(
                 generate_flows_for_huc,
                 huc,
@@ -385,14 +371,14 @@ def generate_flows(
                 output_flows_dir,
                 attributes_dir,
                 huc_messages_dir,
-                flows_df,
+                nwm_flows_df,
                 log_output_file,
                 child_log_file_prefix,
             )
     # end ProcessPoolExecutor

     # rolls up logs from child MP processes into this parent_log_output_file
-    MP_LOG.merge_log_files(log_output_file, child_log_file_prefix)
+    FLOG.merge_log_files(log_output_file, child_log_file_prefix)

     end_dt = datetime.now(timezone.utc)
     time_duration = end_dt - start_dt
@@ -425,7 +411,6 @@ def generate_flows(

     # Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields.
-    # TODO: Accomodate AK projection?
     viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION)
     viz_out_gdf.rename(
         columns={
@@ -436,6 +421,10 @@ def generate_flows(
         inplace=True,
     )
     viz_out_gdf['nws_lid'] = viz_out_gdf['nws_lid'].str.lower()
+    FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 1 is")
+    FLOG.lprint(f"len is {len(viz_out_gdf)}")
+    FLOG.lprint(viz_out_gdf)
+    FLOG.lprint("+++++++++++++++++++++\n")

     # Using list of csv_files, populate DataFrame of all nws_lids that had
     # a flow file produced and denote with "mapped" column.
@@ -444,13 +433,21 @@ def generate_flows(
         nws_lids.append(csv_file.split('_attributes')[0])
     lids_df = pd.DataFrame(nws_lids, columns=['nws_lid'])
     lids_df['mapped'] = 'yes'
+    FLOG.lprint("nws_lid...")
+    FLOG.lprint(lids_df)

     # Identify what lids were mapped by merging with lids_df. Populate
     # 'mapped' column with 'No' if sites did not map.
     viz_out_gdf = viz_out_gdf.merge(lids_df, how='left', on='nws_lid')
     viz_out_gdf['mapped'] = viz_out_gdf['mapped'].fillna('no')

-    # Read all messages for all HUCs TODO
+    FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 2 is")
+    FLOG.lprint(f"len is {len(viz_out_gdf)}")
+    FLOG.lprint(viz_out_gdf)
+    FLOG.lprint("+++++++++++++++++++++\nv")
+
+
+    # Read all messages for all HUCs
     huc_message_list = []
     huc_messages_dir_list = os.listdir(huc_messages_dir)
     for huc_message_file in huc_messages_dir_list:
@@ -459,6 +456,7 @@ def generate_flows(
             if full_path_file.endswith('.txt'):
                 lines = f.readlines()
                 for line in lines:
+                    line = line.strip()
                     huc_message_list.append(line)

     # Write messages to DataFrame, split into columns, aggregate messages.
@@ -469,13 +467,40 @@ def generate_flows(
         .str.split(':', n=1, expand=True)
         .rename(columns={0: 'nws_lid', 1: 'status'})
     )
+    # There could be duplicate messages for one ahps (i.e. missing nwm segments), so drop dups
+    messages_df.drop_duplicates(subset="status", keep="first", inplace=True)
+
+    # We want one viz_out_gdf record per ahps and if there are more than one, concat the messages
+
+    FLOG.lprint("^^^^^^^^^^^^^^^^^^^")
+    FLOG.lprint("messages_df..")
+    FLOG.lprint(f"len is: {len(messages_df)}")
+    FLOG.lprint(messages_df)
+
+    #status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
+    #df1 = df.groupby(['ID1','ID2'])['Status'].agg(lambda x: ','.join(x.dropna())).reset_index()
+    status_df = messages_df.groupby(['nws_lid'])['status'].agg(lambda x: ', '.join(x)).reset_index()
+
+    FLOG.lprint("^^^^^^^^^^^^^^^^^^^")
+    FLOG.lprint("status_df..")
+    FLOG.lprint(f"len is: {len(status_df)}")
+    FLOG.lprint(status_df)

-    status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
     # Join messages to populate status field to candidate sites. Assign
     # status for null fields.
     viz_out_gdf = viz_out_gdf.merge(status_df, how='left', on='nws_lid')
+
+
     viz_out_gdf['status'] = viz_out_gdf['status'].fillna('all calculated flows available')
+
+    FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 2 is")
+    FLOG.lprint(f"len is {len(viz_out_gdf)}")
+    FLOG.lprint(viz_out_gdf)
+    FLOG.lprint("+++++++++++++++++++++\nv")
+

     # Filter out columns and write out to file
     # viz_out_gdf = viz_out_gdf.filter(
@@ -540,3 +565,4 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s

     return all_meta_lists

 # Can't be used via command line
+
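For readers following the two main data-handling changes in this patch (the non-Alaska HUC folder filter in process_generate_categorical_fim() and the per-lid status aggregation in generate_flows()), the sketch below is a minimal, standalone illustration of those two steps, not code from the repository. The function names, the example fim directory, and the example messages are hypothetical; the real scripts read HUC folders from fim_run_dir and read messages from the per-HUC *_messages.txt files.

import os

import pandas as pd


def filter_non_alaska_hucs(fim_run_dir):
    # Keep only HUC folders (names starting with 0, 1, or 2) and drop Alaska
    # (HUC2 == '19'), mirroring the valid_ahps_hucs filter added in this patch.
    return sorted(
        x
        for x in os.listdir(fim_run_dir)
        if os.path.isdir(os.path.join(fim_run_dir, x))
        and x[0] in ('0', '1', '2')
        and x[:2] != '19'
    )


def aggregate_lid_statuses(messages):
    # Each message is expected to look like "<nws_lid>: <status text>".
    # Duplicate messages are dropped, then the remaining statuses for each lid
    # are joined into one comma-separated string so there is one record per lid.
    messages_df = pd.DataFrame(messages, columns=['message']).drop_duplicates()
    messages_df[['nws_lid', 'status']] = messages_df['message'].str.split(':', n=1, expand=True)
    messages_df['status'] = messages_df['status'].str.strip()
    return messages_df.groupby('nws_lid')['status'].agg(', '.join).reset_index()


if __name__ == '__main__':
    # hucs = filter_non_alaska_hucs('/outputs/my_fim_run')  # hypothetical path
    example_messages = [  # hypothetical messages, for illustration only
        'cfkt2: missing nwm segments',
        'cfkt2: missing nwm segments',
        'cfkt2: minor is missing calculated flow',
        'poet2: missing threshold stages',
    ]
    print(aggregate_lid_statuses(example_messages))

The aggregated frame can then be left-joined onto the sites GeoDataFrame on nws_lid, with missing statuses filled with a default such as 'all calculated flows available', which is the pattern the patch follows.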
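The patch also routes every HUC through the same nwm_flows_df instead of picking an Alaska-specific GeoDataFrame per HUC before calling executor.submit. Below is a minimal sketch of that per-HUC fan-out with ProcessPoolExecutor; generate_flows_for_one_huc, the HUC list, and the shared flows object are stand-ins for illustration, not the repository's actual signatures.

from concurrent.futures import ProcessPoolExecutor, as_completed


def generate_flows_for_one_huc(huc, shared_flows):
    # Stand-in for the real per-HUC worker; just report what it received.
    return f'{huc}: processed with {len(shared_flows)} flow records'


def run_all_hucs(huc_list, shared_flows, max_workers=4):
    # Submit one job per HUC, all sharing the same flows input, and collect results.
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(generate_flows_for_one_huc, huc, shared_flows) for huc in huc_list
        ]
        for future in as_completed(futures):
            results.append(future.result())
    return results


if __name__ == '__main__':
    print(run_all_hucs(['12040101', '02030103'], shared_flows=list(range(100)), max_workers=2))

In the actual scripts the child processes write to their own log files, which the parent rolls up afterward (the patch switches that roll-up call from MP_LOG.merge_log_files to FLOG.merge_log_files).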