feat(python): support merge_columns via Ray integration #2278

Open · wants to merge 6 commits into base: main
12 changes: 12 additions & 0 deletions python/python/lance/ray/__init__.py
@@ -2,3 +2,15 @@
 # SPDX-FileCopyrightText: Copyright The Lance Authors

 """Ray integration."""
+
+from .schema import LanceMergeColumn, merge_columns
+from .sink import LanceCommitter, LanceDatasink, LanceFragmentWriter, _register_hooks
+
+__all__ = [
+    "_register_hooks",
+    "LanceCommitter",
+    "LanceDatasink",
+    "LanceFragmentWriter",
+    "LanceMergeColumn",
+    "merge_columns",
+]
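With these re-exports, downstream code can pull the Ray helpers straight from `lance.ray` instead of the private `lance.ray.sink` submodule. A minimal sketch of the resulting import surface (nothing here beyond the names listed in `__all__` above):

```python
# The public Ray-integration surface, as re-exported by this PR.
from lance.ray import (
    LanceCommitter,
    LanceDatasink,
    LanceFragmentWriter,
    LanceMergeColumn,
    merge_columns,
)
```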
66 changes: 66 additions & 0 deletions python/python/lance/ray/schema.py
@@ -0,0 +1,66 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import pickle
from typing import Callable, Optional, Union

import pyarrow as pa

from .. import LanceDataset, dataset
from .sink import LanceCommitter


class LanceMergeColumn:
    """Merge columns of a single fragment; mapped over all fragments by Ray."""

    def __init__(
        self,
        value_func: Callable[[pa.RecordBatch], pa.RecordBatch],
        columns: Optional[list[str]] = None,
    ):
        self.value_func = value_func
        self.columns = columns

    def __call__(self, row: dict) -> dict:
        # Ray's ``from_items`` wraps each fragment in a row dict under "item".
        fragment = row["item"]
        new_fragment, schema = fragment.merge_columns(self.value_func, self.columns)

        return {
            "fragment": pickle.dumps(new_fragment),
            "schema": pickle.dumps(schema),
        }


def merge_columns(
    data: Union[str, LanceDataset],
    value_func: Callable[[pa.RecordBatch], pa.RecordBatch],
    *,
    columns: Optional[list[str]] = None,
):
    """Run ``merge_columns`` on every fragment of a dataset, distributed with Ray.

    Parameters
    ----------
    data: str or LanceDataset
        The URI of the dataset, or an already-opened ``LanceDataset``.
    value_func: Callable
        A function that takes a RecordBatch as input and returns a RecordBatch.
    columns: Optional[list[str]]
        If specified, only the columns in this list will be passed to the
        value_func. Otherwise, all columns will be passed to the value_func.

    See Also
    --------
    lance.fragment.LanceFragment.merge_columns
    """
    import ray

    if isinstance(data, LanceDataset):
        ds = data
    else:
        ds = dataset(data)

    fragments = ds.get_fragments()

    ray_ds = ray.data.from_items(fragments)
    ray_ds.map(LanceMergeColumn(value_func, columns)).write_datasink(
        LanceCommitter(ds.uri, mode="merge")
    )
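For reviewers, a usage sketch of the new entry point. The dataset path and the derived column name are hypothetical; it assumes Ray has been initialized and the dataset already has an `id` column:

```python
import lance
import pyarrow as pa
import pyarrow.compute as pc
import ray

from lance.ray import merge_columns

ray.init()  # a local Ray instance is enough to try this out

def add_doubled(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Derive one new column from the existing "id" column.
    return pa.RecordBatch.from_arrays([pc.multiply(batch["id"], 2)], ["id_x2"])

# Hypothetical dataset URI. Each fragment is rewritten on a Ray worker,
# then LanceCommitter lands a single "merge" commit for the whole dataset.
merge_columns("/tmp/demo.lance", add_doubled, columns=["id"])

ds = lance.dataset("/tmp/demo.lance")
assert "id_x2" in ds.schema.names
```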
7 changes: 5 additions & 2 deletions python/python/lance/ray/sink.py
@@ -118,7 +118,7 @@ def supports_distributed_writes(self) -> bool:
         return True

     def on_write_start(self):
-        if self.mode == "append":
+        if self.mode in set(["append", "merge"]):
             ds = lance.LanceDataset(self.uri)
             self.read_version = ds.version
             if self.schema is None:
@@ -135,10 +135,13 @@ def on_write_complete(
             fragment = pickle.loads(fragment_str)
             fragments.append(fragment)
             schema = pickle.loads(schema_str)
+        # TODO: use pattern matching after python 3.10
         if self.mode in set(["create", "overwrite"]):
             op = lance.LanceOperation.Overwrite(schema, fragments)
         elif self.mode == "append":
             op = lance.LanceOperation.Append(fragments)
+        elif self.mode == "merge":
+            op = lance.LanceOperation.Merge(fragments, schema=schema)
         lance.LanceDataset.commit(self.uri, op, read_version=self.read_version)


@@ -172,7 +175,7 @@ def __init__(
         self,
         uri: str,
         schema: Optional[pa.Schema] = None,
-        mode: Literal["create", "append", "overwrite"] = "create",
+        mode: Literal["create", "append", "overwrite", "merge"] = "create",
         max_rows_per_file: int = 1024 * 1024,
         use_experimental_writer: bool = True,
         *args,
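To make the new `merge` branch concrete: `on_write_start` pins the dataset version, each task returns a pickled `(fragment, schema)` pair, and `on_write_complete` commits them all as one `Merge` operation. A single-process sketch of that same commit path, with Ray taken out of the picture (the URI is hypothetical):

```python
import lance
import pyarrow as pa
import pyarrow.compute as pc

uri = "/tmp/demo.lance"  # hypothetical dataset with an "id" column
ds = lance.dataset(uri)
read_version = ds.version  # pinned up front, as on_write_start does

def value_func(batch: pa.RecordBatch) -> pa.RecordBatch:
    return pa.RecordBatch.from_arrays([pc.add(batch["id"], 2)], ["sum"])

# Rewrite every fragment locally; with Ray this step runs on the workers.
fragments = []
schema = None
for frag in ds.get_fragments():
    new_frag, schema = frag.merge_columns(value_func, columns=["id"])
    fragments.append(new_frag)

# One Merge commit covers all fragments, exactly as on_write_complete does.
op = lance.LanceOperation.Merge(fragments, schema=schema)
lance.LanceDataset.commit(uri, op, read_version=read_version)
```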
29 changes: 28 additions & 1 deletion python/python/tests/test_ray.py
@@ -5,16 +5,18 @@

 import lance
 import pyarrow as pa
+import pyarrow.compute as pc
 import pytest

 ray = pytest.importorskip("ray")


-from lance.ray.sink import (  # noqa: E402
+from lance.ray import (  # noqa: E402
     LanceCommitter,
     LanceDatasink,
     LanceFragmentWriter,
     _register_hooks,
+    merge_columns,
 )

 # Use this hook until we have official DataSink in Ray.
@@ -99,3 +101,28 @@ def test_ray_write_lance(tmp_path: Path):
     tbl = ds.to_table()
     assert sorted(tbl["id"].to_pylist()) == list(range(10))
     assert set(tbl["str"].to_pylist()) == set([f"str-{i}" for i in range(10)])
+
+
+def test_ray_merge_column(tmp_path: Path):
+    schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())])
+
+    (
+        ray.data.range(10)
+        .map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"})
+        .write_lance(tmp_path, schema=schema)
+    )
+
+    def value_func(batch):
+        arrs = pc.add(batch["id"], 2)
+        return pa.RecordBatch.from_arrays([arrs], ["sum"])
+
+    merge_columns(tmp_path, value_func)
+
+    ds = lance.dataset(tmp_path)
+    schema = ds.schema
+    assert schema.names == ["id", "str", "sum"]
+
+    tbl = ds.to_table()
+    assert set(tbl["sum"].to_pylist()) == set(range(2, 12))
+    # Only bumped 1 version.
+    assert ds.version == 2
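The test above exercises the default path where `value_func` sees every column. A companion sketch for the `columns=` filter, reusing the same fixtures (hypothetical, not part of this PR):

```python
def test_ray_merge_column_selected(tmp_path: Path):
    schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())])
    (
        ray.data.range(10)
        .map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"})
        .write_lance(tmp_path, schema=schema)
    )

    def value_func(batch):
        # With columns=["id"], the "str" column should never reach this function.
        assert batch.schema.names == ["id"]
        return pa.RecordBatch.from_arrays([pc.multiply(batch["id"], 3)], ["triple"])

    merge_columns(tmp_path, value_func, columns=["id"])

    ds = lance.dataset(tmp_path)
    assert ds.schema.names == ["id", "str", "triple"]
```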