Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize specific indices #2192

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 35 additions & 8 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,7 @@ def uri(self) -> str:
return self._uri

def list_indices(self) -> List[Dict[str, Any]]:
if getattr(self, "_list_indices_res", None) is None:
self._list_indices_res = self._ds.load_indices()
return self._list_indices_res
return self._ds.load_indices()
Comment on lines 199 to +200
Copy link
Contributor Author

@wjones127 wjones127 Apr 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this cache. There was no invalidation so it is often wrong.


def index_statistics(self, index_name: str) -> Dict[str, Any]:
warnings.warn(
Expand Down Expand Up @@ -863,8 +861,6 @@ def drop_columns(self, columns: List[str]):
2 c
"""
self._ds.drop_columns(columns)
# Indices might have changed
self._list_indices_res = None

def delete(self, predicate: Union[str, pa.compute.Expression]):
"""
Expand Down Expand Up @@ -2286,7 +2282,12 @@ def compact_files(
)
return Compaction.execute(self._dataset, opts)

def optimize_indices(self, **kwargs):
def optimize_indices(
self,
merge_indices: bool | int | List[str] = 1,
index_new_data: bool | List[int] = True,
Comment on lines +2286 to +2288
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do self, *, merge_indices, index_new_data?

**kwargs,
):
"""Optimizes index performance.

As new data arrives it is not added to existing indexes automatically.
Expand All @@ -2298,8 +2299,34 @@ def optimize_indices(self, **kwargs):
the new data to existing partitions. This means an update is much quicker
than retraining the entire index but may have less accuracy (especially
if the new data exhibits new patterns, concepts, or trends)
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update some of this wording so that we also explain what "merging" is and why you would want to do it?

self._dataset._ds.optimize_indices(**kwargs)

Parameters
----------
merge_indices: bool | int | List[str]
If True, all indices will be merged. If False, no indices will be
merged and instead a new index delta will be created. If an integer,
that number of indices will be merged. If a list of UUID strings,
those specific indices will be merged.
Comment on lines +2305 to +2309
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a little confused here because I didn't know if you were talking about "pass in UUIDs if you only want to update some columns (e.g. multiple vector embeddings, each with an index, and only update a few)" or if you were talking about "pass in UUIDs here if you have multiple deltas in the same column to merge together"

index_new_data: bool | List[int]
If True, all new data will be indexed. If False, no new data will be
indexed. If a list of fragment ids, those specific fragments will be
indexed.
"""
# legacy parameter.
if "num_indices_to_merge" in kwargs:
warnings.warn(
"num_indices_to_merge is deprecated, use merge_indices instead",
DeprecationWarning,
)
num_indices_to_merge = kwargs.pop("num_indices_to_merge")
if num_indices_to_merge == 0:
merge_indices = False
else:
merge_indices = num_indices_to_merge

self._dataset._ds.optimize_indices(
merge_indices=merge_indices, index_new_data=index_new_data, **kwargs
)


class DatasetStats(TypedDict):
Expand Down
40 changes: 39 additions & 1 deletion python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def test_optimize_index(dataset, tmp_path):
ds = lance.write_dataset(dataset.to_table(), dataset_uri)
ds = ds.create_index(
"vector",
name="vector_index",
index_type="IVF_PQ",
num_partitions=4,
num_sub_vectors=2,
Expand All @@ -443,8 +444,45 @@ def test_optimize_index(dataset, tmp_path):
indices_dir = dataset_uri / "_indices"
assert len(list(indices_dir.iterdir())) == 1

ds = ds.optimize.optimize_indices()
# Default is to index new data and merge with the most recent index.
ds.optimize.optimize_indices()
assert len(list(indices_dir.iterdir())) == 2
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 1

# New data / make a delta
tbl = create_table(nvec=200)
ds = lance.write_dataset(tbl, dataset_uri, mode="append")
ds.optimize.optimize_indices(merge_indices=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 2
assert index_stats["num_indexed_fragments"] == 3

# Two new fragments, make a delta out of only one
tbl = create_table(nvec=200)
ds = lance.write_dataset(tbl, dataset_uri, mode="append", max_rows_per_file=100)
to_index = [ds.get_fragments()[-1].fragment_id]
ds.optimize.optimize_indices(merge_indices=False, index_new_data=to_index)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 3
assert index_stats["num_unindexed_fragments"] == 1
assert index_stats["num_indexed_fragments"] == 4

# Merge two specific indices
all_uuids = [idx["uuid"] for idx in ds.list_indices()]
to_merge = all_uuids[:2]
ds.optimize.optimize_indices(merge_indices=to_merge, index_new_data=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 2
all_uuids = [idx["uuid"] for idx in ds.list_indices()]
assert all([uuid not in all_uuids for uuid in to_merge])

# Merge all indices, don't index new data
ds.optimize.optimize_indices(merge_indices=True, index_new_data=False)
index_stats = ds.stats.index_stats("vector_index")
assert len(index_stats["indices"]) == 1
assert index_stats["num_indexed_fragments"] == 4
assert index_stats["num_unindexed_fragments"] == 1


def create_uniform_table(min, max, nvec, offset, ndim=8):
Expand Down
51 changes: 45 additions & 6 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use lance::dataset::{BatchInfo, BatchUDF, NewColumnTransform, UDFCheckpointStore
use lance::index::{scalar::ScalarIndexParams, vector::VectorIndexParams};
use lance_arrow::as_fixed_size_list_array;
use lance_core::datatypes::Schema;
use lance_index::optimize::OptimizeOptions;
use lance_index::optimize::{IndexHandling, NewDataHandling, OptimizeOptions};
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::{
Expand All @@ -49,6 +49,7 @@ use lance_index::{
};
use lance_io::object_store::ObjectStoreParams;
use lance_linalg::distance::MetricType;

use lance_table::format::Fragment;
use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
Expand All @@ -62,6 +63,7 @@ use pyo3::{
PyObject, PyResult,
};
use snafu::{location, Location};
use uuid::Uuid;

use crate::fragment::{FileFragment, FragmentMetadata};
use crate::schema::LanceSchema;
Expand Down Expand Up @@ -867,15 +869,52 @@ impl Dataset {
})
}

#[pyo3(signature = (**kwargs))]
fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> {
#[pyo3(signature = (merge_indices, index_new_data, **_kwargs))]
fn optimize_indices(
&mut self,
merge_indices: &PyAny,
index_new_data: &PyAny,
_kwargs: Option<&PyDict>,
) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
let mut options: OptimizeOptions = Default::default();
if let Some(kwargs) = kwargs {
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
options.num_indices_to_merge = num_indices_to_merge.extract()?;

if let Ok(merge_indices_bool) = merge_indices.extract::<bool>() {
if merge_indices_bool {
options.index_handling = IndexHandling::MergeAll;
} else {
options.index_handling = IndexHandling::NewDelta;
}
} else if let Ok(merge_indices_int) = merge_indices.extract::<usize>() {
options.index_handling = IndexHandling::MergeLatestN(merge_indices_int);
} else if let Ok(merge_indices_ids) = merge_indices.extract::<Vec<String>>() {
let index_ids = merge_indices_ids
.iter()
.map(|id_str| {
Uuid::parse_str(id_str).map_err(|err| PyValueError::new_err(err.to_string()))
})
.collect::<PyResult<Vec<Uuid>>>()?;
options.index_handling = IndexHandling::MergeIndices(index_ids);
} else {
return Err(PyValueError::new_err(
"merge_indices must be a boolean value, integer, or list of str.",
));
}

if let Ok(index_new_data_bool) = index_new_data.extract::<bool>() {
if index_new_data_bool {
options.new_data_handling = NewDataHandling::IndexAll;
} else {
options.new_data_handling = NewDataHandling::Ignore;
}
} else if let Ok(index_new_data_ids) = index_new_data.extract::<Vec<u32>>() {
options.new_data_handling = NewDataHandling::Fragments(index_new_data_ids);
} else {
return Err(PyValueError::new_err(
"index_new_data must be a boolean value.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or a list of fragment ids?

));
}

RT.block_on(
None,
new_self
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ snafu.workspace = true
tokio.workspace = true
tracing.workspace = true
tempfile.workspace = true
uuid.workspace = true

[dev-dependencies]
approx.workspace = true
Expand Down
77 changes: 63 additions & 14 deletions rust/lance-index/src/optimize.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,77 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

/// Options for optimizing all indices.
#[derive(Debug)]
pub struct OptimizeOptions {
/// Number of delta indices to merge for one column. Default: 1.
use uuid::Uuid;

/// How to handle new, not-yet-indexed data when optimizing indices.
#[derive(Debug, Clone, PartialEq)]
pub enum NewDataHandling {
    /// Do not index any new data.
    Ignore,
    /// Index all currently unindexed data.
    IndexAll,
    /// Index only new data in specified fragments. The fragments are
    /// specified by their ids.
    Fragments(Vec<u32>),
}

/// How to merge indices.
#[derive(Debug, Clone, PartialEq)]
pub enum IndexHandling {
/// Put all new data into it's own delta index.
///
/// If `num_indices_to_merge` is 0, a new delta index will be created.
/// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index.
/// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices
/// will be merged into one single index.
/// If NewDataHandling::Ignore is used, this is a no-op.
NewDelta,
/// Merge new data and the latest N indices into a single index.
///
/// It is up to the caller to decide how many indices to merge / keep. Callers can
/// find out how many indices are there by calling [`Dataset::index_statistics`].
/// If NewDataHandling::Ignore is used, this just merges the latest N indices.
/// Unless N=1, then this is a no-op.
MergeLatestN(usize),
/// Merge all indices into a single index.
MergeAll,
/// Merge new data and the indices with the specified UUIDs. Only indices with
/// the same name will be merged together. You can pass the UUIDs of the
/// deltas of multiple indices, and they will be merged together into one
/// index per name.
///
/// A common usage pattern will be that, the caller can keep a large snapshot of the index of the base version,
/// and accumulate a few delta indices, then merge them into the snapshot.
pub num_indices_to_merge: usize,
/// If NewDataHandling::Ignore is used, this just merges the specified indices.
MergeIndices(Vec<Uuid>),
}

/// Options for optimizing all indices.
///
/// To create a delta index with new data, write:
///
/// ```rust
/// # use lance_index::optimize::{OptimizeOptions, NewDataHandling, IndexHandling};
/// OptimizeOptions {
///     new_data_handling: NewDataHandling::IndexAll,
///     index_handling: IndexHandling::NewDelta,
/// };
/// ```
///
/// To merge all existing indices without adding new data, write:
///
/// ```rust
/// # use lance_index::optimize::{OptimizeOptions, NewDataHandling, IndexHandling};
/// OptimizeOptions {
///     new_data_handling: NewDataHandling::Ignore,
///     index_handling: IndexHandling::MergeAll,
/// };
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct OptimizeOptions {
    /// How to handle new unindexed data.
    pub new_data_handling: NewDataHandling,

    /// How to merge indices.
    pub index_handling: IndexHandling,
}

impl Default for OptimizeOptions {
    /// By default, index all new data and merge it with the most recent index
    /// (equivalent to the legacy `num_indices_to_merge: 1` behavior).
    fn default() -> Self {
        Self {
            new_data_handling: NewDataHandling::IndexAll,
            index_handling: IndexHandling::MergeLatestN(1),
        }
    }
}
8 changes: 4 additions & 4 deletions rust/lance-testing/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl IncrementingInt32 {
self
}

pub fn named(mut self, name: String) -> Self {
self.name = Some(name);
pub fn named(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
Expand Down Expand Up @@ -104,8 +104,8 @@ impl RandomVector {
self
}

pub fn named(mut self, name: String) -> Self {
self.name = Some(name);
pub fn named(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
Expand Down