diff --git a/covid19_sfbayarea/data/marin.py b/covid19_sfbayarea/data/marin.py index 06919f36..746dfd48 100644 --- a/covid19_sfbayarea/data/marin.py +++ b/covid19_sfbayarea/data/marin.py @@ -1,287 +1,278 @@ -#!/usr/bin/env python3 -import csv -from typing import Any, List, Dict, Generator, Iterable, Tuple -from urllib.parse import unquote_plus -from datetime import datetime, timezone -from contextlib import contextmanager -from selenium import webdriver # type: ignore - +from typing import List, Dict, Iterable +from datetime import datetime from ..errors import FormatError -from ..utils import PERMISSIVE_SPACES -from ..webdriver import get_firefox -from .utils import get_data_model - - -class MarinDashboardPage: - """ - Represents a Selenium Webdriver browser opened to the Marin County COVID-19 - Dashboard page. - """ - url = 'https://coronavirus.marinhhs.org/surveillance' - driver: webdriver.Remote - - def __init__(self) -> None: - self.driver = get_firefox() - self.driver.implicitly_wait(30) - self._load() - - def find(self, selector: str) -> Any: - "Find the first element matching a CSS selector." - return self.driver.find_element_by_css_selector(selector) - - def find_all(self, selector: str) -> List: - "Find all element matching a CSS selector." - return self.driver.find_elements_by_css_selector(selector) - - @contextmanager - def use_chart_frame(self, chart_id: str) -> Generator: - """ - Switch the page to focus on the frame for a given chart. (Each chart - on the page is an iframe from https://www.datawrapper.de/.) When this - context manager exits, the page returns focus to the main frame. - """ - frame = self.driver.find_element_by_css_selector(f'iframe[src*="//datawrapper.dwcdn.net/{chart_id}/"]') - self.driver.switch_to.frame(frame) - try: - yield frame - finally: - self.driver.switch_to.default_content() - - def get_chart_csv(self, chart_id: str) -> List[str]: - "Extract the data backing a given chart as a CSV string." - with self.use_chart_frame(chart_id): - csv_data = self.find('.dw-data-link').get_attribute('href') - # Deal with the data - if csv_data.startswith('data:'): - media, data = csv_data[5:].split(',', 1) - # Will likely always have this kind of data type - if media != 'application/octet-stream;charset=utf-8': - raise FormatError(f'Cannot parse data with media type "{media}" for chart "{chart_id}"') - csv_string = unquote_plus(data) - csv_data = csv_string.splitlines() - else: - raise FormatError(f'Chart "{chart_id}" did not have a data: URL') - - return csv_data - - def get_chart_data(self, chart_id: str) -> csv.DictReader: - "Get the data backing a given chart as a :class:`csv.DictReader`." - reader = csv.DictReader(self.get_chart_csv(chart_id)) - # Clean up field names, which sometimes have erroneous whitespace. - reader.fieldnames = [name.strip(PERMISSIVE_SPACES) - for name in reader.fieldnames or []] - return reader - - def _load(self) -> None: - self.driver.get(self.url) - - def __enter__(self) -> 'MarinDashboardPage': - return self - - # We're being a bit lazy with types here because we don't use them. 
- def __exit__(self, _type: Any, _value: Any, _traceback: Any) -> None: - self.driver.quit() +from ..utils import assert_equal_sets, parse_datetime +from .utils import SocrataApi + + +API_IDS = { + # This timeseries includes cases, deaths, and hospitalizations (not tests) + 'cases': 'wg8s-i3c7', + 'tests': 'kr8c-izzb', + + # Cumulative demographics for age, race/ethnicity, and gender across cases, + # hospitalizations, and deaths are all published in one dataset. + # Breakdowns by transmission vector and comorbidities are not available. + # NOTE: Cases (but not hospitalizations or deaths) are also available as a + # separate timeseries for each demographic breakdown, but we do not + # use those at the moment. Most other counties do not publish demographic + # timeseries. + 'demographics': 'uu8g-ckxh' + + # Marin does not publish demographic breakdowns for testing. +} def get_county() -> Dict: """Main method for populating county data""" - model = get_data_model() - - chart_ids = { - "cases": "Eq6Es", - "deaths": "bSxdG", - "age": "zSHDs", - "gender": "FEciW", - "race_eth": "aBeEd", - "tests": "7sHQq", + api = SocrataApi('https://data.marincounty.org/') + notes = ('This data only accounts for Marin residents and does not ' + 'include inmates at San Quentin State Prison. ' + 'The tests timeseries only includes the number of tests ' + 'performed and not how many were positive or negative. ' + 'Demographic breakdowns for testing are not available.') + + return { + 'name': 'Marin', + 'update_time': get_latest_update(api).isoformat(), + # The county's data dashboard is at: + # https://coronavirus.marinhhs.org/surveillance + # Which links to the data portal category with the data sets we + # actually use at: + # https://data.marincounty.org/browse?q=covid + 'source_url': 'https://coronavirus.marinhhs.org/surveillance', + 'meta_from_source': '', + 'meta_from_baypd': notes, + 'series': { + 'cases': get_timeseries_cases(api), + 'deaths': get_timeseries_deaths(api), + 'tests': get_timeseries_tests(api), + }, + 'case_totals': get_case_totals(api), + 'death_totals': get_death_totals(api), + # Marin does not currently provide demographic breakdowns for + # testing, so no test totals right now. } - # The time series data for negative tests is gone, so I've just scraped positive test data using the new chart referenced above. 
- - with MarinDashboardPage() as page: - model['name'] = "Marin County" - model['update_time'] = datetime.now(tz=timezone.utc).isoformat() - model["meta_from_baypd"] = "" - model['source_url'] = page.url - model['meta_from_source'] = get_chart_meta(page, chart_ids.values()) - - model["series"]["cases"] = get_series_data(page, chart_ids["cases"], ['Date', 'Total Cases', 'Total Recovered*'], "cumul_cases", 'Total Cases', 'cases') - model["series"]["deaths"] = get_series_data(page, chart_ids["deaths"], ['Event Date', 'Total Hospitalizations', 'Total Deaths'], "cumul_deaths", 'Total Deaths', 'deaths', date_column='Event Date') - - model["series"]["tests"] = get_test_series(page, chart_ids["tests"]) - model["case_totals"]["age_group"], model["death_totals"]["age_group"] = get_breakdown_age(page, chart_ids["age"]) - model["case_totals"]["gender"], model["death_totals"]["gender"] = get_breakdown_gender(page, chart_ids["gender"]) - model["case_totals"]["race_eth"], model["death_totals"]["race_eth"] = get_breakdown_race_eth(page, chart_ids["race_eth"]) - - return model - - -def get_chart_meta(page: MarinDashboardPage, chart_ids: Iterable[str]) -> str: - """ - Get all the metadata underneath the data wrapper charts and the metadata at - the top of the county dashboard. - """ - metadata: List[str] = [] - chart_metadata: List[str] = [] - - if paragraphs := page.find_all('div.surveillance-data-text p'): - # TODO: it's not clear why any of these are being removed, nor - # why they are not being replaced with an equivalent ASCII - # character or just a space (not having something else in their - # place results in joined up words, like "arealways") - # \u2014 = em dash - # \u00a0 = non-breaking space - # \u2019 = apostrophe/right single quote - metadata.extend(paragraph.text.replace("\u2014","").replace("\u00a0", "").replace("\u2019","") - for paragraph in paragraphs) - else: - raise FormatError('Metadata location has changed.') - - for chart_id in chart_ids: - with page.use_chart_frame(chart_id): - for div in page.find_all('div.notes-block'): - chart_metadata.append(div.text) - - # Manually adding in metadata about testing data - chart_metadata.append("Negative and pending tests are excluded from the Marin County test data.") - chart_metadata.append("Note that this test data is about tests done by Marin County residents, not about all tests done in Marin County (includes residents and non-residents).") - - # Some metadata strings are repeated. - # Dedupe and preserve order with list(dict()). - all_metadata = list(dict.fromkeys([*metadata, *chart_metadata])) - return '\n\n'.join(all_metadata) - - -def get_series_data(page: MarinDashboardPage, chart_id: str, headers: list, model_typ: str, typ: str, new_count: str, date_column: str = 'Date') -> List: - """Extract the date, number of cases/deaths, and new cases/deaths.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != headers: - raise FormatError(f'Data headers for chart "{chart_id}" have changed. 
' - f'Expected: {headers}, found: {keys}') - - series = [] - history = [] - for row in csv_reader: - daily: dict = dict() - date_time_obj = datetime.strptime(row[date_column], '%m/%d/%Y') - daily["date"] = date_time_obj.strftime('%Y-%m-%d') - # Collect the case totals in order to compute the change in cases per day - history.append(int(row[typ])) - daily[model_typ] = int(row[typ]) - series.append(daily) - - history_diff: list = list() - # Since i'm substracting pairwise elements, I need to adjust the range so I don't get an off by one error. - for i in range(0, len(history)-1): - history_diff.append((int(history[i+1]) - int(history[i])) + int(series[0][model_typ])) - # from what I've seen, series[0]["cumul_cases"] will be 0, but I shouldn't assume that. - history_diff.insert(0, int(series[0][model_typ])) - - for val, num in enumerate(history_diff): - series[val][new_count] = num - return series - - -def get_breakdown_age(page: MarinDashboardPage, chart_id: str) -> Tuple[List, List]: - """Get the breakdown of cases and deaths by age.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Age Category', 'POPULATION', 'Cases', 'Hospitalizations', 'Deaths']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - key_mapping = {"0-9": "0_to_9", "10-18": "10_to_18", "19-34": "19_to_34", "35-49": "35_to_49", "50-64": "50_to_64", "65-79": "65_to_79", "80-94": "80_to_94", "95+": "95_and_older"} - - c_brkdown = [] - d_brkdown = [] - for row in csv_reader: - c_age: dict = dict() - d_age: dict = dict() - # Extracting the age group and the raw count for both cases and deaths. - c_age["group"], d_age["group"] = row['Age Category'], row['Age Category'] - if c_age["group"] not in key_mapping: - raise FormatError(f'"{c_age["group"]}" is not in the list of age groups. The age groups have changed.') - else: - c_age["group"] = key_mapping[c_age["group"]] - c_age["raw_count"] = int(row["Cases"]) - d_age["group"] = key_mapping[d_age["group"]] - d_age["raw_count"] = int(row["Deaths"]) - c_brkdown.append(c_age) - d_brkdown.append(d_age) - - return c_brkdown, d_brkdown - - -def get_breakdown_gender(page: MarinDashboardPage, chart_id: str) -> Tuple[Dict, Dict]: - """Get the breakdown of cases and deaths by gender.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Gender', 'POPULATION', 'Cases', 'Hospitalizations', 'Deaths']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - genders = ['male', 'female'] - c_gender = {} - d_gender = {} - - for row in csv_reader: - # Extracting the gender and the raw count (the 3rd and 5th columns, respectively) for both cases and deaths. - # Each new row has data for a different gender. 
- gender = row["Gender"].lower() - if gender not in genders: - raise FormatError("The genders have changed.") - c_gender[gender] = int(row["Cases"]) - d_gender[gender] = int(row["Deaths"]) - - return c_gender, d_gender - - -def get_breakdown_race_eth(page: MarinDashboardPage, chart_id: str) -> Tuple[Dict, Dict]: - """Get the breakdown of cases and deaths by race/ethnicity.""" - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Race/Ethnicity', 'COUNTY POPULATION', 'Cases', 'Case Percent', 'Hospitalizations', 'Hospitalizations Percent', 'Deaths', 'Deaths Percent']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - key_mapping = {"Black/African American":"African_Amer", "Hispanic/Latino": "Latinx_or_Hispanic", "White": "White", "Asian": "Asian", "Native Hawaiian/Pacific Islander": "Pacific_Islander", "American Indian/Alaska Native": "Native_Amer", "Multi or Other Race": "Multi_or_Other"} - - c_race_eth = {} - d_race_eth = {} - - for row in csv_reader: - race_eth = row["Race/Ethnicity"] - if race_eth not in key_mapping: - raise FormatError("The race_eth groups have changed.") - else: - c_race_eth[key_mapping[race_eth]] = int(row["Cases"]) - d_race_eth[key_mapping[race_eth]] = int(row["Deaths"]) - - return c_race_eth, d_race_eth - - -def get_test_series(page: MarinDashboardPage, chart_id: str) -> List: - """ - Get the date, the number of new positive tests on that date, and the number - of cumulative positive tests. - """ - csv_reader = page.get_chart_data(chart_id) - keys = csv_reader.fieldnames - - if keys != ['Test Date', 'Positive Tests']: - raise FormatError(f'Data headers for chart "{chart_id}" have changed') - - test_series = [] - - cumul_pos = 0 - for row in csv_reader: - daily: dict = dict() - date_time_obj = datetime.strptime(row['Test Date'], '%m/%d/%Y') - daily["date"] = date_time_obj.strftime('%Y-%m-%d') - daily["positive"] = int(row["Positive Tests"]) - cumul_pos += daily["positive"] - daily["cumul_positive"] = cumul_pos - test_series.append(daily) - - return test_series + + +def get_latest_update(api: SocrataApi) -> datetime: + times = [parse_datetime(api.metadata(api_id)['dataUpdatedAt']) + for api_id in API_IDS.values()] + return max(*times) + + +def get_api_cases(api: SocrataApi, disposition: str) -> Iterable[dict]: + # https://data.marincounty.org/Public-Health/COVID-19-Case-Disposition/wg8s-i3c7 + data = api.resource(API_IDS['cases'], params={'$order': 'test_date ASC'}) + total = 0 + for entry in data: + if entry['status'] == disposition: + total += 1 + yield entry + + # Sanity-check that we filtered the `status` column on a real value. 
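+    # (If the dataset's `status` labels ever change, this loop would
+    #  otherwise silently produce an empty timeseries.)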
+    if total == 0:
+        raise FormatError(f'There were no cases with `status == "{disposition}"`')
+
+
+def get_timeseries_cases(api: SocrataApi) -> List[dict]:
+    return [
+        {
+            'date': parse_datetime(entry['test_date']).date().isoformat(),
+            'cases': int(entry['new_confirmed_cases']),
+            'cumul_cases': int(entry['cumulative_case_count']),
+        }
+        for entry in get_api_cases(api, 'Confirmed')
+    ]
+
+
+def get_timeseries_deaths(api: SocrataApi) -> List[dict]:
+    return [
+        {
+            'date': parse_datetime(entry['test_date']).date().isoformat(),
+            'deaths': int(entry['new_confirmed_cases']),
+            'cumul_deaths': int(entry['cumulative_case_count']),
+        }
+        for entry in get_api_cases(api, 'Death')
+    ]
+
+
+def get_timeseries_tests(api: SocrataApi) -> List[dict]:
+    # https://data.marincounty.org/Public-Health/Marin-County-COVID-19-Testing-Data-CDPH-/kr8c-izzb
+    data = api.resource(API_IDS['tests'], params={'$order': 'date ASC'})
+
+    total = 0
+    result = []
+    for entry in data:
+        # The percent of positive tests is also available in this timeseries when:
+        #     variable == 'test_pos_nopris_7day_total_!no_lag'
+        # Unfortunately the naming and documentation imply this is the
+        # positivity rate over the past 7 days, so it's probably not accurate
+        # to calculate an absolute number of positive/negative tests from it.
+        if entry['variable'] != 'total_tests_nopris_!no_lag':
+            continue
+
+        # Since the same column is used for percent positive and total tests,
+        # the test count comes through as a float rather than an int.
+        value = float(entry['value'])
+        tests = int(value)
+        assert tests == value
+
+        total += tests
+        result.append({
+            'date': parse_datetime(entry['date']).date().isoformat(),
+            'tests': tests,
+            'positive': -1,
+            'negative': -1,
+            'pending': -1,
+            'cumul_tests': total,
+            'cumul_pos': -1,
+            'cumul_neg': -1,
+            'cumul_pend': -1,
+        })
+
+    return result
+
+
+def get_demographic_totals(api: SocrataApi, demographic: str) -> List[Dict]:
+    # https://data.marincounty.org/Public-Health/COVID-19-Cumulative-Demographics/uu8g-ckxh
+    data = api.resource(API_IDS['demographics'])
+    prefix = f'{demographic} - '
+    prefix_length = len(prefix)
+
+    result = []
+    for entry in data:
+        if entry['grouping'].startswith(prefix):
+            entry = entry.copy()
+            entry['grouping'] = entry['grouping'][prefix_length:]
+            result.append(entry)
+
+    # Sanity-check that the demographic prefix we filtered the `grouping`
+    # column on actually matched something.
+    if len(result) == 0:
+        raise FormatError(f'There were no rows for demographic "{demographic}"')
+
+    return result
+
+
+def get_case_totals(api: SocrataApi) -> Dict:
+    return {
+        'gender': get_cases_by_gender(api),
+        'age_group': get_cases_by_age(api),
+        'race_eth': get_cases_by_race(api),
+        # These are not currently provided by Marin:
+        # 'underlying_cond': get_cases_by_condition(api),
+        # 'transmission_cat': get_cases_by_transmission(api),
+    }
+
+
+def get_cases_by_gender(api: SocrataApi) -> Dict:
+    data = get_demographic_totals(api, 'Gender')
+    result = {row['grouping'].lower(): int(row['cumulative'])
+              for row in data}
+
+    # Sanity-check the output at least has male/female.
+    try:
+        assert 'male' in result
+        assert 'female' in result
+    except AssertionError:
+        raise FormatError('Missing explicit male/female gender categories '
+                          f'for cases, got: {list(result.keys())}')
+
+    return result
+
+
+def get_cases_by_age(api: SocrataApi) -> List[Dict]:
+    data = get_demographic_totals(api, 'Age')
+    return [{'group': row['grouping'], 'raw_count': int(row['cumulative'])}
+            for row in data]
+
+
+def get_cases_by_race(api: SocrataApi) -> Dict:
+    data = get_demographic_totals(api, 'Race')
+    mapping = {
+        'american indian/alaska native': 'Native_Amer',
+        'asian': 'Asian',
+        'black/african american': 'African_Amer',
+        'hispanic/latinx': 'Latinx_or_Hispanic',
+        'multiracial': 'Multiple_Race',
+        'native hawaiian/pacific islander': 'Pacific_Islander',
+        'unknown': 'Unknown',
+        'other': 'Other',
+        'white': 'White',
+    }
+    assert_equal_sets(mapping.keys(), (row['grouping'].lower() for row in data))
+    result = {mapping[row['grouping'].lower()]: int(row['cumulative'])
+              for row in data}
+    return result
+
+
+def get_cases_by_condition(api: SocrataApi) -> Dict:
+    # NO DATA
+    raise NotImplementedError()
+
+
+def get_cases_by_transmission(api: SocrataApi) -> Dict:
+    # NO DATA
+    raise NotImplementedError()
+
+
+def get_death_totals(api: SocrataApi) -> Dict:
+    return {
+        'gender': get_deaths_by_gender(api),
+        'age_group': get_deaths_by_age(api),
+        'race_eth': get_deaths_by_race(api),
+        # These are not currently provided by Marin:
+        # 'underlying_cond': get_deaths_by_condition(api),
+        # 'transmission_cat': get_deaths_by_transmission(api),
+    }
+
+
+def get_deaths_by_gender(api: SocrataApi) -> Dict:
+    data = get_demographic_totals(api, 'Gender')
+    result = {row['grouping'].lower(): int(row['deaths'])
+              for row in data}
+
+    # Sanity-check the output at least has male/female.
+    try:
+        assert 'male' in result
+        assert 'female' in result
+    except AssertionError:
+        raise FormatError('Missing explicit male/female gender categories '
+                          f'for deaths, got: {list(result.keys())}')
+
+    return result
+
+
+def get_deaths_by_age(api: SocrataApi) -> List[Dict]:
+    data = get_demographic_totals(api, 'Age')
+    return [{'group': row['grouping'], 'raw_count': int(row['deaths'])}
+            for row in data]
+
+
+def get_deaths_by_race(api: SocrataApi) -> Dict:
+    data = get_demographic_totals(api, 'Race')
+    mapping = {
+        'american indian/alaska native': 'Native_Amer',
+        'asian': 'Asian',
+        'black/african american': 'African_Amer',
+        'hispanic/latinx': 'Latinx_or_Hispanic',
+        'multiracial': 'Multiple_Race',
+        'native hawaiian/pacific islander': 'Pacific_Islander',
+        'unknown': 'Unknown',
+        'other': 'Other',
+        'white': 'White',
+    }
+    assert_equal_sets(mapping.keys(), (row['grouping'].lower() for row in data))
+    result = {mapping[row['grouping'].lower()]: int(row['deaths'])
+              for row in data}
+    return result
+
+
+def get_deaths_by_condition(api: SocrataApi) -> Dict:
+    # NO DATA
+    raise NotImplementedError()
+
+
+def get_deaths_by_transmission(api: SocrataApi) -> Dict:
+    # NO DATA
+    raise NotImplementedError()
diff --git a/covid19_sfbayarea/data/utils.py b/covid19_sfbayarea/data/utils.py
index 44808db7..d1b604a4 100644
--- a/covid19_sfbayarea/data/utils.py
+++ b/covid19_sfbayarea/data/utils.py
@@ -1,3 +1,4 @@
+from functools import lru_cache
 from pathlib import Path
 import json
 from typing import Dict, Any
@@ -25,7 +26,8 @@ def __init__(self, base_url: str):
         self.resource_url = urljoin(self.base_url, '/resource/')
         self.metadata_url = urljoin(self.base_url, '/api/views/metadata/v1/')
 
-    def request(self, url:str, **kwargs: Any) -> Dict:
+    @lru_cache(maxsize=32)
+    def _request(self, url: str, **kwargs: Any) -> Dict:
         try:
             response = self.session.get(url, **kwargs)
             response.raise_for_status()
@@ -38,6 +40,14 @@ def request(self, url:str, **kwargs: Any) -> Dict:
                 raise http_err
             raise BadRequest(server_message, response=response)
 
+    def request(self, url: str, params: Dict = None, **kwargs: Any) -> Dict:
+        # Arguments to _request() must be hashable (so they can be cached).
+        # If a params dict is sent, convert it to a tuple.
+        if params:
+            kwargs['params'] = tuple(params.items())
+
+        return self._request(url, **kwargs)
+
     def resource(self, resource_id: str, **kwargs: Any) -> Dict:
         return self.request(f'{self.resource_url}{resource_id}', **kwargs)
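Note on the utils.py change above: functools.lru_cache builds its cache key from the call arguments, so every argument must be hashable. A params dict is not hashable, which is why the new request() converts it to a tuple of items before delegating to the cached _request(). Below is a minimal, self-contained sketch of that same pattern; the function names and URL are illustrative, not part of this module.

from functools import lru_cache
from typing import Dict, Optional, Tuple

@lru_cache(maxsize=32)
def _cached_get(url: str, params: Optional[Tuple] = None) -> str:
    # Stand-in for the real HTTP request; identical (url, params) pairs
    # after the first call are served from the cache.
    print(f'fetching {url} with {dict(params or ())}')
    return f'response for {url}'

def get(url: str, params: Optional[Dict] = None) -> str:
    # Dicts cannot be part of an lru_cache key, but a tuple of
    # (key, value) pairs can. The tuple preserves insertion order, so
    # callers should pass params in a consistent order to get cache hits.
    hashable = tuple(params.items()) if params else None
    return _cached_get(url, params=hashable)

get('https://example.test/resource/abcd-1234', {'$order': 'date ASC'})
get('https://example.test/resource/abcd-1234', {'$order': 'date ASC'})  # cache hit, nothing printed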