Skip to content

Commit

Permalink
Thalweg Profile Tool and Lateral Thalweg Adjustment Threshold
Browse files Browse the repository at this point in the history
Adding a thalweg profile tool to identify significant drops in thalweg elevation. Also setting lateral thalweg adjustment threshold in hydroconditioning.

- thalweg_drop_check.py checks the elevation along the thalweg for each stream path downstream of MS headwaters within a HUC.
- Removing dissolveLinks arg from clip_vectors_to_wbd.py.
- Cleaned up code in split_flows.py to make it more readable.
- Refactored reduce_nhd_stream_density.py and adjust_headwater_streams.py to limit MS headwater points in agg_nhd_headwaters_adj.gpkg.
- Fixed a bug in adjust_thalweg_lateral.py lateral elevation replacement threshold; changed threshold to 3 meters.
- Updated aggregate_vector_inputs.py to log intermediate processes.

This resolves #408, resolves #18, and resolves #409.
  • Loading branch information
BrianAvant committed Jun 21, 2021
1 parent ca8cb7e commit ac4ae06
Show file tree
Hide file tree
Showing 14 changed files with 567 additions and 610 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@ All notable changes to this project will be documented in this file.
We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
<br/><br/>

## v3.0.19.1 - 2021-06-17 - [PR #417](https://github.com/NOAA-OWP/cahaba/pull/417)

Adding a thalweg profile tool to identify significant drops in thalweg elevation. Also setting lateral thalweg adjustment threshold in hydroconditioning.

## Additions
- `thalweg_drop_check.py` checks the elevation along the thalweg for each stream path downstream of MS headwaters within a HUC.

## Removals
- Removing `dissolveLinks` arg from `clip_vectors_to_wbd.py`.


## Changes
- Cleaned up code in `split_flows.py` to make it more readable.
- Refactored `reduce_nhd_stream_density.py` and `adjust_headwater_streams.py` to limit MS headwater points in `agg_nhd_headwaters_adj.gpkg`.
- Fixed a bug in `adjust_thalweg_lateral.py` lateral elevation replacement threshold; changed threshold to 3 meters.
- Updated `aggregate_vector_inputs.py` to log intermediate processes.

<br/><br/>

## v3.0.19.0 - 2021-06-10 - [PR #415](https://github.com/NOAA-OWP/cahaba/pull/415)

Feature to evaluate performance of alternative CatFIM techniques.
Expand Down
1 change: 1 addition & 0 deletions config/params_calibrated.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export negative_burn_value=1000
export agree_DEM_buffer=70
export wbd_buffer=5000
export thalweg_lateral_elev_threshold=3

#### geospatial parameters ####
export max_split_distance_meters=1500
Expand Down
1 change: 1 addition & 0 deletions config/params_template.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export negative_burn_value=1000
export agree_DEM_buffer=70
export wbd_buffer=5000
export thalweg_lateral_elev_threshold=3

#### geospatial parameters ####
export max_split_distance_meters=1500
Expand Down
8 changes: 4 additions & 4 deletions src/adjust_headwater_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def adjust_headwaters(huc,nhd_streams,nwm_headwaters,nws_lids,headwater_id):
nws_lid_limited = nws_lid_limited.loc[nws_lid_limited.nws_lid!='']
nws_lid_limited = nws_lid_limited.drop(columns=['nws_lid'])

# Check for issues in nws_lid layer
if len(nws_lid_limited) < len(nws_lids):
missing_nws_lids = list(set(nws_lids.site_id) - set(nws_lid_limited.site_id))
print (f"nws lid(s) {missing_nws_lids} missing from aggregate dataset in huc {huc}")
# Check for issues in nws_lid layer (now this reports back non-headwater nws_lids)
# if len(nws_lid_limited) < len(nws_lids):
# missing_nws_lids = list(set(nws_lids.site_id) - set(nws_lid_limited.site_id))
# print (f"nws lid(s) {missing_nws_lids} missing from aggregate dataset in huc {huc}")

