From 7f89f4e983c23cb3c67f3b59dcad753f75bae5fd Mon Sep 17 00:00:00 2001 From: Rob Brackett Date: Thu, 7 Jan 2021 21:47:35 -0800 Subject: [PATCH] HOTFIX: Rewrite Marin data scraper to use Socrata Marin now publishes COVID data in an actual data portal with a real API and changed the dashboard to use Tableau. That broke our scraper (because all the charts are now built in an entirely different way). The best way to fix was to rewrite on top of the Socrata data portal API, which we should happily expect to be much more stable. This also adds caching to the Socrata API so we can make multiple calls to the same URL without actually making multiple HTTP requests. The Marin data is arranged such that a lot of different dimensions of data are combined (unlike most other portals where they are separated), and caching lets us keep the logic straightforward without making unnecessary repeated requests. This also turns out to fix #162 -- unknowns are now included in the data! Fixes #165. --- covid19_sfbayarea/data/marin.py | 553 ++++++++++++++++---------------- covid19_sfbayarea/data/utils.py | 12 +- 2 files changed, 283 insertions(+), 282 deletions(-) diff --git a/covid19_sfbayarea/data/marin.py b/covid19_sfbayarea/data/marin.py index 06919f36..746dfd48 100644 --- a/covid19_sfbayarea/data/marin.py +++ b/covid19_sfbayarea/data/marin.py @@ -1,287 +1,278 @@ -#!/usr/bin/env python3 -import csv -from typing import Any, List, Dict, Generator, Iterable, Tuple -from urllib.parse import unquote_plus -from datetime import datetime, timezone -from contextlib import contextmanager -from selenium import webdriver # type: ignore - +from typing import List, Dict, Iterable +from datetime import datetime from ..errors import FormatError -from ..utils import PERMISSIVE_SPACES -from ..webdriver import get_firefox -from .utils import get_data_model - - -class MarinDashboardPage: - """ - Represents a Selenium Webdriver browser opened to the Marin County COVID-19 - Dashboard page. - """ - url = 'https://coronavirus.marinhhs.org/surveillance' - driver: webdriver.Remote - - def __init__(self) -> None: - self.driver = get_firefox() - self.driver.implicitly_wait(30) - self._load() - - def find(self, selector: str) -> Any: - "Find the first element matching a CSS selector." - return self.driver.find_element_by_css_selector(selector) - - def find_all(self, selector: str) -> List: - "Find all element matching a CSS selector." - return self.driver.find_elements_by_css_selector(selector) - - @contextmanager - def use_chart_frame(self, chart_id: str) -> Generator: - """ - Switch the page to focus on the frame for a given chart. (Each chart - on the page is an iframe from https://www.datawrapper.de/.) When this - context manager exits, the page returns focus to the main frame. - """ - frame = self.driver.find_element_by_css_selector(f'iframe[src*="//datawrapper.dwcdn.net/{chart_id}/"]') - self.driver.switch_to.frame(frame) - try: - yield frame - finally: - self.driver.switch_to.default_content() - - def get_chart_csv(self, chart_id: str) -> List[str]: - "Extract the data backing a given chart as a CSV string." - with self.use_chart_frame(chart_id): - csv_data = self.find('.dw-data-link').get_attribute('href') - # Deal with the data - if csv_data.startswith('data:'): - media, data = csv_data[5:].split(',', 1) - # Will likely always have this kind of data type - if media != 'application/octet-stream;charset=utf-8': - raise FormatError(f'Cannot parse data with media type "{media}" for chart "{chart_id}"') - csv_string = unquote_plus(data) - csv_data = csv_string.splitlines() - else: - raise FormatError(f'Chart "{chart_id}" did not have a data: URL') - - return csv_data - - def get_chart_data(self, chart_id: str) -> csv.DictReader: - "Get the data backing a given chart as a :class:`csv.DictReader`." - reader = csv.DictReader(self.get_chart_csv(chart_id)) - # Clean up field names, which sometimes have erroneous whitespace. - reader.fieldnames = [name.strip(PERMISSIVE_SPACES) - for name in reader.fieldnames or []] - return reader - - def _load(self) -> None: - self.driver.get(self.url) - - def __enter__(self) -> 'MarinDashboardPage': - return self - - # We're being a bit lazy with types here because we don't use them. - def __exit__(self, _type: Any, _value: Any, _traceback: Any) -> None: - self.driver.quit() +from ..utils import assert_equal_sets, parse_datetime +from .utils import SocrataApi + + +API_IDS = { + # This timeseries includes cases, deaths, and hospitalizations (not tests) + 'cases': 'wg8s-i3c7', + 'tests': 'kr8c-izzb', + + # Cumulative demographics for age, race/ethnicity, and gender across cases, + # hospitalizations, and deaths are all published in one dataset. + # Breakdowns by transmission vector and comorbidities are not available. + # NOTE: Cases (but not hospitalizations or deaths) are also available as a + # separate timeseries for each demographic breakdown, but we do not + # use those at the moment. Most other counties do not publish demographic + # timeseries. + 'demographics': 'uu8g-ckxh' + + # Marin does not publish demographic breakdowns for testing. +} def get_county() -> Dict: """Main method for populating county data""" - model = get_data_model() - - chart_ids = { - "cases": "Eq6Es", - "deaths": "bSxdG", - "age": "zSHDs", - "gender": "FEciW", - "race_eth": "aBeEd", - "tests": "7sHQq", + api = SocrataApi('https://data.marincounty.org/') + notes = ('This data only accounts for Marin residents and does not ' + 'include inmates at San Quentin State Prison. ' + 'The tests timeseries only includes the number of tests ' + 'performed and not how many were positive or negative. ' + 'Demographic breakdowns for testing are not available.') + + return { + 'name': 'Marin', + 'update_time': get_latest_update(api).isoformat(), + # The county's data dashboard is at: + # https://coronavirus.marinhhs.org/surveillance + # Which links to the data portal category with the data sets we + # actually use at: + # https://data.marincounty.org/browse?q=covid + 'source_url': 'https://coronavirus.marinhhs.org/surveillance', + 'meta_from_source': '', + 'meta_from_baypd': notes, + 'series': { + 'cases': get_timeseries_cases(api), + 'deaths': get_timeseries_deaths(api), + 'tests': get_timeseries_tests(api), + }, + 'case_totals': get_case_totals(api), + 'death_totals': get_death_totals(api), + # Marin does not currently provide demographic breakdowns for + # testing, so no test totals right now. } - # The time series data for negative tests is gone, so I've just scraped positive test data using the new chart referenced above. - - with MarinDashboardPage() as page: - model['name'] = "Marin County" - model['update_time'] = datetime.now(tz=timezone.utc).isoformat() - model["meta_from_baypd"] = "" - model['source_url'] = page.url - model['meta_from_source'] = get_chart_meta(page, chart_ids.values()) - - model["series"]["cases"] = get_series_data(page, chart_ids["cases"], ['Date', 'Total Cases', 'Total Recovered*'], "cumul_cases", 'Total Cases', 'cases') - model["series"]["deaths"] = get_series_data(page, chart_ids["deaths"], ['Event Date', 'Total Hospitalizations', 'Total Deaths'], "cumul_deaths", 'Total Deaths', 'deaths', date_column='Event Date') - - model["series"]["tests"] = get_test_series(page, chart_ids["tests"]) - model["case_totals"]["age_group"], model["death_totals"]["age_group"] = get_breakdown_age(page, chart_ids["age"]) - model["case_totals"]["gender"], model["death_totals"]["gender"] = get_breakdown_gender(page, chart_ids["gender"]) - model["case_totals"]["race_eth"], model["death_totals"]["race_eth"] = get_breakdown_race_eth(page, chart_ids["race_eth"]) - - return model - - -def get_chart_meta(page: MarinDashboardPage, chart_ids: Iterable[str]) -> str: - """ - Get all the metadata underneath the data wrapper charts and the metadata at - the top of the county dashboard. - """ - metadata: List[str] = [] - chart_metadata: List[str] = [] - - if paragraphs := page.find_all('div.surveillance-data-text p'): - # TODO: it's not clear why any of these are being removed, nor - # why they are not being replaced with an equivalent ASCII - # character or just a space (not having something else in their - # place results in joined up words, like "arealways") - # \u2014 = em dash - # \u00a0 = non-breaking space - # \u2019 = apostrophe/right single quote - metadata.extend(paragraph.text.replace("\u2014","").replace("\u00a0", "").replace("\u2019","") - for paragraph in paragraphs) - else: - raise FormatError('Metadata location has changed.') - - for chart_id in chart_ids: - with page.use_chart_frame(chart_id): - for div in page.find_all('div.notes-block'): - chart_metadata.append(div.text) - - # Manually adding in metadata about testing data - chart_metadata.append("Negative and pending tests are excluded from the Marin County test data.") - chart_metadata.append("Note that this test data is about tests done by Marin County residents, not about all tests done in Marin County (includes residents and non-residents).") - - # Some metadata strings are repeated. - # Dedupe and preserve order with list(dict()). - all_metadata = list(dict.fromkeys([*metadata, *chart_metadata])) - return '\n\n'.join(all_metadata) - - -def get_series_data(page: MarinDashboardPage, chart_id: str, headers: list, model_typ: str, typ: str, new_count: str, date_column: str = 'Date') -> List: - """Extract the date, number of cases/deaths, and new cases/deaths.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != headers: - raise FormatError(f'Data headers for chart "{chart_id}" have changed. ' - f'Expected: {headers}, found: {keys}') - - series = [] - history = [] - for row in csv_reader: - daily: dict = dict() - date_time_obj = datetime.strptime(row[date_column], '%m/%d/%Y') - daily["date"] = date_time_obj.strftime('%Y-%m-%d') - # Collect the case totals in order to compute the change in cases per day - history.append(int(row[typ])) - daily[model_typ] = int(row[typ]) - series.append(daily) - - history_diff: list = list() - # Since i'm substracting pairwise elements, I need to adjust the range so I don't get an off by one error. - for i in range(0, len(history)-1): - history_diff.append((int(history[i+1]) - int(history[i])) + int(series[0][model_typ])) - # from what I've seen, series[0]["cumul_cases"] will be 0, but I shouldn't assume that. - history_diff.insert(0, int(series[0][model_typ])) - - for val, num in enumerate(history_diff): - series[val][new_count] = num - return series - - -def get_breakdown_age(page: MarinDashboardPage, chart_id: str) -> Tuple[List, List]: - """Get the breakdown of cases and deaths by age.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Age Category', 'POPULATION', 'Cases', 'Hospitalizations', 'Deaths']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - key_mapping = {"0-9": "0_to_9", "10-18": "10_to_18", "19-34": "19_to_34", "35-49": "35_to_49", "50-64": "50_to_64", "65-79": "65_to_79", "80-94": "80_to_94", "95+": "95_and_older"} - - c_brkdown = [] - d_brkdown = [] - for row in csv_reader: - c_age: dict = dict() - d_age: dict = dict() - # Extracting the age group and the raw count for both cases and deaths. - c_age["group"], d_age["group"] = row['Age Category'], row['Age Category'] - if c_age["group"] not in key_mapping: - raise FormatError(f'"{c_age["group"]}" is not in the list of age groups. The age groups have changed.') - else: - c_age["group"] = key_mapping[c_age["group"]] - c_age["raw_count"] = int(row["Cases"]) - d_age["group"] = key_mapping[d_age["group"]] - d_age["raw_count"] = int(row["Deaths"]) - c_brkdown.append(c_age) - d_brkdown.append(d_age) - - return c_brkdown, d_brkdown - - -def get_breakdown_gender(page: MarinDashboardPage, chart_id: str) -> Tuple[Dict, Dict]: - """Get the breakdown of cases and deaths by gender.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Gender', 'POPULATION', 'Cases', 'Hospitalizations', 'Deaths']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - genders = ['male', 'female'] - c_gender = {} - d_gender = {} - - for row in csv_reader: - # Extracting the gender and the raw count (the 3rd and 5th columns, respectively) for both cases and deaths. - # Each new row has data for a different gender. - gender = row["Gender"].lower() - if gender not in genders: - raise FormatError("The genders have changed.") - c_gender[gender] = int(row["Cases"]) - d_gender[gender] = int(row["Deaths"]) - - return c_gender, d_gender - - -def get_breakdown_race_eth(page: MarinDashboardPage, chart_id: str) -> Tuple[Dict, Dict]: - """Get the breakdown of cases and deaths by race/ethnicity.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Race/Ethnicity', 'COUNTY POPULATION', 'Cases', 'Case Percent', 'Hospitalizations', 'Hospitalizations Percent', 'Deaths', 'Deaths Percent']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - key_mapping = {"Black/African American":"African_Amer", "Hispanic/Latino": "Latinx_or_Hispanic", "White": "White", "Asian": "Asian", "Native Hawaiian/Pacific Islander": "Pacific_Islander", "American Indian/Alaska Native": "Native_Amer", "Multi or Other Race": "Multi_or_Other"} - - c_race_eth = {} - d_race_eth = {} - - for row in csv_reader: - race_eth = row["Race/Ethnicity"] - if race_eth not in key_mapping: - raise FormatError("The race_eth groups have changed.") - else: - c_race_eth[key_mapping[race_eth]] = int(row["Cases"]) - d_race_eth[key_mapping[race_eth]] = int(row["Deaths"]) - - return c_race_eth, d_race_eth - - -def get_test_series(page: MarinDashboardPage, chart_id: str) -> List: - """ - Get the date, the number of new positive tests on that date, and the number - of cumulative positive tests. - """ - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Test Date', 'Positive Tests']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - test_series = [] - - cumul_pos = 0 - for row in csv_reader: - daily: dict = dict() - date_time_obj = datetime.strptime(row['Test Date'], '%m/%d/%Y') - daily["date"] = date_time_obj.strftime('%Y-%m-%d') - daily["positive"] = int(row["Positive Tests"]) - cumul_pos += daily["positive"] - daily["cumul_positive"] = cumul_pos - test_series.append(daily) - - return test_series + + +def get_latest_update(api: SocrataApi) -> datetime: + times = [parse_datetime(api.metadata(api_id)['dataUpdatedAt']) + for api_id in API_IDS.values()] + return max(*times) + + +def get_api_cases(api: SocrataApi, disposition: str) -> Iterable[dict]: + # https://data.marincounty.org/Public-Health/COVID-19-Case-Disposition/wg8s-i3c7 + data = api.resource(API_IDS['cases'], params={'$order': 'test_date ASC'}) + total = 0 + for entry in data: + if entry['status'] == disposition: + total += 1 + yield entry + + # Sanity-check that we filtered the `status` column on a real value. + if total == 0: + raise FormatError(f'There were no cases with `status == "{disposition}"`') + + +def get_timeseries_cases(api: SocrataApi) -> List[dict]: + return [ + { + 'date': parse_datetime(entry['test_date']).date().isoformat(), + 'cases': int(entry['new_confirmed_cases']), + 'cumul_cases': int(entry['cumulative_case_count']), + } + for entry in get_api_cases(api, 'Confirmed') + ] + + +def get_timeseries_deaths(api: SocrataApi) -> List[dict]: + return [ + { + 'date': parse_datetime(entry['test_date']).date().isoformat(), + 'deaths': int(entry['new_confirmed_cases']), + 'cumul_deaths': int(entry['cumulative_case_count']), + } + for entry in get_api_cases(api, 'Death') + ] + + +def get_timeseries_tests(api: SocrataApi) -> List[dict]: + # https://data.marincounty.org/Public-Health/Marin-County-COVID-19-Testing-Data-CDPH-/kr8c-izzb + data = api.resource(API_IDS['tests'], params={'$order': 'date ASC'}) + + total = 0 + result = [] + for entry in data: + # Percent positive tests is also available in this timeseries when: + # variable == 'test_pos_nopris_7day_total_!no_lag' + # Unfortunately the naming and documentation implies this is the + # positivity rate over the past 7 days, so it's probably not accurate + # to calculate an absolute number of positive/negative tests from it. + if entry['variable'] != 'total_tests_nopris_!no_lag': + continue + + # Since the same column is used for percent positive and tests, the + # total tests comes through as a float rather than an int. + value = float(entry['value']) + tests = int(value) + assert tests == value + + total += tests + result.append({ + 'date': parse_datetime(entry['date']).date().isoformat(), + 'tests': tests, + 'positive': -1, + 'negative': -1, + 'pending': -1, + 'cumul_tests': total, + 'cumul_pos': -1, + 'cumul_neg': -1, + 'cumul_pend': -1, + }) + + return result + + +def get_demographic_totals(api: SocrataApi, demographic: str) -> List[Dict]: + # https://data.marincounty.org/Public-Health/COVID-19-Cumulative-Demographics/uu8g-ckxh + data = api.resource(API_IDS['demographics']) + prefix = f'{demographic} - ' + prefix_length = len(prefix) + + result = [] + for entry in data: + if entry['grouping'].startswith(prefix): + entry = entry.copy() + entry['grouping'] = entry['grouping'][prefix_length:] + result.append(entry) + + # Sanity-check that we filtered the `status` column on a real value. + if len(result) == 0: + raise FormatError(f'There were no cases with demographic "{demographic}"') + + return result + + +def get_case_totals(api: SocrataApi) -> Dict: + return { + 'gender': get_cases_by_gender(api), + 'age_group': get_cases_by_age(api), + 'race_eth': get_cases_by_race(api), + # These are not currently provided by Marin: + # 'underlying_cond': get_cases_by_condition(api), + # 'transmission_cat': get_cases_by_transmission(api), + } + + +def get_cases_by_gender(api: SocrataApi) -> Dict: + data = get_demographic_totals(api, 'Gender') + result = {row['grouping'].lower(): int(row['cumulative']) + for row in data} + + # Sanity-check the output at least has male/female. + try: + assert 'male' in result + assert 'female' in result + except AssertionError: + raise FormatError('Missing explicity male/female gender categories ' + f'for cases, got: {list(result.keys())}') + + return result + + +def get_cases_by_age(api: SocrataApi) -> List[Dict]: + data = get_demographic_totals(api, 'Age') + return [{'group': row['grouping'], 'raw_count': int(row['cumulative'])} + for row in data] + + +def get_cases_by_race(api: SocrataApi) -> Dict: + data = get_demographic_totals(api, 'Race') + mapping = { + 'american indian/alaska native': 'Native_Amer', + 'asian': 'Asian', + 'black/african american': 'African_Amer', + 'hispanic/latinx': 'Latinx_or_Hispanic', + 'multiracial': 'Multiple_Race', + 'native hawaiian/pacific islander': 'Pacific_Islander', + 'unknown': 'Unknown', + 'other': 'Other', + 'white': 'White', + } + assert_equal_sets(mapping.keys(), (row['grouping'].lower() for row in data)) + result = {mapping[row['grouping'].lower()]: int(row['cumulative']) + for row in data} + return result + + +def get_cases_by_condition(api: SocrataApi) -> Dict: + # NO DATA + raise NotImplementedError() + + +def get_cases_by_transmission(api: SocrataApi) -> Dict: + # NO DATA + raise NotImplementedError() + + +def get_death_totals(api: SocrataApi) -> Dict: + return { + 'gender': get_deaths_by_gender(api), + 'age_group': get_deaths_by_age(api), + 'race_eth': get_deaths_by_race(api), + # These are not currently provided by Marin: + # 'underlying_cond': get_deaths_by_condition(api), + # 'transmission_cat': get_deaths_by_transmission(api), + } + + +def get_deaths_by_gender(api: SocrataApi) -> Dict: + data = get_demographic_totals(api, 'Gender') + result = {row['grouping'].lower(): int(row['deaths']) + for row in data} + + # Sanity-check the output at least has male/female. + try: + assert 'male' in result + assert 'female' in result + except AssertionError: + raise FormatError('Missing explicity male/female gender categories ' + f'for deaths, got: {list(result.keys())}') + + return result + + +def get_deaths_by_age(api: SocrataApi) -> List[Dict]: + data = get_demographic_totals(api, 'Age') + return [{'group': row['grouping'], 'raw_count': int(row['deaths'])} + for row in data] + + +def get_deaths_by_race(api: SocrataApi) -> Dict: + data = get_demographic_totals(api, 'Race') + mapping = { + 'american indian/alaska native': 'Native_Amer', + 'asian': 'Asian', + 'black/african american': 'African_Amer', + 'hispanic/latinx': 'Latinx_or_Hispanic', + 'multiracial': 'Multiple_Race', + 'native hawaiian/pacific islander': 'Pacific_Islander', + 'unknown': 'Unknown', + 'other': 'Other', + 'white': 'White', + } + assert_equal_sets(mapping.keys(), (row['grouping'].lower() for row in data)) + result = {mapping[row['grouping'].lower()]: int(row['deaths']) + for row in data} + return result + + +def get_deaths_by_condition(api: SocrataApi) -> Dict: + # NO DATA + raise NotImplementedError() + + +def get_deaths_by_transmission(api: SocrataApi) -> Dict: + # NO DATA + raise NotImplementedError() diff --git a/covid19_sfbayarea/data/utils.py b/covid19_sfbayarea/data/utils.py index 44808db7..d1b604a4 100644 --- a/covid19_sfbayarea/data/utils.py +++ b/covid19_sfbayarea/data/utils.py @@ -1,3 +1,4 @@ +from functools import lru_cache from pathlib import Path import json from typing import Dict, Any @@ -25,7 +26,8 @@ def __init__(self, base_url: str): self.resource_url = urljoin(self.base_url, '/resource/') self.metadata_url = urljoin(self.base_url, '/api/views/metadata/v1/') - def request(self, url:str, **kwargs: Any) -> Dict: + @lru_cache(maxsize=32) + def _request(self, url: str, **kwargs: Any) -> Dict: try: response = self.session.get(url, **kwargs) response.raise_for_status() @@ -38,6 +40,14 @@ def request(self, url:str, **kwargs: Any) -> Dict: raise http_err raise BadRequest(server_message, response=response) + def request(self, url: str, params: Dict = None, **kwargs: Any) -> Dict: + # Arguments to _request() must be hashable (so they can be cached). + # If a params dict is sent, convert it to a tuple. + if params: + kwargs['params'] = tuple(params.items()) + + return self._request(url, **kwargs) + def resource(self, resource_id: str, **kwargs: Any) -> Dict: return self.request(f'{self.resource_url}{resource_id}', **kwargs)