-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathreader.py
110 lines (80 loc) · 2.97 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import time
import logging
import requests
from concurrent import futures
from datetime import datetime
from homura import download as fetch
from tempfile import mkdtemp
from collections import OrderedDict
logger = logging.getLogger('landsat8.meta')
class ReachedEndOfProcess(Exception):
    # NOTE(review): defined but never raised anywhere in this chunk;
    # presumably callers/writers raise it to stop iteration early —
    # confirm against users of this module before removing.
    pass
def convert_date(value):
    """Parse a ``'YYYY-MM-DD'`` string into a ``datetime.date``.

    :param value: date string in ``%Y-%m-%d`` form.
    :returns: the corresponding ``datetime.date``.
    :raises ValueError: if *value* does not match the format.
    """
    parsed = datetime.strptime(value, '%Y-%m-%d')
    return parsed.date()
def download_meta(url, download_path):
    """Download the Landsat 8 bulk-metadata CSV and return an open handle.

    A copy modified within the last 6 hours is reused instead of
    re-downloading.

    :param url: URL of the LANDSAT_8_C1.csv bulk metadata file.
    :param download_path: directory to store the file in; a fresh
        temporary directory is used when falsy.
    :returns: an open text-mode read handle on the CSV; the caller is
        responsible for closing it.
    """
    dpath = download_path if download_path else mkdtemp()
    # a caller-supplied directory may not exist yet; neither fetch() nor
    # open() will create it, so do that here
    if not os.path.isdir(dpath):
        os.makedirs(dpath)
    dpath = os.path.join(dpath, 'LANDSAT_8_C1.csv')
    # don't download if the file is downloaded in the last 6 hours
    if os.path.isfile(dpath):
        mtime = os.path.getmtime(dpath)
        if time.time() - mtime < (6 * 60 * 60):
            return open(dpath, 'r')
    fetch(url, dpath)
    return open(dpath, 'r')
def row_processor(record, date, dst, writers, **kwargs):
    """Apply every writer to a single metadata record.

    :param record: mapping of CSV header names to values; must contain
        a ``'sceneID'`` key.
    :param date: acquisition date; its year/month/day components build
        the destination subdirectory.
    :param dst: root destination directory.
    :param writers: iterable of callables invoked as
        ``writer(path, record, **kwargs)``.
    """
    path = os.path.join(dst, str(date.year), str(date.month), str(date.day))
    # lazy %-args: only formatted when the INFO level is actually enabled
    logger.info('processing %s', record['sceneID'])
    for w in writers:
        w(path, record, **kwargs)
def csv_reader(dst, writers, start_date=None, end_date=None, url=None,
               download=False, download_path=None, num_worker_threads=1, **kwargs):
    """Reads landsat8 metadata from a csv file stored on USGS servers
    and applies writer functions on the data.

    :param dst: root destination directory handed to the writers.
    :param writers: iterable of writer callables (see ``row_processor``).
    :param start_date: ``'YYYY-MM-DD'``; rows acquired earlier are skipped.
    :param end_date: ``'YYYY-MM-DD'``; rows acquired later are skipped.
    :param url: metadata CSV location; defaults to the USGS bulk file.
    :param download: when true, download the whole file first
        (cached for 6 hours) instead of streaming it.
    :param download_path: directory for the downloaded file.
    :param num_worker_threads: values > 1 process rows in a thread pool.
    """
    if not url:
        url = 'https://landsat.usgs.gov/landsat/metadata_service/bulk_metadata_files/LANDSAT_8_C1.csv'

    # download the whole file
    if download:
        logger.info('Downloading landsat8 metadata file')
        # don't download if the file is downloaded in the last 6 hours
        f = download_meta(url, download_path)
        liner = f
    # or read line by line
    else:
        logger.info('Streaming landsat8 metadata file')
        r = requests.get(url, stream=True)
        # decode_unicode=True: iter_lines() yields bytes on Python 3
        # otherwise, which would break the str dict lookups below
        liner = r.iter_lines(decode_unicode=True)

    if start_date:
        start_date = convert_date(start_date)
    if end_date:
        end_date = convert_date(end_date)

    # read the header; the next() builtin works for both file objects and
    # generators (the Py2-only .next() method did not)
    header = next(liner).split(',')

    def gen(line):
        # parse one CSV row, coercing numeric fields to float
        row = line.split(',')
        for j, v in enumerate(row):
            try:
                row[j] = float(v)
            except ValueError:
                pass
        # generate the record
        record = OrderedDict(zip(header, row))
        date = convert_date(record['acquisitionDate'])
        # apply filter
        # if there is an enddate, stops the process when the end date is reached
        if end_date and date > end_date:
            return
        if start_date and date < start_date:
            return
        print('sending for row processing')
        row_processor(record, date, dst, writers, **kwargs)

    if num_worker_threads <= 1:
        # explicit loop: map() is lazy on Python 3, so the old
        # map(gen, liner) silently processed nothing
        for line in liner:
            gen(line)
    else:
        with futures.ThreadPoolExecutor(max_workers=num_worker_threads) as executor:
            try:
                # consume the result iterator so worker exceptions propagate
                # and the timeout can actually fire
                for _ in executor.map(gen, liner, timeout=10):
                    pass
            except futures.TimeoutError:
                print('skipped')