Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chip away some refactors in geometry.py #46

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 70 additions & 25 deletions sgeop/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from libpysal import graph
from scipy import spatial
Expand Down Expand Up @@ -122,7 +123,7 @@ def voronoi_skeleton(
buffer : None | float | int = None
Optional custom buffer distance for dealing with Voronoi infinity issues.
secondary_snap_to : None | gpd.GeoSeries = None
...
Fall-back series of geometries that shall be connected to the skeleton.
limit_distance : None | int = 2
...
consolidation_tolerance : None | float = None
Expand Down Expand Up @@ -236,7 +237,7 @@ def voronoi_skeleton(
if edgelines.shape[0] > 0:
# if there is no explicit snapping target, snap to the boundary of the polygon
# via the shortest line. That is by definition always within the polygon
# (Martin thinks)
# (Martin thinks) (James concurs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

if snap_to is not False:
if snap_to is None:
sl = shapely.shortest_line(
Expand Down Expand Up @@ -296,24 +297,45 @@ def _consolidate(
return edgelines


def snap_to_targets(edgelines, poly, snap_to, secondary_snap_to=None):
def snap_to_targets(
edgelines: np.ndarray,
poly: shapely.Polygon,
snap_to: gpd.GeoSeries,
secondary_snap_to: None | gpd.GeoSeries = None,
) -> tuple[list[shapely.LineString], list[shapely.Point]]:
"""Snap edgelines to vertices.

Parameters
----------
edgelines : numpy.ndarray
Voronoi skeleton edges.
poly : None | shapely.Polygon = None
Polygon enclosed by ``lines``.
snap_to : None | gpd.GeoSeries = None
Series of geometries that shall be connected to the skeleton.
secondary_snap_to : None | gpd.GeoSeries = None
Fall-back series of geometries that shall be connected to the skeleton.

Returns
-------
to_add, to_split : tuple[list[shapely.LineString], list[shapely.Point]]
Lines to add and points where to split.
"""

to_add = []
to_split = []
# cast edgelines to gdf
edgelines_df = gpd.GeoDataFrame(geometry=edgelines)
# build queen contiguity on edgelines and extract component labels
comp_labels = graph.Graph.build_contiguity(
edgelines_df[~(edgelines_df.is_empty | edgelines_df.geometry.isna())],
rook=False,
).component_labels
# compute size of each component
comp_counts = comp_labels.value_counts()
# get MultiLineString geometry per connected component
components = edgelines_df.dissolve(comp_labels)

# generate graph from lines
comp_labels, comp_counts, components = _prep_components(edgelines)

primary_union = shapely.union_all(snap_to)
secondary_union = shapely.union_all(secondary_snap_to)

# if there are muliple components, loop over all and treat each
if len(components) > 1:
for comp_label, comp in components.geometry.items():
cbound = comp.boundary

# if component does not interest the boundary, it needs to be snapped
# if it does but has only one part, this part interesect only on one
# side (the node remaining from the removed edge) and needs to be
Expand All @@ -322,30 +344,25 @@ def snap_to_targets(edgelines, poly, snap_to, secondary_snap_to=None):
(not comp.intersects(poly.boundary))
or comp_counts[comp_label] == 1
or (
not comp.intersects(shapely.union_all(snap_to))
not comp.intersects(primary_union)
) # ! this fixes one thing but may break others
):
# add segment composed of the shortest line to the nearest snapping
# target. We use boundary to snap to endpoints of edgelines only
sl = shapely.shortest_line(comp.boundary, shapely.union_all(snap_to))
sl = shapely.shortest_line(cbound, primary_union)
if _is_within(sl, poly):
to_add.append(sl)
to_split.append(shapely.get_point(sl, -1))
to_split, to_add = _split_add(sl, to_split, to_add)
else:
if secondary_snap_to is not None:
sl = shapely.shortest_line(
comp.boundary, shapely.union_all(secondary_snap_to)
)
to_split.append(shapely.get_point(sl, -1))
to_add.append(sl)
sl = shapely.shortest_line(cbound, secondary_union)
to_split, to_add = _split_add(sl, to_split, to_add)
else:
# if there is a single component, ensure it gets a shortest line to an
# endpoint from each snapping target
for target in snap_to:
sl = shapely.shortest_line(components.boundary.item(), target)
if _is_within(sl, poly):
to_split.append(shapely.get_point(sl, -1))
to_add.append(sl)
to_split, to_add = _split_add(sl, to_split, to_add)
else:
warnings.warn(
"Could not create a connection as it would lead outside "
Expand All @@ -354,3 +371,31 @@ def snap_to_targets(edgelines, poly, snap_to, secondary_snap_to=None):
stacklevel=2,
)
return to_add, to_split


def _prep_components(lines: np.ndarray) -> tuple[pd.Series, pd.Series, gpd.GeoSeries]:
"""Helper for preparing graph components & labels in PySAL."""

# cast edgelines to gdf
lines = gpd.GeoDataFrame(geometry=lines)

# build queen contiguity on edgelines and extract component labels
not_empty = ~lines.is_empty
not_nan = ~lines.geometry.isna()
lines = lines[not_empty | not_nan]
comp_labels = graph.Graph.build_contiguity(lines, rook=False).component_labels

# compute size of each component
comp_counts = comp_labels.value_counts()

# get MultiLineString geometry per connected component
components = lines.dissolve(comp_labels)

return comp_labels, comp_counts, components


def _split_add(line: shapely.LineString, splits: list, adds: list) -> tuple[list]:
"""Helper for preparing splitter points & added lines."""
splits.append(shapely.get_point(line, -1))
adds.append(line)
return splits, adds
51 changes: 51 additions & 0 deletions sgeop/tests/test_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,54 @@ def test_consolidate(tolerance):
numpy.array([line_100_900, line_100_120, line_110_900]), tolerance
)
numpy.testing.assert_array_equal(observed, known)


def test_prep_components():
line1 = shapely.LineString(((1, 1), (1, 2)))
line2 = shapely.LineString(((1, 2), (2, 2)))
line3 = shapely.LineString(((3, 0), (3, 3)))

known_labels = pandas.Series(
[0, 0, 1],
index=pandas.Index([0, 1, 2], name="focal"),
name="component labels",
dtype=numpy.int32,
)
known_counts = pandas.Series(
[2, 1],
index=pandas.Index([0, 1], name="component labels", dtype=numpy.int32),
name="count",
dtype=numpy.int64,
)
known_comps = geopandas.GeoDataFrame(
geometry=[
shapely.MultiLineString(
(
shapely.LineString(((1, 1), (1, 2))),
shapely.LineString(((1, 2), (2, 2))),
)
),
shapely.LineString(((3, 0), (3, 3))),
],
index=pandas.Index([0, 1], name="component labels", dtype=numpy.int32),
)

observed_labels, observed_counts, observed_comps = sgeop.geometry._prep_components(
[line1, line2, line3]
)

pandas.testing.assert_series_equal(observed_labels, known_labels)
pandas.testing.assert_series_equal(observed_counts, known_counts)
geopandas.testing.assert_geodataframe_equal(observed_comps, known_comps)


def test_split_add():
_x = 1100
x1, y1 = _x, 0
x2, y2 = _x, 1000
sl = shapely.LineString(((x1, y1), (x2, y2)))
known_splits = [shapely.Point((x2, y2))]
known_adds = [sl]
observed_splits, observed_adds = sgeop.geometry._split_add(sl, [], [])
assert observed_splits == known_splits
assert observed_adds == known_adds
Loading