# Combine NWM headwaters and AHPS sites to be snapped to NHDPlus HR segments
headwater_pts = headwater_limited.append(nws_lid_limited)
Expand Down
88 changes: 42 additions & 46 deletions src/adjust_thalweg_lateral.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,115 +7,113 @@
import numpy as np

@profile
def adjust_thalweg_laterally(elevation_raster, stream_raster, allocation_raster, cost_distance_raster, cost_distance_tolerance, dem_lateral_thalweg_adj):
def adjust_thalweg_laterally(elevation_raster, stream_raster, allocation_raster, cost_distance_raster, cost_distance_tolerance, dem_lateral_thalweg_adj,lateral_elevation_threshold):

# ------------------------------------------- Get catchment_min_dict --------------------------------------------------- #
# The following algorithm searches for the zonal minimum elevation in each pixel catchment
# It updates the catchment_min_dict with this zonal minimum elevation value.
@njit
def make_zone_min_dict(elevation_window, zone_min_dict, zone_window, cost_window, cost_tolerance, ndv):
for i,cm in enumerate(zone_window):
for i,elev_m in enumerate(zone_window):
# If the zone really exists in the dictionary, compare elevation values.
i = int(i)
cm = int(cm)
elev_m = int(elev_m)

if (cost_window[i] <= cost_tolerance):
if elevation_window[i] > 0: # Don't allow bad elevation values
if (cm in zone_min_dict):
if (elevation_window[i] < zone_min_dict[cm]):
if (elev_m in zone_min_dict):

if (elevation_window[i] < zone_min_dict[elev_m]):
# If the elevation_window's elevation value is less than the zone_min_dict min, update the zone_min_dict min.
zone_min_dict[cm] = elevation_window[i]
zone_min_dict[elev_m] = elevation_window[i]
else:
zone_min_dict[cm] = elevation_window[i]
zone_min_dict[elev_m] = elevation_window[i]

return(zone_min_dict)

# Open the masked gw_catchments_pixels_masked and dem_thalwegCond_masked.
elevation_raster_object = rasterio.open(elevation_raster)
allocation_zone_raster_object = rasterio.open(allocation_raster)
cost_distance_raster_object = rasterio.open(cost_distance_raster)

meta = elevation_raster_object.meta.copy()
meta['tiled'], meta['compress'] = True, 'lzw'

# -- Create zone_min_dict -- #
print("Create zone_min_dict")
zone_min_dict = typed.Dict.empty(types.int32,types.float32) # Initialize an empty dictionary to store the catchment minimums.
# Update catchment_min_dict with pixel sheds minimum.

for ji, window in elevation_raster_object.block_windows(1): # Iterate over windows, using elevation_raster_object as template.
elevation_window = elevation_raster_object.read(1,window=window).ravel() # Define elevation_window.
zone_window = allocation_zone_raster_object.read(1,window=window).ravel() # Define zone_window.
cost_window = cost_distance_raster_object.read(1, window=window).ravel() # Define cost_window.

# Call numba-optimized function to update catchment_min_dict with pixel sheds minimum.
zone_min_dict = make_zone_min_dict(elevation_window, zone_min_dict, zone_window, cost_window, int(cost_distance_tolerance), meta['nodata'])
# ------------------------------------------------------------------------------------------------------------------------ #

# ------------------------------------------------------------------------------------------------------------------------ #

elevation_raster_object.close()
allocation_zone_raster_object.close()
cost_distance_raster_object.close()


# ------------------------------------------- Assign zonal min to thalweg ------------------------------------------------ #
@njit
def minimize_thalweg_elevation(dem_window, zone_min_dict, zone_window, thalweg_window):

# Copy elevation values into new array that will store the minimized elevation values.
dem_window_to_return = np.empty_like (dem_window)
dem_window_to_return[:] = dem_window


