Skip to content

Commit 34a95b7

Browse files
committed
Fastly logging to CIT /data/logs_archive
ARXIVCE-1120
1 parent edc4ac5 commit 34a95b7

File tree

4 files changed

+252
-127
lines changed

4 files changed

+252
-127
lines changed

arxiv/ops/fastly_gs_logs_to_cit.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
r"""Script to get Fastly logs from GS and move them to a local file.
2+
3+
To use this:
4+
5+
cd arxiv-base
6+
poetry install
7+
export GOOGLE_APPLICATION_CREDENTIALS=~/somefile.json # needs read object permission on fastly log bucket
8+
poetry python arxiv/ops/fastly_gs_logs_to_cit.py --help
9+
10+
"""
11+
import heapq
12+
import logging
13+
import os
14+
import re
15+
import tempfile
16+
from datetime import date, timedelta
17+
from datetime import datetime
18+
from pathlib import Path
19+
from typing import Optional, List
20+
21+
import google.auth
22+
from google.cloud import storage
23+
24+
import fire
25+
26+
logging.basicConfig()
27+
logger = logging.getLogger(__file__)
28+
logger.setLevel(logging.INFO)
29+
30+
TMP_DIR="/tmp" # needs reasonable temp FS, 600MB of work space
31+
32+
DEFAULT_BUCKET="arxiv-logs-archive"
33+
DEFAULT_KEY="site/fastly/" + datetime.utcnow().strftime("%Y/fastly_access_log.%Y-%m-%dT%H")
34+
35+
_previous_hour = datetime.utcnow().replace(minute=00) - timedelta(hours=1)
36+
UTC_NOW = _previous_hour.strftime("%Y-%m-%dT%H")
37+
UTC_DATE = _previous_hour.date()
38+
UTC_HOUR = _previous_hour.hour
39+
40+
DEFAULT_OUT_DIR=f"/data/logs_archive/site/fastly/{_previous_hour.year}/"
41+
42+
TIME_PATTERN=r"\[([^\]]+)\]"
43+
"""Pattern to caputre time from a log line
44+
Ex.
45+
180.66.144.48 180.66.144.48 - | [29/Dec/2023:01:57:01 +0000] [Mozilla/etc...
46+
"""
47+
48+
TIME_STRP = "%d/%b/%Y:%H:%M:%S %z"
49+
50+
def _filename_only(name: str) -> str:
51+
return name.split("/")[-1]
52+
53+
54+
def _keyed(line: str):
55+
match = re.search(TIME_PATTERN, line)
56+
if match:
57+
return datetime.strptime(match.group(1), TIME_STRP), line
58+
else:
59+
return None
60+
61+
def _invert_keyed(data) -> str:
62+
return data[1]
63+
64+
def k_way_merge(
65+
in_files: List[Path],
66+
out_file: Path) -> None:
67+
"""
68+
Outline: open all N files,
69+
until done:
70+
read 1 line from each and put on heap, save one line from heap
71+
72+
Parameters
73+
----------
74+
files: Paths to read from
75+
output: Path to write output to
76+
"""
77+
78+
#files = [gzip.open(filename) for filename in in_files]
79+
files = [open(filename) for filename in in_files]
80+
line_heap = []
81+
heapq.heapify(line_heap)
82+
with open(out_file, 'w') as out_fh:
83+
try:
84+
while files:
85+
for i, file in enumerate(files): # get a line from each file
86+
line = file.readline()
87+
if not line: # handle file is out of lines
88+
file.close()
89+
files.pop(i)
90+
continue
91+
else:
92+
with_key = _keyed(line.strip())
93+
if with_key is None:
94+
print("Warning, no date found in log line, skipping.")
95+
else:
96+
heapq.heappush(line_heap, with_key)
97+
98+
# now that we have at least one line from each file, get the earliest
99+
earliest_line = _invert_keyed(heapq.heappop(line_heap))
100+
out_fh.write( earliest_line + '\n')
101+
102+
while line_heap: # files empty but still lines in line_heap
103+
earliest_line = _invert_keyed(heapq.heappop(line_heap))
104+
out_fh.write(earliest_line + '\n')
105+
finally:
106+
[f.close() for f in files]
107+
108+
109+
def download_files(verbose: bool = False,
110+
bucket: str = DEFAULT_BUCKET,
111+
date: date = UTC_DATE,
112+
hour: int = UTC_HOUR,
113+
key_pattern: Optional[str] = None,
114+
max: int = 0,
115+
out_dir: Path = Path(TMP_DIR)) -> List[Path]:
116+
"""Gets fastly log files for the last hour, combines them into a single file and saves that.
117+
118+
Parameters
119+
----------
120+
verbose: if verbose or not
121+
bucket: the GS bucket. Should be a string without the "gs://"
122+
key_pattern: The key pattern for the files. Should not have a leading /.
123+
out_dir: Output directory to save log files in.
124+
125+
Returns
126+
-------
127+
Path
128+
"""
129+
if key_pattern is None:
130+
key_pattern = f"site/fastly/{date.year}/fastly_access_log.{date.strftime('%Y-%m-%d')}T{hour}"
131+
132+
logger.debug(f"Getting logs for gs://{bucket}/{key_pattern}")
133+
logger.debug(f"Writing to {out_dir}")
134+
out_dir.mkdir(exist_ok=True)
135+
credentials, project_id = google.auth.default()
136+
if hasattr(credentials, "service_account_email"):
137+
logger.debug(f"Using service account: {credentials.service_account_email}")
138+
else:
139+
logger.warning("WARNING: no service account credentials.")
140+
141+
client = storage.Client()
142+
blobs = client.list_blobs(DEFAULT_BUCKET, prefix=key_pattern)
143+
files: List[Path] = []
144+
for i, blob in enumerate(blobs):
145+
if max and i > max:
146+
break
147+
dl_name = out_dir / _filename_only(blob.name)
148+
unzip_name = out_dir / str(dl_name.name).removesuffix('.gz')
149+
blob.download_to_filename(dl_name)
150+
logger.debug(f"Downloaded {dl_name}")
151+
if dl_name.suffix == ".gz":
152+
logger.debug(f"Will ungzip to {unzip_name}")
153+
os.system(f"gunzip --stdout \"{dl_name}\" | sort -o \"{unzip_name}\"")
154+
dl_name.unlink()
155+
files.append(unzip_name)
156+
else:
157+
files.append(dl_name)
158+
logger.debug(f"downloaded {len(files)} saved to {out_dir}")
159+
return files
160+
161+
def sort_files_by_time(files: list[Path]) -> list[Path]:
162+
"""The logs from fastly are not sorted."""
163+
out_files = []
164+
for i, file in enumerate(files):
165+
tmpf = file.with_name(file.name + ".sorted")
166+
with open(file, 'r') as infile, open(tmpf, 'w') as outfile:
167+
sorted_lines = []
168+
lines = infile.readlines()
169+
for line in lines:
170+
match = re.search(TIME_PATTERN, line,)
171+
if not match:
172+
logger.warning("no time found in log line during sorting")
173+
continue
174+
sorted_lines.append( (match.group(1), line))
175+
sorted_lines.sort()
176+
outfile.writelines([item[1] for item in sorted_lines])
177+
out_files.append(tmpf)
178+
file.unlink()
179+
return out_files
180+
181+
182+
def get_fastly_logs(date_of_logs: date = _previous_hour.date(),
183+
hour: int = _previous_hour.hour,
184+
out_file: Path = None,
185+
tmp_dir: Path = Path(TMP_DIR)):
186+
"""Gets the fastly logs for an hour.
187+
188+
Downloads the fastly logs from GCP, combines them and puts in logs_archive.
189+
190+
By default, gets the previous hour.
191+
192+
File names and timestamps in logs will be UTC time since fastly uses UTC times.
193+
194+
Will overwrite already existing out file and will overwrite any already downloaded log files.
195+
"""
196+
if out_file is None:
197+
out_dir = Path(tmp_dir)
198+
out_dir.mkdir(exist_ok=True)
199+
out_file = out_dir / f"fastly_access_logs.{date_of_logs.strftime('%Y-%m-%d')}T{hour}.log"
200+
201+
files = download_files(date=date_of_logs, hour=hour)
202+
k_way_merge(files, out_file=out_file)
203+
[file.unlink() for file in files]
204+
205+
def _test():
206+
logger.setLevel(logging.DEBUG)
207+
d = date(2023,12,30)
208+
h = 2
209+
files = download_files(verbose=True, hour=h, date=d, max=4)
210+
#files = sort_files_by_time(files)
211+
k_way_merge(files,
212+
files[0].with_name(f"fastly_access_MERGED.{d.strftime('%Y-%m-%d')}T{h}.log"))
213+
214+
215+
if __name__ == "__main__":
216+
fire.Fire(get_fastly_logs)

docs/source/index.rst

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,8 @@
11
arXiv Base Documentation
22
========================
33

4-
This project provides a base Flask application and base Docker image for
5-
arXiv-NG services.
6-
7-
Each component of this project **must** meet all of the following criteria:
8-
9-
1. It is likely that the component will be utilized in many or all arXiv
10-
services.
11-
2. Once stable, it is unlikely to change often.
12-
3. It is unlikely that implementing new features in specific services
13-
would require changes to the component.
14-
4. When a component does change, it **must** change in the same way for all of
15-
the services that use it.
16-
4+
This project provides library of commonly used functions, a base Flask application
5+
and base Docker image for arXiv-NG services.
176

187
.. toctree::
198
:maxdepth: 2

0 commit comments

Comments
 (0)