Skip to content

Commit

Permalink
re-org folders
Browse files Browse the repository at this point in the history
  • Loading branch information
cgentemann committed Sep 25, 2021
1 parent 20a103c commit e110831
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 0 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
350 changes: 350 additions & 0 deletions zarr_meta/era5_AWS_downsample.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "greatest-nerve",
"metadata": {},
"source": [
"# AWS ERA5 data\n",
"\n",
"- remaking ERA5 from AWS zarr store\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "announced-reason",
"metadata": {},
"outputs": [],
"source": [
"#notebook based on zflemings:https://nbviewer.jupyter.org/github/zflamig/dask-era5/blob/main/notebook/era5_fargate_dask.ipynb\n",
"import xarray as xr\n",
"import fsspec\n",
"import dask\n",
"import s3fs\n",
"import os\n",
"\n",
"import numpy as np\n",
"xr.set_options(display_style=\"html\") #display dataset nicely "
]
},
{
"cell_type": "markdown",
"id": "c4f508d4-9474-46d1-a586-231443e1dc5e",
"metadata": {},
"source": [
"- open up cluster in dashboard and connect directly once you know IP\n",
"- not sure why but gateway didn't work for this hub"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "about-allah",
"metadata": {},
"outputs": [],
"source": [
"#from dask_gateway import Gateway\n",
"#from dask.distributed import Client\n",
"#gateway = Gateway()\n",
"#cluster = gateway.new_cluster()\n",
"#cluster.adapt(minimum=1, maximum=70)\n",
"#cluster\n",
"from dask.distributed import Client\n",
"client = Client(\"tcp://127.0.0.1:34163\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cross-dayton",
"metadata": {},
"outputs": [],
"source": [
"def fix_accum_var_dims(ds, var):\n",
" # Some varibles like precip have extra time bounds varibles, we drop them here to allow merging with other variables\n",
" \n",
" # Select variable of interest (drops dims that are not linked to current variable)\n",
" ds = ds[[var]] \n",
"\n",
" if var in ['air_temperature_at_2_metres',\n",
" 'dew_point_temperature_at_2_metres',\n",
" 'air_pressure_at_mean_sea_level',\n",
" 'northward_wind_at_10_metres',\n",
" 'eastward_wind_at_10_metres',\n",
" 'eastward_wind_at_100_metres',\n",
" 'northward_wind_at_100_metres',\n",
" 'lwe_thickness_of_surface_snow_amount',\n",
" 'sea_surface_temperature',\n",
" 'surface_air_pressure',\n",
" 'snow_density']:\n",
" \n",
" ds = ds.rename({'time0':'time','lat':'latitude','lon':'longitude'})\n",
" \n",
" elif var in ['precipitation_amount_1hour_Accumulation',\n",
" 'integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_in_air_1hour_Accumulation',\n",
" 'air_temperature_at_2_metres_1hour_Maximum',\n",
" 'air_temperature_at_2_metres_1hour_Minimum']:\n",
" \n",
" ds = ds.rename({'time1':'time','lat':'latitude','lon':'longitude'})\n",
" \n",
" else:\n",
" print(\"Warning, Haven't seen {var} varible yet! Time renaming might not work.\".format(var=var))\n",
" \n",
" return ds\n",
"\n",
"@dask.delayed\n",
"def s3open(path):\n",
" fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False, \n",
" config_kwargs = {'max_pool_connections': 20})\n",
" return s3fs.S3Map(path, s3=fs)\n",
"\n",
"\n",
"def open_era5_range(start_year, end_year, variables):\n",
" ''' Opens ERA5 monthly Zarr files in S3, given a start and end year (all months loaded) and a list of variables'''\n",
" \n",
" \n",
" file_pattern = 'era5-pds/zarr/{year}/{month}/data/{var}.zarr/'\n",
" \n",
" years = list(np.arange(start_year, end_year+1, 1))\n",
" months = [\"01\", \"02\", \"03\", \"04\", \"05\", \"06\", \"07\", \"08\", \"09\", \"10\", \"11\", \"12\"]\n",
" \n",
" l = []\n",
" for var in variables:\n",
" print('opening',var)\n",
" \n",
" # Get files\n",
" files_mapper = [s3open(file_pattern.format(year=year, month=month, var=var)) for year in years for month in months]\n",
" \n",
" # Look up correct time dimension by variable name\n",
" \n",
" if var in ['precipitation_amount_1hour_Accumulation',\n",
" 'integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_in_air_1hour_Accumulation',\n",
" 'air_temperature_at_2_metres_1hour_Maximum',\n",
" 'air_temperature_at_2_metres_1hour_Minimum']:\n",
" concat_dim='time1'\n",
" else:\n",
" concat_dim='time0'\n",
" \n",
" # Lazy load\n",
" ds = xr.open_mfdataset(files_mapper, engine='zarr', \n",
" concat_dim=concat_dim, combine='nested', \n",
" coords='minimal', compat='override', parallel=True)\n",
" \n",
" # Fix dimension names\n",
" ds = fix_accum_var_dims(ds, var)\n",
" l.append(ds)\n",
" \n",
" ds_out = xr.merge(l)\n",
" \n",
" return ds_out\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "reasonable-smart",
"metadata": {},
"outputs": [],
"source": [
"# set AWS region to access ERA5 data\n",
"s3 = s3fs.S3FileSystem(anon=True, client_kwargs={'region_name':'us-east-1'})\n",
"data_var = s3.ls('era5-pds/zarr/2021/04/data/')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "74b9260d-9e36-4fba-873a-c184d35cff80",
"metadata": {},
"outputs": [],
"source": [
"ddvar = []\n",
"for dvar in data_var:\n",
" print(dvar[27:-5])\n",
" ddvar.append(dvar[27:-5])"
]
},
{
"cell_type": "markdown",
"id": "8669aed5-1a66-437e-8253-d6dcf1b1083b",
"metadata": {},
"source": [
"- it took me a while to get this to run\n",
"- the cluster kept crashing until I just continued breaking it down into smaller and smaller bits\n",
"- subseting to year/month was finally the trick with ~30 workers and 200GB memory being used to find the mean for each month"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "guilty-bloom",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"@dask.delayed\n",
"def downsample(ds):\n",
" ds_month = ds.resample(time='1M').mean(keep_attrs=True)\n",
" ds_month_deg = ds_month.coarsen(latitude=8,longitude=8,boundary=\"trim\").mean(keep_attrs=True)\n",
" return ds_month_deg\n",
"\n",
"for idvar,dvar in enumerate(data_var):\n",
" tem = []\n",
" var = dvar[27:-5]\n",
" fout = './../../data/era5/era5_monthly_2deg'+var+'.nc'\n",
" if os.path.exists(fout):\n",
" continue\n",
" for lyr in range(1979,2021):\n",
" #print(lyr,dvar) \n",
" #if var == 'air_pressure_at_mean_sea_level':\n",
" # continue\n",
" print(lyr,var)\n",
" ds = open_era5_range(lyr,lyr, [var])\n",
" mn = []\n",
" tt= []\n",
" for i in range(12):\n",
" ds_month = ds.sel(time=str(lyr)+'-'+str(i+1).zfill(2)).mean('time',keep_attrs=True)\n",
" ds_month = ds_month.assign_coords({'time':ds.sel(time=str(lyr)+'-'+str(i+1).zfill(2)).time.mean().data})\n",
" ds_month_deg = ds_month.coarsen(latitude=8,longitude=8,boundary=\"trim\").mean(keep_attrs=True)\n",
" ds_month_deg = ds_month_deg.load()\n",
" mn.append(ds_month_deg) \n",
" mn = xr.concat(mn,dim='time')\n",
" tem.append(mn)\n",
" tem2 = xr.concat(tem,dim='time')\n",
" tem2 = tem2.sortby(tem2.latitude)\n",
" #tem2.to_zarr('./../../data/era5/era5_monthly_2deg'+var+'_1990.zarr')\n",
" tem2.to_netcdf('./../../data/era5/era5_monthly_2deg'+var+'.nc')\n",
" print('wrote:', lyr)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "39259c57-51e4-4124-b1b3-191d742d9992",
"metadata": {},
"outputs": [],
"source": [
"for idvar,dvar in enumerate(data_var):\n",
" tem = []\n",
" var = dvar[27:-5]\n",
" fout = './../../data/era5/era5_monthly_2deg'+var+'.nc'\n",
" if os.path.exists(fout):\n",
" ds = xr.open_dataset('./../../data/era5/era5_monthly_2deg'+var+'.nc')\n",
" print(ds.time[-1].data,idvar,var,ds.time[0].data)"
]
},
{
"cell_type": "markdown",
"id": "32df82aa-7a30-42b4-8e30-b88f2d71ff03",
"metadata": {},
"source": [
"- now take all the individual files & merge together"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29e12edc-8e30-4add-bf00-8215f4b034ea",
"metadata": {},
"outputs": [],
"source": [
"ds_all = []\n",
"for idvar,dvar in enumerate(data_var):\n",
" var = dvar[27:-5]\n",
" file = './../../data/era5/era5_monthly_2deg'+var+'.nc'\n",
" if os.path.exists(fout):\n",
" ds = xr.open_dataset(file)\n",
" if idvar==0:\n",
" ds_all = ds\n",
" else:\n",
" for var in ds:\n",
" ds_all[var]=ds[var]\n",
" \n",
"ds_all.to_netcdf('./../../data/era5/era5_monthly_2deg.nc')\n",
"ds_all"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "growing-young",
"metadata": {},
"outputs": [],
"source": [
"print('ds size in GB {:0.2f}\\n'.format(ds_all.nbytes / 1e9))\n",
"#ds_all.info"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "turkish-arthur",
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"for var in ds_all:\n",
" ds_all[var][-1,:,:].plot()\n",
" plt.show()\n",
" input()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "incorrect-speaker",
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"for var in ds_all:\n",
" ds_all[var].mean({'latitude','longitude'}).plot()\n",
" plt.show()\n",
" input()\n",
" \n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "handled-gibson",
"metadata": {},
"outputs": [],
"source": [
"ds_all.air_temperature_at_2_metres.mean({'latitude','longitude'}).plot()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9b779824-4a16-4137-a78f-c2bf9af06c27",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit e110831

Please sign in to comment.