Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Add spatial read to GeometryDataFrame #3733

Open
wants to merge 4 commits into
base: xan/geometry-dataframe-mq-read
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion apis/python/src/tiledbsoma/_geometry_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from ._spatial_util import (
coordinate_space_from_json,
coordinate_space_to_json,
process_spatial_df_region,
)
from ._types import OpenTimestamp
from .options import SOMATileDBContext
Expand Down Expand Up @@ -369,6 +370,7 @@ def read(
result_order=_util.to_clib_result_order(result_order),
value_filter=value_filter,
platform_config=platform_config,
coord_space=self._coord_space,
)

def read_spatial_region(
Expand Down Expand Up @@ -417,7 +419,60 @@ def read_spatial_region(
Lifecycle:
Experimental.
"""
raise NotImplementedError()
# Set/check transform and region coordinate space.
if region_transform is None:
region_transform = somacore.IdentityTransform(
self.axis_names, self.axis_names
)
if region_coord_space is not None:
raise ValueError(
"Cannot specify the output coordinate space when region transform i"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Cannot specify the output coordinate space when region transform i"
"Cannot specify the output coordinate space when region transform "

"is ``None``."
)
region_coord_space = self._coord_space
else:
if region_coord_space is None:
region_coord_space = CoordinateSpace.from_axis_names(
region_transform.input_axes
)
elif region_transform.input_axes != region_coord_space.axis_names:
raise ValueError(
f"The input axes '{region_transform.input_axes}' of the region "
f"transform must match the axes '{region_coord_space.axis_names}' "
f"of the coordinate space the requested region is defined in."
)
if region_transform.output_axes != self._coord_space.axis_names:
raise ValueError(
f"The output axes of '{region_transform.output_axes}' of the "
f"transform must match the axes '{self._coord_space.axis_names}' "
f"of the coordinate space of this point cloud dataframe."
)

# Process the user provided region.
coords, data_region, inv_transform = process_spatial_df_region(
region,
region_transform,
dict(), # Move index value_filters into this dict to optimize queries
self.index_column_names,
self._coord_space.axis_names,
self._handle.schema,
spatial_column=SOMA_GEOMETRY,
)

return somacore.SpatialRead(
self.read(
coords,
column_names,
result_order=result_order,
value_filter=value_filter,
batch_size=batch_size,
partitions=partitions,
platform_config=platform_config,
),
self.coordinate_space,
region_coord_space,
inv_transform,
)

def write(
self,
Expand Down
21 changes: 18 additions & 3 deletions apis/python/src/tiledbsoma/_read_iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import pyarrow as pa
import somacore
from scipy import sparse
from somacore import options
from somacore import CoordinateSpace, options

# This package's pybind11 code
import tiledbsoma.pytiledbsoma as clib
Expand Down Expand Up @@ -77,6 +77,8 @@ def __init__(
result_order: clib.ResultOrder,
value_filter: str | None,
platform_config: options.PlatformConfig | None,
*,
coord_space: CoordinateSpace | None = None,
):
"""Initalizes a new TableReadIter for SOMAArrays.

Expand Down Expand Up @@ -106,7 +108,13 @@ def __init__(

"""
self._reader = ArrowTableRead(
array, coords, column_names, result_order, value_filter, platform_config
array,
coords,
column_names,
result_order,
value_filter,
platform_config,
coord_space=coord_space,
)

def __next__(self) -> pa.Table:
Expand Down Expand Up @@ -565,6 +573,8 @@ def __init__(
result_order: clib.ResultOrder,
value_filter: str | None,
platform_config: options.PlatformConfig | None,
*,
coord_space: CoordinateSpace | None = None,
):
clib_handle = array._handle._handle

Expand All @@ -575,13 +585,18 @@ def __init__(
if column_names is not None:
for name in column_names:
clib_handle.get_column(name).select_columns(self.mq._handle)
else:
for name in array.schema.names:
clib_handle.get_column(name).select_columns(self.mq._handle)

if value_filter is not None:
self.mq._handle.set_condition(
QueryCondition(value_filter), clib_handle.schema
)

_util._set_coords(self.mq, coords)
_util._set_coords(
self.mq, coords, coord_space.axis_names if coord_space else None
)

def __next__(self) -> pa.Table:
return self.mq._handle.next()
Expand Down
22 changes: 16 additions & 6 deletions apis/python/src/tiledbsoma/_spatial_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def process_spatial_df_region(
index_columns: Tuple[str, ...],
axis_names: Tuple[str, ...],
schema: pa.Schema,
spatial_column: str | None = None,
) -> Tuple[
options.SparseDFCoords,
options.SpatialRegion | None,
Expand All @@ -195,6 +196,8 @@ def process_spatial_df_region(
raise KeyError("Extra coords cannot contain a spatial index column.")
if not set(index_columns).issuperset(coords_by_name):
raise KeyError("Extra coords must be index columns.")
if spatial_column is not None and spatial_column not in index_columns:
raise KeyError("Spatial column must be an index column.")

# Transform the region into the data region and add the spatial coordinates
# to the coords_by_name map.
Expand Down Expand Up @@ -227,12 +230,19 @@ def axis_slice(a: float, b: float, dtype: pa.DataType) -> slice:
# Add the transform region to coords. This gets the bounding box for the
# requested region.
(x_min, y_min, x_max, y_max) = shapely.bounds(data_region)
coords_by_name[axis_names[0]] = axis_slice(
x_min, x_max, schema.field(axis_names[0]).type
)
coords_by_name[axis_names[1]] = axis_slice(
y_min, y_max, schema.field(axis_names[1]).type
)

if spatial_column is None:
coords_by_name[axis_names[0]] = axis_slice(
x_min, x_max, schema.field(axis_names[0]).type
)
coords_by_name[axis_names[1]] = axis_slice(
y_min, y_max, schema.field(axis_names[1]).type
)
else:
coords_by_name[spatial_column] = [
axis_slice(x_min, x_max, pa.float64()),
axis_slice(y_min, y_max, pa.float64()),
]

coords = tuple(coords_by_name.get(index_name) for index_name in index_columns)
inv_transform = transform.inverse_transform()
Expand Down
76 changes: 73 additions & 3 deletions apis/python/src/tiledbsoma/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,11 @@ def _cast_domainish(domainish: List[Any]) -> Tuple[Tuple[object, object], ...]:
return tuple(result)


def _set_coords(mq: ManagedQuery, coords: options.SparseNDCoords) -> None:
def _set_coords(
mq: ManagedQuery,
coords: options.SparseNDCoords,
axis_names: Sequence[str] | None = None,
) -> None:
if not is_nonstringy_sequence(coords):
raise TypeError(
f"coords type {type(coords)} must be a regular sequence,"
Expand All @@ -468,10 +472,15 @@ def _set_coords(mq: ManagedQuery, coords: options.SparseNDCoords) -> None:
)

for i, coord in enumerate(coords):
_set_coord(i, mq, coord)
_set_coord(i, mq, coord, axis_names)


def _set_coord(dim_idx: int, mq: ManagedQuery, coord: object) -> None:
def _set_coord(
dim_idx: int,
mq: ManagedQuery,
coord: object,
axis_names: Sequence[str] | None = None,
) -> None:
if coord is None:
return

Expand All @@ -480,6 +489,26 @@ def _set_coord(dim_idx: int, mq: ManagedQuery, coord: object) -> None:

column = mq._array._handle._handle.get_column(dim.name)

if dim.metadata is not None:
if dim.metadata[b"dtype"].decode("utf-8") == "WKB":
if axis_names is None:
raise ValueError(
"Axis names are required to set geometry column coordinates"
)
if not isinstance(dom[0], Mapping) or not isinstance(dom[1], Mapping):
raise ValueError(
"Domain should be expressed per axis for geometry columns"
)

_set_geometry_coord(
mq,
dim,
cast(Tuple[Mapping[str, float], Mapping[str, float]], dom),
coord,
axis_names,
)
return

if isinstance(coord, (str, bytes)):
column.set_dim_points_string_or_bytes(mq._handle, [coord])
return
Expand Down Expand Up @@ -548,6 +577,47 @@ def _set_coord(dim_idx: int, mq: ManagedQuery, coord: object) -> None:
raise TypeError(f"unhandled type {dim.type} for index column named {dim.name}")


def _set_geometry_coord(
mq: ManagedQuery,
dim: pa.Field,
dom: Tuple[Mapping[str, float], Mapping[str, float]],
coord: object,
axis_names: Sequence[str],
) -> None:
if not isinstance(coord, Sequence):
raise ValueError(
f"unhandled coord type {type(coord)} for index column named {dim.name}"
)

ordered_dom_min = [dom[0][axis] for axis in axis_names]
ordered_dom_max = [dom[1][axis] for axis in axis_names]

column = mq._array._handle._handle.get_column(dim.name)

if all([is_slice_of(x, np.float64) for x in coord]):
range_min = []
range_max = []
for sub_coord, dom_min, dom_max in zip(coord, ordered_dom_min, ordered_dom_max):
validate_slice(sub_coord)
lo_hi = slice_to_numeric_range(sub_coord, (dom_min, dom_max))

# None slices need to be set to the domain size
if lo_hi is None:
range_min.append(dom_min)
range_max.append(dom_max)
else:
range_min.append(lo_hi[0])
range_max.append(lo_hi[1])

column.set_dim_ranges_double_array(mq._handle, [(range_min, range_max)])
elif all([isinstance(x, np.number) for x in coord]):
column.set_dim_points_double_array(mq._handle, [coord])
else:
raise ValueError(
f"Unsupported spatial coordinate type. Expected slice or float, found {type(coord)}"
)


def _set_coord_by_py_seq_or_np_array(
mq: ManagedQuery, dim: pa.Field, coord: object
) -> None:
Expand Down
16 changes: 16 additions & 0 deletions apis/python/src/tiledbsoma/soma_column.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ void load_soma_column(py::module& m) {
const std::vector<uint8_t>& points) {
column->set_dim_points<uint8_t>(mq, points);
})
.def(
"set_dim_points_double_array",
[](std::shared_ptr<SOMAColumn>& column,
ManagedQuery& mq,
const std::vector<std::vector<double_t>>& points) {
column->set_dim_points<std::vector<double_t>>(mq, points);
})
.def(
"set_dim_ranges_string_or_bytes",
[](std::shared_ptr<SOMAColumn>& column,
Expand Down Expand Up @@ -188,6 +195,15 @@ void load_soma_column(py::module& m) {
const std::vector<std::pair<uint8_t, uint8_t>>& ranges) {
column->set_dim_ranges<uint8_t>(mq, ranges);
})
.def(
"set_dim_ranges_double_array",
[](std::shared_ptr<SOMAColumn>& column,
ManagedQuery& mq,
const std::vector<
std::pair<std::vector<double_t>, std::vector<double_t>>>&
ranges) {
column->set_dim_ranges<std::vector<double_t>>(mq, ranges);
})
.def(
"set_dim_points_arrow",
[](std::shared_ptr<SOMAColumn>& column,
Expand Down
50 changes: 40 additions & 10 deletions apis/python/tests/test_geometry_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def test_geometry_basic_read(tmp_path):
uri = tmp_path.as_uri()

asch = pa.schema([("quality", pa.float32())])
triangle = shapely.Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])
rect = shapely.Polygon([(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)])

with soma.GeometryDataFrame.create(
uri, schema=asch, domain=[[(-10, 10), (-10, 10)], [0, 100]]
Expand All @@ -80,15 +82,43 @@ def test_geometry_basic_read(tmp_path):
with soma.GeometryDataFrame.open(uri) as geom:
result = geom.read().concat()

assert result[0].to_numpy()[0] == triangle.wkb
assert result[0].to_numpy()[1] == rect.wkb

assert shapely.from_wkb(
result["soma_geometry"].to_numpy()[0]
) == shapely.Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])
assert shapely.from_wkb(
result["soma_geometry"].to_numpy()[1]
) == shapely.Polygon([(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)])


def test_geometry_basic_spatial_read(tmp_path):
uri = tmp_path.as_uri()

asch = pa.schema([("quality", pa.float32())])

with soma.GeometryDataFrame.create(
uri, schema=asch, domain=[[(-10, 10), (-10, 10)], [0, 100]]
) as geom:
pydict = {}
pydict["soma_geometry"] = [
[0.0, 0, 0, 1, 1, 0, 0, 0],
[2.0, 0, 2, 1, 3, 1, 3, 0, 2, 0],
]
pydict["soma_joinid"] = [1, 2]
pydict["quality"] = [4.1, 5.2]

rb = pa.Table.from_pydict(pydict)
geom.from_outlines(rb)

with soma.GeometryDataFrame.open(uri) as geom:

result = geom.read_spatial_region(region=[0.5, 0.5, 1.5, 1.5]).data.concat()

# Internal columns will be hidden in a subsequent PR
assert (result[0].to_numpy() == [0, 0]).all()
assert (result[1].to_numpy() == [0, 0]).all()
assert (result[2].to_numpy() == [1, 1]).all()
assert (result[3].to_numpy() == [1, 1]).all()
assert len(result) == 1

assert shapely.from_wkb(result[5].to_numpy()[0]) == shapely.Polygon(
[(0, 0), (0, 1), (1, 0), (0, 0)]
)
assert shapely.from_wkb(result[5].to_numpy()[1]) == shapely.Polygon(
[(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)]
)
assert shapely.from_wkb(
result["soma_geometry"].to_numpy()[0]
) == shapely.Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])
11 changes: 11 additions & 0 deletions libtiledbsoma/src/utils/arrow_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ std::unique_ptr<ArrowSchema> ArrowAdapter::arrow_schema_from_tiledb_attribute(
arrow_schema->release = &ArrowAdapter::release_schema;
arrow_schema->private_data = nullptr;

if (attribute.type() == TILEDB_GEOM_WKB) {
nanoarrow::UniqueBuffer metadata_buffer;
ArrowMetadataBuilderInit(metadata_buffer.get(), nullptr);
ArrowMetadataBuilderAppend(
metadata_buffer.get(),
ArrowCharView("dtype"),
ArrowCharView("WKB"));
ArrowSchemaSetMetadata(
arrow_schema.get(), reinterpret_cast<char*>(metadata_buffer->data));
}

LOG_TRACE(std::format(
"[ArrowAdapter] arrow_schema_from_tiledb_array format {} "
"name {}",
Expand Down
Loading