-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfile_fetcher.py
168 lines (140 loc) · 5.8 KB
/
file_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import abc
import asyncio
import io
import os
from pathlib import Path
from typing import Union
from .utils import misc, network, unzip
class FileFetcher(metaclass=abc.ABCMeta):
    """Abstract base class for file downloaders.

    Subclasses implement :meth:`fetch_file`, :meth:`fetch_files` and the
    concurrency driver :meth:`_run_fetch_large_file`; this base class supplies
    the shared multi-part download logic in :meth:`fetch_large_file` and the
    disk-writing helper :meth:`_save_file`.
    """

    @classmethod
    def __subclasshook__(cls, subclass):
        # Duck-typed subclass check: a class qualifies if it provides callable
        # fetch_file AND fetch_files, OR a callable fetch_large_file.
        # NOTE(review): by and/or precedence, fetch_large_file alone is enough
        # to pass -- confirm that branch is intended and not a typo for "and".
        return (
            hasattr(subclass, "fetch_file")
            and callable(subclass.fetch_file)
            and hasattr(subclass, "fetch_files")
            and callable(subclass.fetch_files)
            or hasattr(subclass, "fetch_large_file")
            and callable(subclass.fetch_large_file)
            or NotImplemented
        )

    @abc.abstractmethod
    def fetch_file(
        self,
        url: str,
        filepath: str,
        filename: Union[str, None] = None,
        etag: Union[str, None] = None,
        auto_unzip: bool = True,
    ):
        """download a file from "url" and save to "filepath"

        You can give a new "filename" for the file.
        If "etag" is given, check if etag has changed. If not changed, do not download again.

        :param url: the url to download file from
        :param filepath: location to keep the file
        :param filename: new file name (optional)
        :param etag: old etag. if the old etag is the same with the one on server, do not download again.
        :param auto_unzip: bool flag to indicate if unzip .zip file automatically

        :returns: new etag
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_files(
        self,
        urls,
        filepaths,
        filenames=[],  # NOTE: mutable defaults kept for interface compatibility;
        etags=[],      # implementations must not mutate them in place.
        auto_unzip: bool = True,
    ):
        """fetch multiple files concurrently

        :param urls: the urls to download files from
        :param filepaths: location(s) to keep the files. This can be one path for all files or one path for each file.
        :param filenames: new file names (optional)
        :param etags: old etags. if the old etag is the same with the one on server, do not download again.
        :param auto_unzip: bool flag to indicate if unzip .zip file automatically
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _run_fetch_large_file(self, loop, url, filesize, data):
        """Run the concurrent chunked download of "url" on "loop", writing the
        bytes into data[0] (a BytesIO). Implemented by subclasses."""
        raise NotImplementedError

    def fetch_large_file(
        self,
        url: str,
        filepath: str,
        filename: Union[str, None],
        filesize: Union[int, None] = None,
        etag: Union[str, None] = None,
        auto_unzip: bool = True,
        check_etag: bool = True,
        timeout=(None, None),
    ):
        """use multi-thread to fetch a large file.

        LOOK HERE!!!
        Be careful when use this function. You cannot get partial content if the content is gzip encoded.
        So the file might be larger than the one download directly.
        It is useful when downloading large .zip file.

        Warning: this could be slower than single thread download.
        Some firewall sequences these requests to shape network traffic and defeat the purpose of this function completely.
        check the etag and get content-length before calling this function

        :param url: the file url
        :param filepath: location to keep the file
        :param filename: the output filename specified by caller. ignored if auto unzip compressed data.
        :param filesize: the size of file (in bytes)
        :param etag: old etag. if the old etag is the same with the one on server, do not download again.
        :param auto_unzip: bool flag to indicate if unzip .zip file automatically
        :param check_etag: if True, compare "etag" with the server's etag and
            skip the download when they match
        :param timeout: (connect, read) timeout pair forwarded to the HEAD request

        :returns: new etag
        """
        # A HEAD request is needed when the caller did not supply the size,
        # or asked us to validate the etag against the server.
        if filesize is None or check_etag:
            headers = network.get_headers(url, timeout=timeout)
            file_size = network.get_content_length(headers)
            new_etag = network.get_etag(headers)

            # BUGFIX: only short-circuit on a matching etag when the caller
            # actually requested the etag check; previously the skip also ran
            # when headers were fetched merely to learn the file size.
            if check_etag and etag and etag == new_etag:
                return etag
        else:
            # the caller has provided the file size and vouches for the etag
            file_size = filesize
            new_etag = etag

        # create folder to keep the file
        if os.path.isfile(filepath):
            raise Exception(
                f"The 'filepath' is in fact a file. The 'filepath' should be a folder path(non-exist is fine). {filepath}"
            )
        Path(filepath).mkdir(parents=True, exist_ok=True)

        # set up concurrent functions
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # single-element list so the driver can share/replace the buffer
        data = [io.BytesIO()]
        try:
            self._run_fetch_large_file(loop, url, file_size, data)
        except RuntimeError:
            # a loop is already running (e.g. Jupyter); allow nesting and retry
            import nest_asyncio

            nest_asyncio.apply()
            self._run_fetch_large_file(loop, url, file_size, data)
        except Exception as e:
            misc.print_error("Failed to fetch large file!")
            raise Exception("Failed to fetch large file!") from e
        finally:
            loop.close()

        data[0].seek(0)
        if not filename:
            filename = url.split("/")[-1]  # use the filename in the url

        # save the file
        if auto_unzip:
            try:
                unzip.save_compressed_data(url, data[0], filepath)
            except Exception as ex:
                # best-effort: fall back to writing the raw bytes unmodified
                print(ex)
                print("failed to save zip. try save directly")
                data[0].seek(0)
                self._save_file(filepath, filename, data[0].read())
        else:
            self._save_file(filepath, filename, data[0].read())
        return new_etag

    def _save_file(self, filepath, filename, data):
        """helper function to save file to hard drive

        :param filepath: destination folder (created if absent)
        :param filename: name of the file to write inside "filepath"
        :param data: raw bytes to write
        """
        Path(filepath).mkdir(parents=True, exist_ok=True)
        # BUGFIX: the destination path previously interpolated a literal
        # "(unknown)" instead of the "filename" argument, so every download
        # was written to a file named "(unknown)".
        if os.path.isfile(f"{filepath}/{filename}"):
            print(f"Warning: overwriting {filename}")
        with open(f"{filepath}/{filename}", "wb+") as of:
            of.write(data)