for i,cm in enumerate(zone_window):

for i,elev_m in enumerate(zone_window):
i = int(i)
cm = int(cm)
elev_m = int(elev_m)
thalweg_cell = thalweg_window[i] # From flows_grid_boolean.tif (0s and 1s)
if thalweg_cell == 1: # Make sure thalweg cells are checked.
if cm in zone_min_dict:
zone_min_elevation = zone_min_dict[cm]
if elev_m in zone_min_dict:
zone_min_elevation = zone_min_dict[elev_m]
dem_thalweg_elevation = dem_window[i]
elevation_difference = zone_min_elevation - dem_thalweg_elevation
if zone_min_elevation < dem_thalweg_elevation and elevation_difference <= 5:

elevation_difference = dem_thalweg_elevation - zone_min_elevation

if (zone_min_elevation < dem_thalweg_elevation) and (elevation_difference <= lateral_elevation_threshold):
dem_window_to_return[i] = zone_min_elevation

return(dem_window_to_return)

# Specify raster object metadata.
elevation_raster_object = rasterio.open(elevation_raster)
allocation_zone_raster_object = rasterio.open(allocation_raster)
thalweg_object = rasterio.open(stream_raster)


dem_lateral_thalweg_adj_object = rasterio.open(dem_lateral_thalweg_adj, 'w', **meta)

for ji, window in elevation_raster_object.block_windows(1): # Iterate over windows, using dem_rasterio_object as template.
dem_window = elevation_raster_object.read(1,window=window) # Define dem_window.
window_shape = dem_window.shape
dem_window = dem_window.ravel()

zone_window = allocation_zone_raster_object.read(1,window=window).ravel() # Define catchments_window.
thalweg_window = thalweg_object.read(1,window=window).ravel() # Define thalweg_window.

# Call numba-optimized function to reassign thalweg cell values to catchment minimum value.
minimized_dem_window = minimize_thalweg_elevation(dem_window, zone_min_dict, zone_window, thalweg_window)
minimized_dem_window = minimized_dem_window.reshape(window_shape).astype(np.float32)


dem_lateral_thalweg_adj_object.write(minimized_dem_window, window=window, indexes=1)
dem_lateral_thalweg_adj_object.write(minimized_dem_window, window=window, indexes=1)

elevation_raster_object.close()
allocation_zone_raster_object.close()
cost_distance_raster_object.close()

# Delete allocation_raster and distance_raster.





if __name__ == '__main__':

