Skip to content

Commit

Permalink
wip: allow more control in creating delta indices
Browse files Browse the repository at this point in the history
feat: implement new parameters

refactor: move scalar index optimize into a different file

expose options in Python

test in python
  • Loading branch information
wjones127 committed Apr 27, 2024
1 parent d0c313a commit 55a60bc
Show file tree
Hide file tree
Showing 10 changed files with 592 additions and 133 deletions.
43 changes: 35 additions & 8 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,7 @@ def uri(self) -> str:
return self._uri

def list_indices(self) -> List[Dict[str, Any]]:
    """Return metadata for all indices on this dataset.

    The result is loaded fresh from the underlying dataset on every call
    (no caching), so it always reflects the current state of the indices.

    Returns
    -------
    List[Dict[str, Any]]
        One dict per index, as reported by the native ``load_indices`` call.
    """
    # Diff residue removed: the old cached `_list_indices_res` path was
    # deleted by this commit; always delegate to the native layer.
    return self._ds.load_indices()

def index_statistics(self, index_name: str) -> Dict[str, Any]:
warnings.warn(
Expand Down Expand Up @@ -863,8 +861,6 @@ def drop_columns(self, columns: List[str]):
2 c
"""
self._ds.drop_columns(columns)
# Indices might have changed
self._list_indices_res = None

def delete(self, predicate: Union[str, pa.compute.Expression]):
"""
Expand Down Expand Up @@ -2286,7 +2282,12 @@ def compact_files(
)
return Compaction.execute(self._dataset, opts)

def optimize_indices(self, **kwargs):
def optimize_indices(
self,
merge_indices: bool | int | List[str] = 1,
index_new_data: bool | List[int] = True,
**kwargs,
):
"""Optimizes index performance.
As new data arrives it is not added to existing indexes automatically.
Expand All @@ -2298,8 +2299,34 @@ def optimize_indices(self, **kwargs):
the new data to existing partitions. This means an update is much quicker
than retraining the entire index but may have less accuracy (especially
if the new data exhibits new patterns, concepts, or trends)
"""
self._dataset._ds.optimize_indices(**kwargs)
Parameters
----------
merge_indices: bool | int | List[str]
If True, all indices will be merged. If False, no indices will be
merged and instead a new index delta will be created. If an integer,
that number of indices will be merged. If a list of UUID strings,
those specific indices will be merged.
index_new_data: bool | List[int]
If True, all new data will be indexed. If False, no new data will be
indexed. If a list of fragment ids, those specific fragments will be
indexed.
"""
# legacy parameter.
if "num_indices_to_merge" in kwargs:
warnings.warn(
"num_indices_to_merge is deprecated, use merge_indices instead",
DeprecationWarning,
)
num_indices_to_merge = kwargs.pop("num_indices_to_merge")
if num_indices_to_merge == 0:
merge_indices = False
else:
merge_indices = num_indices_to_merge

self._dataset._ds.optimize_indices(
merge_indices=merge_indices, index_new_data=index_new_data, **kwargs
)


class DatasetStats(TypedDict):
Expand Down
40 changes: 39 additions & 1 deletion python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def test_optimize_index(dataset, tmp_path):
ds = lance.write_dataset(dataset.to_table(), dataset_uri)
ds = ds.create_index(
"vector",
name="vector_index",
index_type="IVF_PQ",
num_partitions=4,
num_sub_vectors=2,
Expand All @@ -443,8 +444,45 @@ def test_optimize_index(dataset, tmp_path):
indices_dir = dataset_uri / "_indices"
assert len(list(indices_dir.iterdir())) == 1

ds = ds.optimize.optimize_indices()
# Default is to index new data and merge with the most recent index.
ds.optimize.optimize_indices()
assert len(list(indices_dir.iterdir())) == 2
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 1

# New data / make a delta
tbl = create_table(nvec=200)
ds = lance.write_dataset(tbl, dataset_uri, mode="append")
ds.optimize.optimize_indices(merge_indices=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 2
assert index_stats["num_indexed_fragments"] == 3

# Two new fragments, make a delta out of only one
tbl = create_table(nvec=200)
ds = lance.write_dataset(tbl, dataset_uri, mode="append", max_rows_per_file=100)
to_index = [ds.get_fragments()[-1].fragment_id]
ds.optimize.optimize_indices(merge_indices=False, index_new_data=to_index)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 3
assert index_stats["num_unindexed_fragments"] == 1
assert index_stats["num_indexed_fragments"] == 4

# Merge two specific indices
all_uuids = [idx["uuid"] for idx in ds.list_indices()]
to_merge = all_uuids[:2]
ds.optimize.optimize_indices(merge_indices=to_merge, index_new_data=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 2
all_uuids = [idx["uuid"] for idx in ds.list_indices()]
assert all([uuid not in all_uuids for uuid in to_merge])

# Merge all indices, don't index new data
ds.optimize.optimize_indices(merge_indices=True, index_new_data=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 1
assert index_stats["num_indexed_fragments"] == 4
assert index_stats["num_unindexed_fragments"] == 1


def create_uniform_table(min, max, nvec, offset, ndim=8):
Expand Down
51 changes: 45 additions & 6 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use lance::dataset::{BatchInfo, BatchUDF, NewColumnTransform, UDFCheckpointStore
use lance::index::{scalar::ScalarIndexParams, vector::VectorIndexParams};
use lance_arrow::as_fixed_size_list_array;
use lance_core::datatypes::Schema;
use lance_index::optimize::OptimizeOptions;
use lance_index::optimize::{IndexHandling, NewDataHandling, OptimizeOptions};
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::{
Expand All @@ -49,6 +49,7 @@ use lance_index::{
};
use lance_io::object_store::ObjectStoreParams;
use lance_linalg::distance::MetricType;

use lance_table::format::Fragment;
use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
Expand All @@ -62,6 +63,7 @@ use pyo3::{
PyObject, PyResult,
};
use snafu::{location, Location};
use uuid::Uuid;

use crate::fragment::{FileFragment, FragmentMetadata};
use crate::schema::LanceSchema;
Expand Down Expand Up @@ -867,15 +869,52 @@ impl Dataset {
})
}

#[pyo3(signature = (**kwargs))]
fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> {
#[pyo3(signature = (merge_indices, index_new_data, **_kwargs))]
fn optimize_indices(
&mut self,
merge_indices: &PyAny,
index_new_data: &PyAny,
_kwargs: Option<&PyDict>,
) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
let mut options: OptimizeOptions = Default::default();
if let Some(kwargs) = kwargs {
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
options.num_indices_to_merge = num_indices_to_merge.extract()?;

if let Ok(merge_indices_bool) = merge_indices.extract::<bool>() {
if merge_indices_bool {
options.index_handling = IndexHandling::MergeAll;
} else {
options.index_handling = IndexHandling::NewDelta;
}
} else if let Ok(merge_indices_int) = merge_indices.extract::<usize>() {
options.index_handling = IndexHandling::MergeLatestN(merge_indices_int as usize);
} else if let Ok(merge_indices_ids) = merge_indices.extract::<Vec<String>>() {
let index_ids = merge_indices_ids
.iter()
.map(|id_str| {
Uuid::parse_str(id_str).map_err(|err| PyValueError::new_err(err.to_string()))
})
.collect::<PyResult<Vec<Uuid>>>()?;
options.index_handling = IndexHandling::MergeIndices(index_ids);
} else {
return Err(PyValueError::new_err(
"merge_indices must be a boolean value, integer, or list of str.",
));
}

if let Ok(index_new_data_bool) = index_new_data.extract::<bool>() {
if index_new_data_bool {
options.new_data_handling = NewDataHandling::IndexAll;
} else {
options.new_data_handling = NewDataHandling::Ignore;
}
} else if let Ok(index_new_data_ids) = index_new_data.extract::<Vec<u32>>() {
options.new_data_handling = NewDataHandling::Fragments(index_new_data_ids);
} else {
return Err(PyValueError::new_err(
"index_new_data must be a boolean value.",
));
}

RT.block_on(
None,
new_self
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ snafu.workspace = true
tokio.workspace = true
tracing.workspace = true
tempfile.workspace = true
uuid.workspace = true

[dev-dependencies]
approx.workspace = true
Expand Down
75 changes: 61 additions & 14 deletions rust/lance-index/src/optimize.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

/// Options for optimizing all indices.
#[derive(Debug)]
pub struct OptimizeOptions {
/// Number of delta indices to merge for one column. Default: 1.
use uuid::Uuid;

/// How to handle new unindexed data
#[derive(Debug, Clone, PartialEq)]
pub enum NewDataHandling {
    /// Do not index new data
    Ignore,
    /// Index all unindexed data
    IndexAll,
    /// Index only new data in specified fragments. The fragments are
    /// specified by their ids.
    Fragments(Vec<u32>),
}

/// How to merge indices.
#[derive(Debug, Clone, PartialEq)]
pub enum IndexHandling {
/// Put all new data into it's own delta index.
///
/// If `num_indices_to_merge` is 0, a new delta index will be created.
/// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index.
/// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices
/// will be merged into one single index.
/// If NewDataHandling::Ignore is used, this is a no-op.
NewDelta,
/// Merge new data and the latest N indices into a single index.
///
/// It is up to the caller to decide how many indices to merge / keep. Callers can
/// find out how many indices are there by calling [`Dataset::index_statistics`].
/// If NewDataHandling::Ignore is used, this just merges the latest N indices.
/// Unless N=1, then this is a no-op.
MergeLatestN(usize),
/// Merge all indices into a single index.
MergeAll,
/// Merge new data and the indices with the specified UUIDs. Only indices with
/// the same name will be merged together. You can pass the UUIDs of the
/// deltas of multiple indices, and they will be merged together into one
/// index per name.
///
/// A common usage pattern will be that, the caller can keep a large snapshot of the index of the base version,
/// and accumulate a few delta indices, then merge them into the snapshot.
pub num_indices_to_merge: usize,
/// If NewDataHandling::Ignore is used, this just merges the specified indices.
MergeIndices(Vec<Uuid>),
}

/// Options for optimizing all indices.
///
/// To create a delta index with new data, write:
///
/// ```rust
/// OptimizeOptions {

Check failure on line 46 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

cannot find struct, variant or union type `OptimizeOptions` in this scope
/// new_data_handling: NewDataHandling::IndexAll,

Check failure on line 47 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

failed to resolve: use of undeclared type `NewDataHandling`
/// merge_index_options: MergeIndexOptions::NewDelta,

Check failure on line 48 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

failed to resolve: use of undeclared type `MergeIndexOptions`
/// }
/// ```
///
/// To merge all existing indices without adding new data, write:
///
/// ```rust
/// OptimizeOptions {

Check failure on line 55 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

cannot find struct, variant or union type `OptimizeOptions` in this scope
/// new_data_handling: NewDataHandling::Ignore,

Check failure on line 56 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

failed to resolve: use of undeclared type `NewDataHandling`
/// merge_index_options: MergeIndexOptions::MergeAll,

Check failure on line 57 in rust/lance-index/src/optimize.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

failed to resolve: use of undeclared type `MergeIndexOptions`
/// }
#[derive(Debug, Clone, PartialEq)]
pub struct OptimizeOptions {
/// How to handle new unindexed data.
pub new_data_handling: NewDataHandling,

/// How to merge indices.
pub index_handling: IndexHandling,
}

impl Default for OptimizeOptions {
    /// Default: index all new data and merge it into the latest index —
    /// the same behavior as the legacy `num_indices_to_merge: 1` default.
    fn default() -> Self {
        Self {
            new_data_handling: NewDataHandling::IndexAll,
            index_handling: IndexHandling::MergeLatestN(1),
        }
    }
}
8 changes: 4 additions & 4 deletions rust/lance-testing/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl IncrementingInt32 {
self
}

pub fn named(mut self, name: String) -> Self {
self.name = Some(name);
pub fn named(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
Expand Down Expand Up @@ -104,8 +104,8 @@ impl RandomVector {
self
}

pub fn named(mut self, name: String) -> Self {
self.name = Some(name);
pub fn named(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
Expand Down

0 comments on commit 55a60bc

Please sign in to comment.