forked from sfbrigade/data-covid19-sfbayarea
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscraper_news.py
executable file
·98 lines (82 loc) · 3.28 KB
/
scraper_news.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
import click
from datetime import datetime, timedelta
from covid19_sfbayarea import news
from covid19_sfbayarea.utils import friendly_county, parse_datetime
import logging
import os
import sys
import traceback
from pathlib import Path
from typing import cast, Tuple
# Names of every county that has an entry in `news.scrapers`. Used below as the
# set of valid COUNTY choices on the command line and as the default list of
# counties to run when none are given.
# NOTE(review): `Tuple[str]` describes a tuple of exactly one string;
# `Tuple[str, ...]` is presumably what is meant here -- confirm before changing.
COUNTY_NAMES = cast(Tuple[str], tuple(news.scrapers.keys()))
def cli_date(date_string: str) -> datetime:
    '''
    Parse a CLI date or number of days into a TZ-aware datetime.

    The value is first tried as a number of days before now (fractions are
    allowed, e.g. "0.5"). If it is not a number, or is a number that cannot be
    used as a day offset (NaN, infinity, or so large that the result falls
    outside the range `datetime` can represent), it is handed to
    `parse_datetime` and treated as a date instead.

    Parameters
    ----------
    date_string
        Raw text of the command-line value.

    Returns
    -------
    datetime
        The moment `date_string` refers to.

    Raises
    ------
    click.BadParameter
        If the number of days is zero or negative, if the text cannot be
        parsed as a date, or if the parsed date is not in the past.
    '''
    # One snapshot of "now" so the offset and the in-the-past check agree.
    now = datetime.now().astimezone()

    try:
        days = float(date_string)
    except ValueError:
        days = None

    if days is not None:
        if days <= 0:
            raise click.BadParameter('must be a positive number or date')
        try:
            return now - timedelta(days=days)
        except (OverflowError, ValueError):
            # `timedelta` rejects NaN with ValueError and infinity or huge
            # magnitudes with OverflowError; the subtraction raises
            # OverflowError when the result is out of range. None of these
            # should surface as a traceback, so fall through and let the date
            # parser have the final say on the input.
            pass

    try:
        value = parse_datetime(date_string)
    except Exception as error:
        raise click.BadParameter(f'"{date_string}" is not a date') from error

    # NOTE(review): comparing against a TZ-aware `now` assumes `parse_datetime`
    # returns a TZ-aware datetime -- confirm against its implementation.
    if value >= now:
        raise click.BadParameter('must be a date in the past.')

    return value
def run_county_news(county: str, from_: datetime, format: Tuple[str, ...], output: str) -> None:
    '''
    Run the scraper for a given county and output the results.

    The county's feed is fetched once and then rendered once per requested
    format. Each rendering is either written to a file in the `output`
    directory or echoed to the terminal.

    Parameters
    ----------
    county
        Key into `news.scrapers` selecting which scraper to run.
    from_
        Passed to the scraper's `get_news` as `from_date`.
    format
        Format names to render. 'json_simple' and 'json_feed' select the
        matching feed formatter; any other name is rendered as RSS.
    output
        Directory to write files into. When empty or None the rendered feed
        is echoed with `click.echo` instead. The directory, including any
        missing parent directories, is created on demand.
    '''
    feed = news.scrapers[county].get_news(from_date=from_)

    # The destination directory does not depend on the format, so resolve it
    # once rather than on every pass through the loop.
    parent = Path(output) if output else None

    for format_name in format:
        if format_name == 'json_simple':
            data = feed.format_json_simple()
            extension = '.simple.json'
        elif format_name == 'json_feed':
            data = feed.format_json_feed()
            extension = '.json'
        else:
            data = feed.format_rss()
            extension = '.rss'

        if parent is None:
            click.echo(data)
            continue

        # `parents=True` lets a nested path such as "build/news" work even
        # when "build" does not exist yet.
        parent.mkdir(parents=True, exist_ok=True)
        # Opened in binary mode, so the formatters are presumably returning
        # bytes -- TODO confirm against the feed class.
        with parent.joinpath(f'{county}{extension}').open('wb') as f:
            f.write(data)
@click.command(
    help='Create a news feed for one or more counties. Supported '
         f'counties: {", ".join(COUNTY_NAMES)}.',
)
@click.argument(
    'counties',
    metavar='[COUNTY]...',
    nargs=-1,
    type=click.Choice(COUNTY_NAMES, case_sensitive=False),
)
@click.option(
    '--from', 'from_',
    type=cli_date,
    default='31',
    help='Only include news items newer than this date. Instead of '
         'a date, you can specify a number of days ago, e.g. "14" '
         'for 2 weeks ago.',
)
@click.option(
    '--format',
    default=('json_feed',),
    type=click.Choice(('json_feed', 'json_simple', 'rss')),
    multiple=True,
)
@click.option(
    '--output',
    metavar='PATH',
    help='write output file(s) to this directory',
)
def main(counties: Tuple[str], from_: datetime, format: Tuple[str], output: str) -> None:
    # Command entry point: run the news scraper for each requested county and
    # turn the number of failures into the process exit status.

    # No counties named on the command line means "all of them".
    if not counties:
        counties = COUNTY_NAMES

    failures = 0
    for county in counties:
        try:
            run_county_news(county, from_, format, output)
        except Exception as error:
            # A failing county is reported on stderr and counted, but the
            # remaining counties still get their turn.
            failures += 1
            label = click.style(f'{friendly_county(county)} county failed',
                                fg='red')
            click.echo(f'{label}: {error}', err=True)
            traceback.print_exc()

    # Exit status: 70 when every county failed, 1 when only some did;
    # otherwise return normally.
    if failures == len(counties):
        sys.exit(70)
    if failures > 0:
        sys.exit(1)
if __name__ == '__main__':
    # Logging verbosity comes from the LOG_LEVEL environment variable
    # (upper-cased before use), falling back to WARN when it is not set.
    log_level = os.environ.get('LOG_LEVEL', 'WARN').upper()
    logging.basicConfig(level=log_level)
    main()