# Parse arguments.
parser = argparse.ArgumentParser(description='Adjusts the elevation of the thalweg to the lateral zonal minimum.')
parser.add_argument('-e','--elevation_raster',help='Raster of elevation.',required=True)
Expand All @@ -124,11 +122,9 @@ def minimize_thalweg_elevation(dem_window, zone_min_dict, zone_window, thalweg_w
parser.add_argument('-d','--cost_distance_raster',help='Raster of cost distances for the allocation raster.',required=True)
parser.add_argument('-t','--cost_distance_tolerance',help='Tolerance in meters to use when searching for zonal minimum.',required=True)
parser.add_argument('-o','--dem_lateral_thalweg_adj',help='Output elevation raster with adjusted thalweg.',required=True)

parser.add_argument('-th','--lateral_elevation_threshold',help='Maximum difference between current thalweg elevation and lowest lateral elevation in meters.',required=True,type=int)

# Extract to dictionary and assign to variables.
args = vars(parser.parse_args())

adjust_thalweg_laterally(**args)



42 changes: 29 additions & 13 deletions src/aggregate_vector_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def find_nwm_incoming_streams(nwm_streams_,wbd,huc_unit):

# Input wbd
if isinstance(wbd,str):

layer = f"WBDHU{huc_unit}"
wbd = gpd.read_file(wbd, layer=layer)
elif isinstance(wbd,gpd.GeoDataFrame):
Expand Down Expand Up @@ -175,14 +176,16 @@ def find_nwm_incoming_streams(nwm_streams_,wbd,huc_unit):

huc_intersection = gpd.GeoDataFrame({'geometry': intersecting_points, 'NHDPlusID': nhdplus_ids,'mainstem': mainstem_flag},crs=nwm_streams.crs,geometry='geometry')
huc_intersection = huc_intersection.drop_duplicates()

del nwm_streams,wbd

return huc_intersection


def collect_stream_attributes(nhdplus_vectors_dir, huc):

print ('Starting huc: ' + str(huc))
print (f"Starting attribute collection for HUC {huc}",flush=True)

# Collecting NHDPlus HR attributes
burnline_filename = os.path.join(nhdplus_vectors_dir,huc,'NHDPlusBurnLineEvent' + str(huc) + '.gpkg')
vaa_filename = os.path.join(nhdplus_vectors_dir,huc,'NHDPlusFlowLineVAA' + str(huc) + '.gpkg')
Expand Down Expand Up @@ -214,10 +217,10 @@ def collect_stream_attributes(nhdplus_vectors_dir, huc):
nhd_streams.to_file(nhd_streams_agg_fileName,driver=getDriver(nhd_streams_agg_fileName),index=False)
del nhd_streams

print ('finished huc: ' + str(huc))
print (f"finished attribute collection for HUC {huc}",flush=True)

else:
print ('missing data for huc ' + str(huc))
print (f"missing data for HUC {huc}",flush=True)


def subset_stream_networks(args, huc):
Expand All @@ -229,10 +232,11 @@ def subset_stream_networks(args, huc):
nhdplus_vectors_dir = args[4]
nwm_huc4_intersections_filename = args[5]

print("starting HUC " + str(huc),flush=True)
print(f"starting stream subset for HUC {huc}",flush=True)
nwm_headwater_id = 'ID'
ahps_headwater_id = 'nws_lid'
headwater_pts_id = 'site_id'

column_order = ['pt_type', headwater_pts_id, 'mainstem', 'geometry']
nhd_streams_filename = os.path.join(nhdplus_vectors_dir,huc,'NHDPlusBurnLineEvent' + str(huc) + '_agg.gpkg')

Expand All @@ -247,13 +251,20 @@ def subset_stream_networks(args, huc):
huc_mask = huc_mask.reset_index(drop=True)

if len(selected_wbd8.HUC8) > 0:

selected_wbd8 = selected_wbd8.reset_index(drop=True)

# Identify FR/NWM headwaters and subset HR network
nhd_streams_fr = subset_nhd_network(huc,huc_mask,selected_wbd8,nhd_streams_filename,nwm_headwaters_filename,nwm_headwater_id,nwm_huc4_intersections_filename)
try:
nhd_streams_fr = subset_nhd_network(huc,huc_mask,selected_wbd8,nhd_streams_filename,nwm_headwaters_filename,nwm_headwater_id,nwm_huc4_intersections_filename)
except:
print (f"Error subsetting NHD HR network for HUC {huc}",flush=True)

# Identify nhd mainstem streams
nhd_streams_all = subset_nhd_network(huc,huc_mask,selected_wbd8,nhd_streams_fr,ahps_filename,ahps_headwater_id,nwm_huc4_intersections_filename,True)
try:
nhd_streams_all = subset_nhd_network(huc,huc_mask,selected_wbd8,nhd_streams_fr,ahps_filename,ahps_headwater_id,nwm_huc4_intersections_filename,True)
            except:
                print (f"Error identifying MS network for HUC {huc}",flush=True)

# Identify HUC8 intersection points
nhd_huc8_intersections = find_nwm_incoming_streams(nhd_streams_all,selected_wbd8,8)
Expand All @@ -265,17 +276,17 @@ def subset_stream_networks(args, huc):

# Load nws lids
nws_lids = gpd.read_file(ahps_filename, mask=huc_mask)
nws_lids = nws_lids.drop(columns=['name','nwm_featur'])
nws_lids = nws_lids.drop(columns=['name','nwm_feature_id','usgs_site_code','states','HUC8','is_headwater', 'is_colocated'])
nws_lids = nws_lids.rename(columns={"nws_lid": headwater_pts_id})
nws_lids['pt_type'] = 'nws_lid'
nws_lids['mainstem'] = True

if (len(nwm_headwaters) > 0) or (len(nws_lids) > 0):

# Adjust FR/NWM headwater segments
adj_nhd_streams_all, adj_nhd_headwater_points = adjust_headwaters(huc,nhd_streams_all,nwm_headwaters,nws_lids,headwater_pts_id)

adj_nhd_headwater_points = adj_nhd_headwater_points[column_order]

nhd_huc8_intersections['pt_type'] = 'nhd_huc8_intersections'
nhd_huc8_intersections = nhd_huc8_intersections.rename(columns={"NHDPlusID": headwater_pts_id})
nhd_huc8_intersections = nhd_huc8_intersections[column_order]
Expand All @@ -290,11 +301,14 @@ def subset_stream_networks(args, huc):
adj_nhd_headwater_points_all.to_file(adj_nhd_headwaters_all_fileName,driver=getDriver(adj_nhd_headwaters_all_fileName),index=False)

del adj_nhd_streams_all, adj_nhd_headwater_points_all

else:
print (f"skipping headwater adjustments for HUC: {huc}")

print (f"skipping headwater adjustments for HUC {huc}")

del nhd_streams_fr

print(f"finished stream subset for HUC {huc}",flush=True)

def aggregate_stream_networks(nhdplus_vectors_dir,agg_nhd_headwaters_adj_fileName,agg_nhd_streams_adj_fileName,huc_list):

Expand Down Expand Up @@ -330,6 +344,7 @@ def aggregate_stream_networks(nhdplus_vectors_dir,agg_nhd_headwaters_adj_fileNam
def clean_up_intermediate_files(nhdplus_vectors_dir):

for huc in os.listdir(nhdplus_vectors_dir):

agg_path= os.path.join(nhdplus_vectors_dir,huc,'NHDPlusBurnLineEvent' + str(huc) + '_agg.gpkg')
streams_adj_path= os.path.join(nhdplus_vectors_dir,huc,'NHDPlusBurnLineEvent' + str(huc) + '_adj.gpkg')
headwater_adj_path= os.path.join(nhdplus_vectors_dir,huc,'nhd' + str(huc) + '_headwaters_adj.gpkg')
Expand Down Expand Up @@ -385,18 +400,19 @@ def clean_up_intermediate_files(nhdplus_vectors_dir):
if not os.path.isfile(streams_adj_path):
missing_subsets = missing_subsets + [huc]

print (f"running subset_results on {len(missing_subsets)} HUC4s")
print (f"Subsetting stream network for {len(missing_subsets)} HUC4s")
num_workers=11

with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Preprocess nhd hr and add attributes
# collect_attributes = [executor.submit(collect_stream_attributes, nhdplus_vectors_dir, str(huc)) for huc in huc_list]
collect_attributes = [executor.submit(collect_stream_attributes, nhdplus_vectors_dir, str(huc)) for huc in huc_list]
# Subset nhd hr network
subset_results = [executor.submit(subset_stream_networks, subset_arg_list, str(huc)) for huc in missing_subsets]

# del wbd4,wbd8
del wbd4,wbd8

# Aggregate fr and ms nhd netowrks for entire nwm domain
# Aggregate subset nhd networks for entire nwm domain
print ('Aggregating subset NHD networks for entire NWM domain')
aggregate_stream_networks(nhdplus_vectors_dir,agg_nhd_headwaters_adj_fileName,agg_nhd_streams_adj_fileName,huc_list)

# Remove intermediate files
Expand Down
Loading

0 comments on commit ac4ae06

Please sign in to comment.