Commit 1a5f7b3

Convert the Regions materialised-view to a table

It gives us a lot more flexibility, such as handling errors and updating incrementally. Users who were running previous dev versions of Explorer will need to run `--init` again.

1 parent 93e46e7 · commit 1a5f7b3
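For context on that message (an editorial sketch, not part of the commit): a materialised view can only be rebuilt wholesale with `REFRESH MATERIALIZED VIEW`, so one failing product aborts the whole refresh and everything is recomputed. A plain table accepts per-row upserts, which isolates errors and makes updates incremental. A minimal SQLAlchemy sketch; the `regions` table and its columns are illustrative stand-ins, not Explorer's actual schema:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql as postgres

metadata = MetaData()
# Illustrative stand-in for a per-product region summary table.
regions = Table(
    "regions",
    metadata,
    Column("product_ref", Integer, primary_key=True),
    Column("region_code", String, primary_key=True),
    Column("count", Integer, nullable=False),
)


def upsert_region(conn, product_ref: int, region_code: str, count: int):
    # Insert the row, or update it in place if (product_ref, region_code)
    # already exists, something a materialised view cannot do per-row.
    stmt = postgres.insert(regions).values(
        product_ref=product_ref, region_code=region_code, count=count
    )
    conn.execute(
        stmt.on_conflict_do_update(
            index_elements=["product_ref", "region_code"],
            set_=dict(count=stmt.excluded.count),
        )
    )
```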

2 files changed (+122, -62 lines)


cubedash/summary/_schema.py

Lines changed: 31 additions & 40 deletions
@@ -1,7 +1,9 @@
 from __future__ import absolute_import

-from typing import Set
+import warnings
 from enum import Enum
+from typing import Set
+
 import structlog
 from geoalchemy2 import Geometry
 from sqlalchemy import (
@@ -156,6 +158,24 @@
     ),
 )

+# The geometry of each unique 'region' for a product.
+REGION = Table(
+    "region",
+    METADATA,
+    Column("dataset_type_ref", SmallInteger, nullable=False),
+    Column("region_code", String, nullable=False),
+    Column("count", Integer, nullable=False),
+    Column(
+        "generation_time",
+        DateTime(timezone=True),
+        server_default=func.now(),
+        nullable=False,
+    ),
+    Column("footprint", Geometry(srid=4326, spatial_index=False)),
+    PrimaryKeyConstraint("dataset_type_ref", "region_code"),
+)
+
+
 _REF_TABLE_METADATA = MetaData(schema=CUBEDASH_SCHEMA)
 # This is a materialised view of the postgis spatial_ref_sys for lookups.
 # See creation of mv_spatial_ref_sys below.
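One payoff of `region` being an ordinary table: it can be queried directly. A hypothetical read of the table defined above, where `conn` is an open SQLAlchemy connection and `product_ref` a `dataset_type` id (both assumed):

```python
from sqlalchemy import select

def product_regions(conn, product_ref: int):
    # All region rows for one product, from the REGION table above.
    return conn.execute(
        select([REGION.c.region_code, REGION.c.count, REGION.c.footprint])
        .where(REGION.c.dataset_type_ref == product_ref)
        .order_by(REGION.c.region_code)
    ).fetchall()
```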
@@ -182,16 +202,6 @@
     Column("has_region", Integer),
 )

-# The geometry of each unique 'region' for a product.
-REGION = Table(
-    "mv_region",
-    _REF_TABLE_METADATA,
-    Column("dataset_type_ref", SmallInteger, nullable=False),
-    Column("region_code", String),
-    Column("footprint", Geometry(srid=4326, spatial_index=False)),
-    Column("count", Integer, nullable=False),
-)
-

 def has_schema(engine: Engine) -> bool:
     """
@@ -207,6 +217,16 @@ def is_compatible_schema(engine: Engine) -> bool:
     if not pg_column_exists(engine, f"{CUBEDASH_SCHEMA}.product", "fixed_metadata"):
         is_latest = False

+    if not pg_exists(engine, f"{CUBEDASH_SCHEMA}.region"):
+        is_latest = False
+
+    if pg_exists(engine, f"{CUBEDASH_SCHEMA}.mv_region"):
+        warnings.warn(
+            "Your database has item `cubedash.mv_region` from an unstable version of Explorer. "
+            "It will not harm you, but feel free to drop it once all Explorer instances "
+            "have been upgraded: "
+            "    drop materialized view cubedash.mv_region"
+        )
     return is_latest
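`pg_exists` is an existing helper in this codebase whose body isn't shown in the diff. Such checks are commonly written with PostgreSQL's `to_regclass`; a sketch under that assumption, not the helper's actual code:

```python
def pg_exists_sketch(engine, name: str) -> bool:
    # to_regclass() returns NULL when no relation of that name exists,
    # so this works for tables and (materialized) views alike.
    return engine.execute("select to_regclass(%s)", (name,)).scalar() is not None
```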

@@ -346,30 +366,6 @@ def create_schema(engine: Engine):
         """
     )

-    # A geometry for each declared region.
-    #
-    # This happens in two steps so that we union cleanly (in the native CRS rather than after transforming)
-    # TODO: Simplify geom after union?
-    engine.execute(
-        f"""
-    create materialized view if not exists {CUBEDASH_SCHEMA}.mv_region as (
-        select dataset_type_ref,
-               region_code,
-               ST_SimplifyPreserveTopology(ST_Union(footprint), 0.0001) as footprint,
-               sum(count) as count
-        from (
-            select dataset_type_ref,
-                   region_code,
-                   ST_Transform(ST_Union(footprint), 4326) as footprint,
-                   count(*) as count
-            from {CUBEDASH_SCHEMA}.dataset_spatial
-            group by dataset_type_ref, region_code, ST_SRID(footprint)
-        ) srid_groups
-        group by dataset_type_ref, region_code
-    ) with no data;
-    """
-    )
-

 def refresh_supporting_views(conn, concurrently=False):
     args = "concurrently" if concurrently else ""
@@ -383,11 +379,6 @@ def refresh_supporting_views(conn, concurrently=False):
         refresh materialized view {args} {CUBEDASH_SCHEMA}.mv_dataset_spatial_quality;
     """
     )
-    conn.execute(
-        f"""
-        refresh materialized view {args} {CUBEDASH_SCHEMA}.mv_region;
-    """
-    )


 def get_srid_name(engine: Engine, srid: int):

cubedash/summary/_stores.py

Lines changed: 91 additions & 22 deletions
@@ -3,7 +3,17 @@
 from collections import Counter
 from dataclasses import dataclass
 from datetime import date, datetime, timedelta
-from typing import Dict, Generator, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import (
+    Dict,
+    Generator,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    Set,
+)
 from uuid import UUID

 import dateutil.parser
@@ -14,7 +24,7 @@
 from geoalchemy2 import shape as geo_shape
 from geoalchemy2.shape import to_shape
 from shapely.geometry import GeometryCollection
-from sqlalchemy import DDL, String, and_, func, select
+from sqlalchemy import DDL, String, and_, func, select, bindparam, SmallInteger, literal
 from sqlalchemy.dialects import postgresql as postgres
 from sqlalchemy.dialects.postgresql import TSTZRANGE
 from sqlalchemy.engine import Engine, RowProxy
@@ -128,13 +138,14 @@ def init(self):

         (Requires `create` permissions in the db)
         """
+        needed_update = not _schema.is_compatible_schema(self._engine)
+
+        # Add any missing schema items or patches.
         _schema.create_schema(self._engine)
-        # Apply any needed updates.
-        refresh_items = _schema.update_schema(self._engine)
+        refresh_also = _schema.update_schema(self._engine)

-        # Refresh relevant data summaries
-        for refresh_item in refresh_items:
-            _refresh_data(refresh_item, store=self)
+        if needed_update or refresh_also:
+            _refresh_data(refresh_also, store=self)

     @classmethod
     def create(cls, index: Index, log=_LOG) -> "SummaryStore":
@@ -185,7 +196,7 @@ def refresh_product(
             return None

         _LOG.info("init.product", product_name=product.name)
-        added_count = _extents.refresh_product(
+        change_count = _extents.refresh_product(
             self.index,
             product,
             recompute_all_extents=force_dataset_extent_recompute,
@@ -226,7 +237,62 @@ def refresh_product(
                 fixed_metadata=fixed_metadata,
             )
         )
-        return added_count
+
+        self._refresh_product_regions(product)
+        _LOG.info("init.regions.done", product_name=product.name)
+        return change_count
+
+    def _refresh_product_regions(self, dataset_type: DatasetType) -> int:
+        log = _LOG.bind(product_name=dataset_type.name)
+        log.info("refresh.regions.start")
+        select_by_srid = (
+            select(
+                [
+                    DATASET_SPATIAL.c.dataset_type_ref,
+                    DATASET_SPATIAL.c.region_code,
+                    func.ST_Transform(
+                        func.ST_Union(DATASET_SPATIAL.c.footprint), 4326
+                    ).label("footprint"),
+                    func.count().label("count"),
+                ]
+            )
+            .where(
+                DATASET_SPATIAL.c.dataset_type_ref
+                == bindparam("product_ref", dataset_type.id, type_=SmallInteger)
+            )
+            .group_by("dataset_type_ref", "region_code")
+            .cte("srid_groups")
+        )
+
+        columns = dict(
+            dataset_type_ref=select_by_srid.c.dataset_type_ref,
+            region_code=func.coalesce(select_by_srid.c.region_code, ""),
+            footprint=func.ST_SimplifyPreserveTopology(
+                func.ST_Union(select_by_srid.c.footprint), literal(0.0001)
+            ),
+            count=func.sum(select_by_srid.c.count),
+        )
+        query = postgres.insert(REGION).from_select(
+            columns.keys(),
+            select(columns.values())
+            .select_from(select_by_srid)
+            .group_by("dataset_type_ref", "region_code"),
+        )
+        query = query.on_conflict_do_update(
+            index_elements=["dataset_type_ref", "region_code"],
+            set_=dict(
+                footprint=query.excluded.footprint,
+                count=query.excluded.count,
+                generation_time=func.now(),
+            ),
+        )
+        # Path(__file__).parent.joinpath("insertion.sql").write_text(
+        #     f"\n{as_sql(query)}\n"
+        # )
+        changed_rows = self._engine.execute(query).rowcount
+
+        log.info("refresh.regions.end", changed_regions=changed_rows)
+        return changed_rows

     def refresh_stats(self, concurrently=False):
         """
@@ -905,21 +971,24 @@ def get_dataset_footprint_region(self, dataset_id):
         )


-def _refresh_data(item: PleaseRefresh, store: SummaryStore):
+def _refresh_data(please_refresh: Set[PleaseRefresh], store: SummaryStore):
     """
-    Refresh the given kind of data.
+    Refresh product information after a schema update, plus the given kind of data.
     """
-    if item == PleaseRefresh.DATASET_EXTENTS:
-        for dt in store.all_dataset_types():
-            _LOG.info("data.refreshing_extents", product=dt.name)
-            # Skip product if it's never been summarised at all.
-            if store.get_product_summary(dt.name) is None:
-                continue
-
-            store.refresh_product(dt, force_dataset_extent_recompute=True)
-        _LOG.info("data.refreshing_extents.complete")
-    else:
-        raise NotImplementedError(f"Unknown data type to refresh_data: {item}")
+    recompute_dataset_extents = PleaseRefresh.DATASET_EXTENTS in please_refresh
+
+    for dt in store.all_dataset_types():
+        _LOG.info("data.refreshing_extents", product=dt.name)
+        # Skip product if it's never been summarised at all.
+        if store.get_product_summary(dt.name) is None:
+            continue
+
+        store.refresh_product(
+            dt,
+            refresh_older_than=timedelta(minutes=-1),
+            force_dataset_extent_recompute=recompute_dataset_extents,
+        )
+    _LOG.info("data.refreshing_extents.complete")


 def _safe_read_date(d):
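Because `_refresh_data` now takes a set rather than a single enum value, a schema update can request any combination of refreshes, including none. Hypothetical call sites:

```python
# Recompute dataset extents as well as re-summarising each product:
_refresh_data({PleaseRefresh.DATASET_EXTENTS}, store=store)

# Just re-summarise products after a schema change:
_refresh_data(set(), store=store)
```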
