
Commit

more updates
RobHanna-NOAA committed Jul 3, 2024
1 parent 5d3f826 commit ad2eb62
Showing 2 changed files with 95 additions and 59 deletions.
48 changes: 29 additions & 19 deletions tools/generate_categorical_fim.py
@@ -140,15 +140,26 @@ def process_generate_categorical_fim(
# we are getting too many folders and files. We want just huc folders.
# output_flow_dir_list = os.listdir(fim_run_dir)
# looking for folders only starting with 0, 1, or 2
lst_hucs = [

# For now, we are dropping all Alaska HUCs.

# DEBUG: July 2, 2024
# Override to keep just the one huc.
lst_hucs = ['19020301']


valid_ahps_hucs = [
x
for x in os.listdir(fim_run_dir)
if os.path.isdir(os.path.join(fim_run_dir, x)) and x[0] in ['0', '1', '2']
if os.path.isdir(os.path.join(fim_run_dir, x)) and
x[0] in ['0', '1', '2'] and
x[:2] != "19"
]
# print(lst_hucs)
lst_hucs.sort()
# print(valid_ahps_hucs)

valid_ahps_hucs.sort()

num_hucs = len(lst_hucs)
num_hucs = len(valid_ahps_hucs)
if num_hucs == 0:
raise ValueError(
f'Output directory {fim_run_dir} is empty. Verify that you have the correct input folder.'
@@ -172,7 +183,7 @@ def process_generate_categorical_fim(
FLOG.lprint(f"Start generate categorical fim for {catfim_method} - (UTC): {dt_string}")
FLOG.lprint("")

FLOG.lprint(f"Processing {num_hucs} huc(s)")
FLOG.lprint(f"Processing {num_hucs} huc(s) with Alaska removed")

load_dotenv(env_file)
API_BASE_URL = os.getenv('API_BASE_URL')
@@ -191,16 +202,16 @@ def process_generate_categorical_fim(
# if not os.path.exists(fim_inputs_csv_path):
# raise ValueError(f'{fim_inputs_csv_path} not found. Verify that you have the correct input files.')

print()
# print()

FLOG.lprint("Filtering out HUCs that do not have related ahps site in them.")
valid_ahps_hucs = __filter_hucs_to_ahps(lst_hucs)
# FLOG.lprint("Filtering out HUCs that do not have related ahps site in them.")
# valid_ahps_hucs = __filter_hucs_to_ahps(lst_hucs)

num_valid_hucs = len(valid_ahps_hucs)
if num_valid_hucs == 0:
raise Exception("None of the HUCs supplied have ahps sites in them. Check your fim output folder")
else:
FLOG.lprint(f"Processing {num_valid_hucs} huc(s) with AHPS sites")
# num_valid_hucs = len(valid_ahps_hucs)
# if num_valid_hucs == 0:
# raise Exception("None of the HUCs supplied have ahps sites in them. Check your fim output folder")
# else:
# FLOG.lprint(f"Processing {num_valid_hucs} huc(s) with AHPS sites")

# Define upstream and downstream search in miles
nwm_us_search, nwm_ds_search = search, search
@@ -354,8 +365,8 @@ def update_mapping_status(output_mapping_dir, nws_sites_layer):
----------
output_mapping_dir : STR
Path to the output directory of all inundation maps.
output_flows_dir : STR
Path to the directory containing all flows.
nws_sites_layer : STR
Path to the nws_lid_sites.gpkg layer in the output mapping directory.
Returns
-------
@@ -1128,7 +1139,7 @@ def generate_stage_based_categorical_fim(
with open(full_message_csv_path, newline='') as message_file:
reader = csv.reader(message_file)
for row in reader:
all_messages.append(row)
# csv.reader yields lists, so strip each column value rather than calling .strip() on the row
all_messages.append([col.strip() for col in row])

# Filter out columns and write out to file
nws_sites_layer = os.path.join(output_mapping_dir, 'nws_lid_sites.gpkg')
@@ -1140,8 +1151,6 @@ def generate_stage_based_categorical_fim(

FLOG.lprint(f"nws_sites_layer does not exist")

# FIX: (DO WE NEED IT?)

# Write messages to DataFrame, split into columns, aggregate messages.
if len(all_messages) > 0:

@@ -1493,3 +1502,4 @@ def produce_stage_based_catfim_tifs(

except Exception:
FLOG.critical(traceback.format_exc())

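For reference, the new HUC filter above behaves like this standalone helper (a minimal sketch; the name list_valid_hucs is hypothetical and not part of this commit):

import os

def list_valid_hucs(fim_run_dir):
    # Keep only directories that look like CONUS HUC folders (names starting
    # with 0, 1, or 2) and drop Alaska HUCs, whose names start with "19".
    hucs = [
        d
        for d in os.listdir(fim_run_dir)
        if os.path.isdir(os.path.join(fim_run_dir, d)) and d[0] in ['0', '1', '2'] and d[:2] != '19'
    ]
    hucs.sort()
    return hucs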
106 changes: 66 additions & 40 deletions tools/generate_categorical_fim_flows.py
@@ -87,34 +87,36 @@ def generate_flows_for_huc(

# Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated
# flows.

# TODO: Jun 17, 2024 - This gets re-called for every HUC but only uses the nws_list.
# Move this somewhere outside the HUC loop so it doesn't need to be called over and over again.

# Careful: values passed to "all_messages.append" must use the syntax f'{lid}: (whatever message)'
# because this gets parsed later and logic is applied against it.

MP_LOG.trace(f'Getting thresholds for {lid}')
stages, flows = get_thresholds(
threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all'
)
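# Note: stages and flows are treated below as dicts keyed by flood category
# (e.g. 'action', 'minor', 'moderate', 'major') -- inferred from the
# stages.get(category) / flows.get(category) checks; the exact WRDS return
# shape is an assumption here.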

if len(stages) == 0 or len(flows) == 0:
message = f'{huc} - {lid}: stages or flows is none, likely WRDS error'
message = f'{lid}: no stages or flows exist, likely WRDS error'
all_messages.append(message)
MP_LOG.warning(message)
# print(message) # TODO: Make verbose option
MP_LOG.warning(f"{huc} - {message}")
continue

# Check if stages are supplied; if not, write a message and exit.
if all(stages.get(category, None) is None for category in flood_categories):
message = f'{huc} - {lid}: missing threshold stages'
message = f'{lid}: missing threshold stages'
all_messages.append(message)
MP_LOG.warning(message)
# print(message) # TODO: Make verbose option
MP_LOG.warning(f"{huc} - {message}")
continue

# Check if calculated flows are supplied; if not, write a message and exit.
if all(flows.get(category, None) is None for category in flood_categories):
message = f'{huc} - {lid}: missing calculated flows'
message = f'{lid}: missing calculated flows'
all_messages.append(message)
MP_LOG.warning(message)
# print(message) # TODO: Make verbose option
MP_LOG.warning(f"{huc} - {message}")
continue

# Find lid metadata from master list of metadata dictionaries (line 66).
@@ -134,7 +136,7 @@ def generate_flows_for_huc(
if not segments or len(segments) == 0:
message = f'{lid}: missing nwm segments'
all_messages.append(message)
MP_LOG.warning(message)
MP_LOG.warning(f"{huc} - {message}")
continue

# For each flood category
@@ -161,9 +163,9 @@ def generate_flows_for_huc(
flow_info.to_csv(output_file, index=False)

else:
message = f'{huc} - {lid}: magnitude : {category} is missing calculated flow'
message = f'{lid}: {category} is missing calculated flow'
all_messages.append(message)
MP_LOG.warning(message)
MP_LOG.warning(f"{huc} - {message}")

# Get various attributes of the site.
lat = float(metadata['nws_preferred']['latitude'])
@@ -228,6 +230,7 @@ def generate_flows_for_huc(
huc_messages_txt_file = os.path.join(huc_messages_dir, str(huc) + '_messages.txt')
with open(huc_messages_txt_file, 'w') as f:
for item in all_messages:
item = item.strip()
f.write("%s\n" % item)
MP_LOG.lprint(f'--- generate_flow_for_huc done for {huc}')
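# (These per-HUC message files are read back and aggregated into status_df
# later in generate_flows.)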

@@ -307,8 +310,9 @@ def generate_flows(
nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg'
nwm_flows_df = gpd.read_file(nwm_flows_gpkg)

nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg'
nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg)
# Jul 3, 2024 - we are skipping Alaska for now.
# nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg'
# nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg)

all_meta_lists = __load_nwm_metadata(
output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile
@@ -325,31 +329,14 @@ def generate_flows(
# Get a dictionary of hucs (key) and sites (values) as well as a GeoDataFrame
# of all sites used later in script.

FLOG.lprint("\nStart aggregate_wbd_hucs")
FLOG.lprint("Start aggregate_wbd_hucs")
start_dt = datetime.now(timezone.utc)


# ++++++++++++++++++++++++++++
# TEMP DEBUGGING
is_debug_override = False # If true, it will skip loading the agg from code and load the gpkg instead
if is_debug_override:
# Ensured I had a 12040101 fim output dir to match this.
# but you might have to run this once to get the agg_wbd that matches for the 12040101
agg_gpkg = os.path.join(output_catfim_dir, "agg_wbd.gpkg")
huc_dictionary = {}
huc_dictionary["12040101"] = ['CFKT2', 'FCWT2', 'HMMT2', 'POET2', 'PTET2']
out_gdf = gpd.read_file(agg_gpkg)
else:
huc_dictionary, out_gdf = aggregate_wbd_hucs(all_meta_lists, WBD_LAYER, True, lst_hucs)

if is_debug_override:
out_gdf.to_file(agg_gpkg, driver='GPKG')
# ++++++++++++++++++++++++++++
huc_dictionary, out_gdf = aggregate_wbd_hucs(all_meta_lists, WBD_LAYER, True, lst_hucs)

end_dt = datetime.now(timezone.utc)
time_duration = end_dt - start_dt
FLOG.lprint(f"End aggregate_wbd_hucs - Duration: {str(time_duration).split('.')[0]}")
print("")

FLOG.lprint("\nStart Flow Generation")

@@ -361,7 +348,6 @@ def generate_flows(
threshold_url,
all_meta_lists,
nwm_flows_df,
nwm_flows_alaska_df,
)

start_dt = datetime.now(timezone.utc)
@@ -375,7 +361,7 @@ def generate_flows(
with ProcessPoolExecutor(max_workers=job_number_huc) as executor:
for huc in huc_dictionary:

flows_df = nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df
# flows_df = nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df
executor.submit(
generate_flows_for_huc,
huc,
Expand All @@ -385,14 +371,14 @@ def generate_flows(
output_flows_dir,
attributes_dir,
huc_messages_dir,
flows_df,
nwm_flows_df,
log_output_file,
child_log_file_prefix,
)
# end ProcessPoolExecutor

# rolls up logs from child MP processes into this parent_log_output_file
MP_LOG.merge_log_files(log_output_file, child_log_file_prefix)
FLOG.merge_log_files(log_output_file, child_log_file_prefix)

end_dt = datetime.now(timezone.utc)
time_duration = end_dt - start_dt
@@ -425,7 +411,6 @@ def generate_flows(

# Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields.

# TODO: Accomodate AK projection?
viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION)
viz_out_gdf.rename(
columns={
Expand All @@ -436,6 +421,10 @@ def generate_flows(
inplace=True,
)
viz_out_gdf['nws_lid'] = viz_out_gdf['nws_lid'].str.lower()
FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 1 is")
FLOG.lprint(f"len is {len(viz_out_gdf)}")
FLOG.lprint(viz_out_gdf)
FLOG.lprint("+++++++++++++++++++++\n")

# Using list of csv_files, populate DataFrame of all nws_lids that had
# a flow file produced and denote with "mapped" column.
Expand All @@ -444,13 +433,21 @@ def generate_flows(
nws_lids.append(csv_file.split('_attributes')[0])
lids_df = pd.DataFrame(nws_lids, columns=['nws_lid'])
lids_df['mapped'] = 'yes'
FLOG.lprint("nws_lid...")
FLOG.lprint(lids_df)

# Identify what lids were mapped by merging with lids_df. Populate
# 'mapped' column with 'No' if sites did not map.
viz_out_gdf = viz_out_gdf.merge(lids_df, how='left', on='nws_lid')
viz_out_gdf['mapped'] = viz_out_gdf['mapped'].fillna('no')

# Read all messages for all HUCs TODO
FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 2 is")
FLOG.lprint(f"len is {len(viz_out_gdf)}")
FLOG.lprint(viz_out_gdf)
FLOG.lprint("+++++++++++++++++++++\nv")


# Read all messages for all HUCs
huc_message_list = []
huc_messages_dir_list = os.listdir(huc_messages_dir)
for huc_message_file in huc_messages_dir_list:
@@ -459,6 +456,7 @@ def generate_flows(
if full_path_file.endswith('.txt'):
lines = f.readlines()
for line in lines:
line = line.strip()
huc_message_list.append(line)

# Write messages to DataFrame, split into columns, aggregate messages.
@@ -469,13 +467,40 @@ def generate_flows(
.str.split(':', n=1, expand=True)
.rename(columns={0: 'nws_lid', 1: 'status'})
)
# There could be duplicate messages for one ahps (i.e. missing nwm segments), so drop dups.
# Dedupe on lid + status so distinct sites that share the same status message are kept.
messages_df.drop_duplicates(subset=["nws_lid", "status"], keep="first", inplace=True)

# We want one viz_out_gdf record per ahps; if there is more than one message, concatenate the messages.

FLOG.lprint("^^^^^^^^^^^^^^^^^^^")
FLOG.lprint("messages_df..")
FLOG.lprint(f"len is: {len(messages_df)}")
FLOG.lprint(messages_df)


#status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
#df1 = df.groupby(['ID1','ID2'])['Status'].agg(lambda x: ','.join(x.dropna())).reset_index()
status_df = messages_df.groupby(['nws_lid'])['status'].agg(lambda x: ', '.join(x)).reset_index()
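# Illustration (hypothetical values): two rows ('abcd1', 'missing threshold stages')
# and ('abcd1', 'action is missing calculated flow') collapse into a single row whose
# status is 'missing threshold stages, action is missing calculated flow'.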

FLOG.lprint("^^^^^^^^^^^^^^^^^^^")
FLOG.lprint("status_df..")
FLOG.lprint(f"len is: {len(status_df)}")
FLOG.lprint(status_df)

status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()

# Join messages to populate status field to candidate sites. Assign
# status for null fields.
viz_out_gdf = viz_out_gdf.merge(status_df, how='left', on='nws_lid')



viz_out_gdf['status'] = viz_out_gdf['status'].fillna('all calculated flows available')

FLOG.lprint("+++++++++++++++++++++\nviz_out_gdf part 2 is")
FLOG.lprint(f"len is {len(viz_out_gdf)}")
FLOG.lprint(viz_out_gdf)
FLOG.lprint("+++++++++++++++++++++\nv")


# Filter out columns and write out to file
# viz_out_gdf = viz_out_gdf.filter(
@@ -540,3 +565,4 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s
return all_meta_lists

# Can't be used via command line

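Taken together, the message plumbing in generate_categorical_fim_flows.py works roughly like the sketch below (a minimal sketch with illustrative lids and statuses; the DataFrame construction is reconstructed from the visible tail of the pipeline, and the .str.strip() tidy-up is an added assumption, not part of the commit):

import pandas as pd

# Hypothetical messages in the required f'{lid}: (whatever message)' format.
huc_message_list = [
    'abcd1: missing threshold stages',
    'abcd1: action is missing calculated flow',
    'efgh2: missing nwm segments',
    'efgh2: missing nwm segments',  # duplicate lid/status pair
]

# Split each message into a lid column and a status column.
messages_df = (
    pd.DataFrame(huc_message_list, columns=['message'])['message']
    .str.split(':', n=1, expand=True)
    .rename(columns={0: 'nws_lid', 1: 'status'})
)
messages_df['status'] = messages_df['status'].str.strip()  # drop the space left after ':'

# Drop duplicate lid/status pairs, then concatenate the surviving statuses per lid.
messages_df.drop_duplicates(subset=['nws_lid', 'status'], keep='first', inplace=True)
status_df = messages_df.groupby(['nws_lid'])['status'].agg(lambda x: ', '.join(x)).reset_index()

print(status_df)
#   nws_lid                                                        status
# 0   abcd1  missing threshold stages, action is missing calculated flow
# 1   efgh2                                          missing nwm segments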