Commit
Merge pull request #55 from tsdataclinic/pipeline_improvements_october
Pipeline improvements october
kaushik12 authored Oct 17, 2023
2 parents 7477188 + 082eecc commit 219d31e
Showing 13 changed files with 84,944 additions and 18,819 deletions.
8 changes: 6 additions & 2 deletions analysis/src/config.json
@@ -1,9 +1,13 @@
{ "base_path": "/home/data/test",
"transit_land_api_key": "<enter_key_here>",
"transit_land_api_key": "",
"national":
{
"poi_url":"https://geonames.usgs.gov/docs/stategaz/NationalFile.zip",
"flood_risk_path":"/home/data/test/national/fsf_flood_tract_summary.csv",
"fsf_climate_risk" : {
"flood":"/home/data/test/national/fsf_flood_tract_summary.csv",
"fire":"/home/data/test/national/fsf_fire_tract_summary.csv",
"heat":"/home/data/test/national/fsf_heat_tract_summary.csv"
},
"svi_path":"/home/data/test/national/SVI2020_US.csv"
},

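The single flood_risk_path entry is replaced by an fsf_climate_risk block keyed by hazard, so downstream code can iterate over flood, fire, and heat in one pass. A minimal sketch of consuming the new block, assuming the config file sits at analysis/src/config.json and the paths are the test defaults shown above:

import json

with open("analysis/src/config.json") as f:
    config = json.load(f)

# One First Street Foundation tract-summary CSV per hazard.
for hazard, csv_path in config["national"]["fsf_climate_risk"].items():
    print(hazard, csv_path)  # flood, fire, heat
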
3 changes: 1 addition & 2 deletions analysis/src/data/get_osm_data.py
@@ -10,8 +10,7 @@
import pickle
import os
import json
ox.config(log_console=True)

ox.settings.log_console=True

def get_walk_graph(geo_file_path, network_type='walk'):
"""
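The replacement line follows the module-level settings API that newer OSMnx releases use in place of the deprecated ox.config() helper. A minimal sketch of that style; the place name and the use_cache line are illustrative, not from this repo:

import osmnx as ox

ox.settings.log_console = True   # stream OSMnx log messages to the console
ox.settings.use_cache = True     # optional: cache Overpass responses locally
walk_graph = ox.graph_from_place("Hoboken, New Jersey, USA", network_type="walk")
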
13 changes: 10 additions & 3 deletions analysis/src/features/build_stop_features.py
@@ -13,11 +13,17 @@

COLUMNS_TO_KEEP = ["stop_id",
"stop_name",
"agency_ids_serviced",
"agencies_serviced",
"city",
"route_type",
"routes_serviced",
"flood_risk_category",
"flood_risk_score",
"heat_risk_category",
"heat_risk_score",
"fire_risk_category",
"fire_risk_score",
"job_access_category",
"jobs_access_count",
"worker_vulnerability_category",
@@ -41,10 +47,11 @@ def add_fs_flood_risk(stops, config):
"""
fsf_feature = process_fsf(config)

stops = stops.merge(fsf_feature[['GEOID','risk_score','pct_moderate_plus']],how='left',
stops = stops.merge(fsf_feature[['GEOID','flood_risk_score','flood_pct_moderate_plus','flood_risk_category',
'heat_risk_score','heat_pct_moderate_plus','heat_risk_category',
'fire_risk_score','fire_pct_moderate_plus','fire_risk_category',
]],how='left',
left_on = "GEOID_2020", right_on = "GEOID")
stops['flood_risk_category'] = pd.qcut(stops['risk_score'],3,labels=False,duplicates='drop')
stops = stops.rename(columns={'risk_score':'flood_risk_score'})
return stops


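add_fs_flood_risk now merges pre-computed flood, heat, and fire columns from process_fsf instead of binning flood scores itself; the removed pd.qcut call lives upstream. A toy illustration (synthetic scores, not pipeline data) of the 3-bin quantile categorisation carried by the *_risk_category columns:

import pandas as pd

scores = pd.Series([0.02, 0.10, 0.15, 0.40, 0.65, 0.90])
categories = pd.qcut(scores, 3, labels=False, duplicates="drop")
print(categories.tolist())  # [0, 0, 1, 1, 2, 2] -- tertiles from low to high risk
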
44 changes: 25 additions & 19 deletions analysis/src/process/process_FEMA_floodmaps.py
@@ -1,9 +1,5 @@
import geopandas as geopd
import json
import subprocess
import os
import pandas as pd
import requests
import os
import zipfile
import argparse
@@ -26,20 +22,10 @@ def find_largest_file(directory):

return largest_file

def main():
parser = argparse.ArgumentParser("Process FEMA floodmaps")
parser.add_argument("--config", required=True)
parser.add_argument("--city", required=True)

opts = parser.parse_args()

with open(opts.config) as f:
config = json.load(f)
city = opts.city

def process_floodmap(config, city_key):
base_path = config["base_path"]
FEMA_file_prefix = config[city]["FEMA_file_name"]
city_path = f"{base_path}/cities/{city}"
FEMA_file_prefix = config[city_key]["FEMA_file_name"]
city_path = f"{base_path}/cities/{city_key}"
zip_path = f"{base_path}/national/floodmaps/{FEMA_file_prefix}.zip"
extracted_folder_path = f"{city_path}/floodmaps/{FEMA_file_prefix}"
gdb_folder_path = f"{extracted_folder_path}/{FEMA_file_prefix}.gdb"
@@ -52,14 +38,34 @@ def main():

FEMA_flood = geopd.read_file(gdb_table_to_read)
FEMA_flood = FEMA_flood.to_crs(4326)
FEMA_flood["geometry"] = FEMA_flood.simplify(0.0001)
FEMA_flood["geometry"] = FEMA_flood.simplify(0.0001).buffer(0)

tracts_combined = geopd.GeoDataFrame({'geometry' : geopd.read_file(tract_path).unary_union}, index=[0]).set_crs(4326)
tracts_combined = geopd.GeoDataFrame({'geometry' : [geopd.read_file(tract_path).unary_union]}, index=[0]).set_crs(4326)

out = FEMA_flood.overlay(tracts_combined)
out.to_file(f"{city_path}/floodmaps/processed_fema.geojson")
print(f"Processed FEMA floodmap data written to: {city_path}/floodmaps/processed_fema.geojson")

def process_fema(config, city_key):
if "FEMA_file_name" in config.get(city_key, {}):
process_floodmap(config, city_key)
else:
print(f"Skipping {city_key}: 'FEMA_file_name' not found in config.")


def main():
parser = argparse.ArgumentParser("Process FEMA floodmaps")
parser.add_argument("--config", required=True)
parser.add_argument("--city", required=True)

opts = parser.parse_args()

with open(opts.config) as f:
config = json.load(f)
city_key = opts.city

process_fema(config, city_key)


if __name__ == "__main__":
main()
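The argument parsing is split from the processing so process_floodmap can be imported by process_data.py, the FEMA geometries are repaired with buffer(0) after simplification, and the dissolved tract union is wrapped in a list so GeoDataFrame receives a one-element geometry column. A sketch of the same dissolve-and-clip pattern, with illustrative file names rather than the pipeline's real paths:

import geopandas as geopd

tracts = geopd.read_file("tracts.geojson").to_crs(4326)
extent = geopd.GeoDataFrame({"geometry": [tracts.unary_union]}, crs=4326)  # single dissolved polygon

flood = geopd.read_file("floodmap.geojson").to_crs(4326)
flood["geometry"] = flood.simplify(0.0001).buffer(0)  # buffer(0) repairs self-intersections introduced by simplify
clipped = flood.overlay(extent)                        # keep only flood polygons inside the tract extent
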
5 changes: 4 additions & 1 deletion analysis/src/process/process_data.py
@@ -3,6 +3,7 @@
from process.process_stops import process_stops
from process.process_hospitals import process_hospitals
from process.process_walksheds import process_walksheds
from process.process_FEMA_floodmaps import process_fema
import json
import argparse

@@ -13,7 +14,9 @@ def process_data(config, city_key):
process_hospitals(config, city_key,out=True)
print("Processing Walksheds")
process_walksheds(config, city_key)

print("Processing FEMA floodmaps")
process_fema(config, city_key)


def main():
parser = argparse.ArgumentParser("Process all data")
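process_data now runs the FEMA step as well, and process_fema quietly skips cities whose config entry has no FEMA_file_name. A hypothetical usage sketch; the import path assumes the scripts are run from analysis/src (matching the other imports in this file) and the city key is a placeholder:

import json
from process.process_data import process_data

with open("analysis/src/config.json") as f:
    config = json.load(f)

process_data(config, "new_york")  # placeholder city key; must exist in the config
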
102 changes: 58 additions & 44 deletions analysis/src/process/process_fsf.py
@@ -3,6 +3,7 @@
import os
import argparse
import json
from functools import reduce

def process_fsf(config):
"""
@@ -18,52 +19,65 @@ def process_fsf(config):
----------
DataFrame with the following features:
- GEOID: 2020 Census Tract ID
- pct_{}: % of properties with minor, moderate, major, severe, and extreme flooding risk
- risk_score: Aggregate flood risk score ranging from 1-10
- pct_moderate_plus: % of properties in tract with moderate or higher flooding risk
- pct_{}: % of properties with minor, moderate, major, severe, and extreme climate risk
- risk_score: Aggregate climate risk score ranging from 1-10
- pct_moderate_plus: % of properties in tract with moderate or higher climate risk
- risk_category: pct_moderate_plus categorized into 3 bins
"""
path = config["national"]["flood_risk_path"]

fsf = pd.read_csv(path)
paths = config["national"]["fsf_climate_risk"]

fsf["GEOID"] = fsf["fips"].astype(str).str.zfill(11)
fsf = fsf.drop("fips", axis = 1)

fsf[['count_floodfactor1', 'count_floodfactor2',
'count_floodfactor3', 'count_floodfactor4', 'count_floodfactor5',
'count_floodfactor6', 'count_floodfactor7', 'count_floodfactor8',
'count_floodfactor9', 'count_floodfactor10']] = fsf[['count_floodfactor1', 'count_floodfactor2',
'count_floodfactor3', 'count_floodfactor4', 'count_floodfactor5',
'count_floodfactor6', 'count_floodfactor7', 'count_floodfactor8',
'count_floodfactor9', 'count_floodfactor10']].div(fsf.count_property, axis=0)

fsf['pct_minor'] = fsf['count_floodfactor1'] + fsf['count_floodfactor2']
fsf['pct_moderate'] = fsf['count_floodfactor3'] + fsf['count_floodfactor4']
fsf['pct_major'] = fsf['count_floodfactor5'] + fsf['count_floodfactor6']
fsf['pct_severe'] = fsf['count_floodfactor7'] + fsf['count_floodfactor8']
fsf['pct_extreme'] = fsf['count_floodfactor9'] + fsf['count_floodfactor10']

fsf_sum = fsf[['GEOID','count_floodfactor1', 'count_floodfactor2',
'count_floodfactor3', 'count_floodfactor4', 'count_floodfactor5',
'count_floodfactor6', 'count_floodfactor7', 'count_floodfactor8',
'count_floodfactor9', 'count_floodfactor10']].copy(deep=True)

fsf = fsf.drop(columns=['count_floodfactor1', 'count_floodfactor2',
'count_floodfactor3', 'count_floodfactor4', 'count_floodfactor5',
'count_floodfactor6', 'count_floodfactor7', 'count_floodfactor8',
'count_floodfactor9', 'count_floodfactor10'])

fsf_sum = pd.wide_to_long(fsf_sum, stubnames='count_floodfactor', i='GEOID', j='risk').reset_index()
fsf_sum['risk_score'] = fsf_sum['risk']*fsf_sum['count_floodfactor']
fsf_sum = fsf_sum.groupby('GEOID').sum().reset_index()

fsf = fsf.merge(fsf_sum[['GEOID','risk_score']], how='left', on='GEOID')

fsf["pct_moderate_plus"] = fsf["pct_moderate"] + fsf["pct_major"] + fsf["pct_severe"] + fsf["pct_extreme"]
fsf["pct_moderate_plus"] = fsf["pct_moderate_plus"].fillna(0)
fsf["risk_category"] = pd.qcut(fsf["pct_moderate_plus"], 3, labels=False, duplicates='drop')
fsf_risk_dfs = []
for risk in paths.keys():
print(f"Processing {risk} risk")
fsf = pd.read_csv(paths[risk])

fsf["GEOID"] = fsf["fips"].astype(str).str.zfill(11)
fsf = fsf.drop("fips", axis = 1)

fsf[[f'count_{risk}factor1', f'count_{risk}factor2',
f'count_{risk}factor3', f'count_{risk}factor4', f'count_{risk}factor5',
f'count_{risk}factor6', f'count_{risk}factor7', f'count_{risk}factor8',
f'count_{risk}factor9', f'count_{risk}factor10']] = fsf[[f'count_{risk}factor1', f'count_{risk}factor2',
f'count_{risk}factor3', f'count_{risk}factor4', f'count_{risk}factor5',
f'count_{risk}factor6', f'count_{risk}factor7', f'count_{risk}factor8',
f'count_{risk}factor9', f'count_{risk}factor10']].div(fsf.count_property, axis=0)

fsf[f'{risk}_pct_minor'] = fsf[f'count_{risk}factor1'] + fsf[f'count_{risk}factor2']
fsf[f'{risk}_pct_moderate'] = fsf[f'count_{risk}factor3'] + fsf[f'count_{risk}factor4']
fsf[f'{risk}_pct_major'] = fsf[f'count_{risk}factor5'] + fsf[f'count_{risk}factor6']
fsf[f'{risk}_pct_severe'] = fsf[f'count_{risk}factor7'] + fsf[f'count_{risk}factor8']
fsf[f'{risk}_pct_extreme'] = fsf[f'count_{risk}factor9'] + fsf[f'count_{risk}factor10']

fsf_sum = fsf[['GEOID',f'count_{risk}factor1', f'count_{risk}factor2',
f'count_{risk}factor3', f'count_{risk}factor4', f'count_{risk}factor5',
f'count_{risk}factor6', f'count_{risk}factor7', f'count_{risk}factor8',
f'count_{risk}factor9', f'count_{risk}factor10']].copy(deep=True)

fsf = fsf.drop(columns=[f'count_{risk}factor1', f'count_{risk}factor2',
f'count_{risk}factor3', f'count_{risk}factor4', f'count_{risk}factor5',
f'count_{risk}factor6', f'count_{risk}factor7', f'count_{risk}factor8',
f'count_{risk}factor9', f'count_{risk}factor10'])

fsf_sum = pd.wide_to_long(fsf_sum, stubnames=f'count_{risk}factor', i='GEOID', j='risk').reset_index()
fsf_sum[f'{risk}_risk_score'] = fsf_sum['risk']*fsf_sum[f'count_{risk}factor']
fsf_sum = fsf_sum.groupby('GEOID').sum().reset_index()

fsf = fsf.merge(fsf_sum[['GEOID',f'{risk}_risk_score']], how='left', on='GEOID')

fsf[f"{risk}_pct_moderate_plus"] = fsf[f"{risk}_pct_moderate"] + fsf[f"{risk}_pct_major"] + fsf[f"{risk}_pct_severe"] + fsf[f"{risk}_pct_extreme"]
fsf[f"{risk}_pct_moderate_plus"] = fsf[f"{risk}_pct_moderate_plus"].fillna(0)
fsf[f"{risk}_risk_category"] = pd.qcut(fsf[f"{risk}_pct_moderate_plus"], 3, labels=False, duplicates='drop')
# fsf["risk_category"] = fsf.risk_category.cat.codes
fsf_risk_dfs.append(fsf)


return fsf
fsf_risk_features = reduce(lambda df1,df2: pd.merge(df1,df2,on='GEOID'), fsf_risk_dfs)
return fsf_risk_features

if __name__ == "__main__":
parser = argparse.ArgumentParser("Run fsf data processing")
parser.add_argument("--config", required=True)
opts = parser.parse_args()

with open(opts.config) as f:
config = json.load(f)
process_fsf(config)
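process_fsf now repeats the flood logic for each hazard listed under config["national"]["fsf_climate_risk"] and joins the per-hazard frames on GEOID with functools.reduce. A toy illustration (three risk levels instead of ten, synthetic shares) of the two key steps, the wide_to_long weighted score and the reduce merge:

import pandas as pd
from functools import reduce

wide = pd.DataFrame({"GEOID": ["36061000100"],
                     "count_floodfactor1": [0.5],
                     "count_floodfactor2": [0.3],
                     "count_floodfactor3": [0.2]})

# Melt the factor columns into rows, then score = sum(level * share of properties).
long = pd.wide_to_long(wide, stubnames="count_floodfactor", i="GEOID", j="risk").reset_index()
long["flood_risk_score"] = long["risk"] * long["count_floodfactor"]
flood = long.groupby("GEOID")[["flood_risk_score"]].sum().reset_index()  # 1*0.5 + 2*0.3 + 3*0.2 = 1.7

fire = pd.DataFrame({"GEOID": ["36061000100"], "fire_risk_score": [1.1]})
merged = reduce(lambda a, b: pd.merge(a, b, on="GEOID"), [flood, fire])
print(merged)
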
2 changes: 1 addition & 1 deletion analysis/src/process/process_hospitals.py
@@ -39,7 +39,7 @@ def subset_hospital_points(points, extent, county_list=[]):
## Excluding historical locations
hospitals_gdf = hospitals_gdf[~hospitals_gdf.FEATURE_NAME.str.contains('historical')]

hospitals_gdf_extent = hospitals_gdf.sjoin(extent, how='inner')
hospitals_gdf_extent = hospitals_gdf.sjoin(extent.to_crs(hospitals_gdf.crs), how='inner')
if len(county_list) > 0:
hospitals_gdf_extent = hospitals_gdf_extent[hospitals_gdf_extent.COUNTY_NUMERIC.isin(county_list)]

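The one-line fix reprojects the city extent onto the hospital layer's CRS before the spatial join, since GeoPandas expects both frames in the same CRS. A short sketch of the pattern, with illustrative file names:

import geopandas as geopd

hospitals = geopd.read_file("hospitals.geojson")     # e.g. EPSG:4269
extent = geopd.read_file("city_extent.geojson")      # e.g. EPSG:4326
in_extent = hospitals.sjoin(extent.to_crs(hospitals.crs), how="inner")
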
26 changes: 20 additions & 6 deletions analysis/src/process/process_stops.py
@@ -37,9 +37,16 @@ def make_stops(folder_path):
trips = pd.read_csv(folder_path + "/trips.txt", dtype = "str")
stop_times = pd.read_csv(folder_path + "/stop_times.txt", dtype = "str")

if 'agency_id' in routes.columns:
agencies = pd.read_csv(folder_path + "/agency.txt", dtype = "str")
routes = routes.merge(agencies[["agency_id", "agency_name"]])
else:
routes["agency_id"] = feed_name.upper()
routes["agency_name"] = feed_name.upper()

# Get route types
routes["route_id"] = routes["route_id"].astype(str)
routes_types = routes[["route_id", "route_type"]].drop_duplicates()
routes_types = routes[["route_id", "route_type", "agency_id", "agency_name"]].drop_duplicates()
routes_types = routes_types.replace({"route_type" : ROUTE_DICT})

# Get most common services for each line
@@ -50,7 +57,7 @@

# Final df
trips_with_stops = trips_to_include.merge(stop_times)
stops_with_trips = trips_with_stops.merge(stops).merge(routes_types)[["route_id", "stop_id", "route_type", "stop_name", "stop_lat", "stop_lon"]].drop_duplicates().reset_index(drop = True)
stops_with_trips = trips_with_stops.merge(stops).merge(routes_types)[["route_id", "stop_id", "route_type", "stop_name", "stop_lat", "stop_lon", "agency_id", "agency_name"]].drop_duplicates().reset_index(drop = True)

# Add geometry
stops_with_trips["geometry"] = geopd.points_from_xy(stops_with_trips.stop_lon, stops_with_trips.stop_lat, crs="EPSG:4326")
@@ -79,7 +86,7 @@ def tag_with_tracts(stops, tracts_path):
tracts_2010 = geopd.read_file(tracts_path + "tracts_2010.geojson")[["GEOID", "geometry"]].rename({"GEOID" : "GEOID_2010"}, axis = 1)
tracts_2020 = geopd.read_file(tracts_path + "tracts.geojson")[["GEOID", "geometry"]].rename({"GEOID" : "GEOID_2020"}, axis = 1)

return stops.overlay(tracts_2010).overlay(tracts_2020)
return stops.overlay(tracts_2010.to_crs(stops.crs)).overlay(tracts_2020.to_crs(stops.crs))


def process_stops(config, city_key, out=False):
@@ -114,11 +121,18 @@ def process_stops(config, city_key, out=False):

# Routes as list
routes_list = stops_out.groupby('stop_id')['route_id'].apply(list).reset_index().rename(columns={'route_id':'routes_serviced'})


agencies_list = stops_out.groupby('stop_id')['agency_id'].apply(list).reset_index().rename(columns={'agency_id':'agency_ids_serviced'})
agencies_name_list = stops_out.groupby('stop_id')['agency_name'].apply(list).reset_index().rename(columns={'agency_name' : 'agencies_serviced'})

stops_out = stops_out.merge(routes_list,how='left',on='stop_id')
stops_out = stops_out.merge(agencies_list,how='left',on='stop_id')
stops_out = stops_out.merge(agencies_name_list,how='left',on='stop_id')

stops_out['routes_serviced'] = [','.join(map(str, list(set(l)))) for l in stops_out['routes_serviced']]
stops_out = stops_out.drop("route_id", axis = 1).drop_duplicates(subset=['stop_id']).reset_index().drop("index", axis = 1)
stops_out['agency_ids_serviced'] = [','.join(map(str, list(set(l)))) for l in stops_out['agency_ids_serviced']]
stops_out['agencies_serviced'] = [','.join(map(str, list(set(l)))) for l in stops_out['agencies_serviced']]

stops_out = stops_out.drop(["route_id", "agency_id", "agency_name"], axis = 1).drop_duplicates(subset=['stop_id']).reset_index().drop("index", axis = 1)


stops_out = stops_out.merge(feed_city_mapping, how='left', on='feed_name')
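make_stops now attaches agency_id and agency_name from agency.txt when routes.txt carries an agency_id, falling back to the feed name otherwise, and process_stops collapses them into per-stop comma-separated lists alongside routes_serviced. A toy illustration (synthetic rows) of that groupby aggregation:

import pandas as pd

stop_routes = pd.DataFrame({"stop_id": ["A", "A", "B"],
                            "agency_name": ["MTA", "MTA", "NJ Transit"]})

agencies = (stop_routes.groupby("stop_id")["agency_name"]
            .apply(list).reset_index()
            .rename(columns={"agency_name": "agencies_serviced"}))
agencies["agencies_serviced"] = [",".join(map(str, set(names))) for names in agencies["agencies_serviced"]]
print(agencies)  # A -> "MTA", B -> "NJ Transit"
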
(Remaining changed files not shown in this view.)
