-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetch_data.py
160 lines (134 loc) · 5.53 KB
/
fetch_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import argparse
import json
import os
import random
from joblib import Parallel, delayed
from tqdm import tqdm
import ee
import helpers
from pathlib import Path
from datetime import datetime
AREA_DIR = "area"
POI_DIR = "poi"
def parse_args():
    """Parse command-line arguments for the tile-fetching script.

    Returns:
        argparse.Namespace with collection/channel selection, output directory,
        AoI/PoI inputs, date range, geohash precision, sampling and job options.
    """
    parser = argparse.ArgumentParser(
        # NOTE: adjacent string literals concatenate; the original was missing
        # the space before "coverage area" ("...clipped to thecoverage area").
        description="Fetches tiles from a Google Earth Engine collection based on time range, "
        "coverage and geohash level. All tiles within the coverage area will be fetched by default. "
        "If a list of points of interest is provided, tiles overlapping the points will be clipped to the "
        "coverage area and fetched."
    )
    parser.add_argument("--collection", type=str, default=helpers.SENTINEL_2_COLLECTION)
    parser.add_argument(
        "--channels", nargs="+", type=str, default=helpers.SENTINEL_2_CHANNELS
    )
    parser.add_argument("--outdir", type=str, default="output")
    # Previously saved requests file; when given, AoI/PoI generation is skipped.
    parser.add_argument("--input_file", type=str)
    # GeoJSON polygon describing the area of interest.
    parser.add_argument("--coverage_file", type=str)
    # Optional GeoJSON points of interest; empty string means "none".
    parser.add_argument("--poi_file", type=str, default="")
    # Geohash precision (number of characters).
    parser.add_argument("--precision", type=int, default=5)
    parser.add_argument("--start_date", type=str, default="2020-01-01")
    # Default end date is today's date, evaluated when arguments are parsed.
    parser.add_argument(
        "--end_date", type=str, default=datetime.today().strftime("%Y-%m-%d")
    )
    # Interval length in days for each fetch window.
    parser.add_argument("--interval", type=int, default=30)
    parser.add_argument("--n_jobs", type=int, default=30)
    # Fraction of geohashes to sample (1.0 = all).
    parser.add_argument("--sampling", type=float, default=1.0)
    parser.add_argument("--seed", type=int, default=42)
    # store_true flags default to False and infer dest from the option name.
    parser.add_argument("--save_requests", action="store_true")
    parser.add_argument("--skip_fetch", action="store_true")
    parser.add_argument("--fetch_latest", action="store_true")
    return parser.parse_args()
def main():
    """Fetch tiles from a GEE collection over an area of interest.

    Generates (or loads) a list of tile fetch requests, optionally merged with
    points of interest, then downloads the tiles in parallel into
    ``<outdir>/area`` and ``<outdir>/poi``.
    """
    # initialize earth engine
    ee.Initialize()

    args = parse_args()

    # For now bands are defined per collection; for the Sentinel-2 collection
    # the channel list is fixed, otherwise it comes from the CLI. This could be
    # made more general via an argument or config.
    collection = args.collection
    if args.collection == helpers.SENTINEL_2_COLLECTION:
        bands = helpers.SENTINEL_2_CHANNELS
    else:
        bands = args.channels

    # if fetch latest the start date is set to the earliest date in ds
    if args.fetch_latest:
        args.start_date = helpers.MIN_DS_DATE[args.collection]

    requests = []
    is_geo_json = False
    if args.input_file:
        # load previously saved requests
        with open(args.input_file) as f:
            requests = json.load(f)
    else:
        # load coverage polygon from a geojson file
        is_geo_json = True
        with open(args.coverage_file) as f:
            coverage_geojson = json.load(f)

        # generate geohashes covered by the AoI
        geohashes_aoi = helpers.geohashes_from_geojson_poly(
            coverage_geojson, args.precision
        )

        # generate geohash + intervals, applying sampling
        request_data = helpers.generate_fetch_requests(
            geohashes_aoi,
            args.start_date,
            args.end_date,
            args.interval,
            args.fetch_latest,
            args.sampling,
        )

        # NOTE: original used `is not ""` (identity comparison) which is
        # unreliable for strings; truthiness covers both None and "".
        if args.poi_file:
            # load points of interest from a geojson file
            with open(args.poi_file) as f:
                poi_geojson = json.load(f)

            # generate the geohashes containing each PoI, clipped geospatially
            # and temporally to the AoI and time bounds
            geohashes_poi = helpers.geohashes_from_geojson_points(
                geohashes_aoi,
                poi_geojson,
                args.start_date,
                args.end_date,
                args.precision,
            )

            # merge the AoI geohash samples with the PoI data
            request_data = helpers.generate_fetch_requests_poi(
                request_data,
                geohashes_poi,
                args.interval,
            )

        # encode request data as json-serializable dicts
        # (d is expected as (geohash, poi, (date_start, date_end)))
        requests = [
            {
                "geohash": d[0],
                "poi": d[1],
                "date_start": str(d[2][0]),
                "date_end": str(d[2][1]),
            }
            for d in request_data
        ]

    # save the metadata associated with the collection and bands we are fetching
    helpers.fetch_metadata(args.outdir, collection, bands)

    Path(os.path.join(args.outdir, AREA_DIR)).mkdir(parents=True, exist_ok=True)
    Path(os.path.join(args.outdir, POI_DIR)).mkdir(parents=True, exist_ok=True)

    # save fetched tile info to json if required
    if args.save_requests and is_geo_json:
        output_path = os.path.join(args.outdir, "requests.json")
        with open(output_path, "w+") as json_file:
            json.dump(requests, json_file, indent=4)

    # run fetch jobs in parallel
    if not args.skip_fetch:
        jobs = []
        for request in requests:
            # target subdirectory depends on whether the tile covers a PoI
            subdir = POI_DIR if request["poi"] else AREA_DIR
            outdir = os.path.join(args.outdir, subdir)
            jobs.append(delayed(helpers.fetch_tile)(request, outdir, collection, bands))

        # deterministic shuffle (seeded) so work order is reproducible
        random.Random(args.seed).shuffle(jobs)
        _ = Parallel(
            backend="multiprocessing", n_jobs=args.n_jobs, verbose=1, batch_size=4
        )(tqdm(jobs))
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()