-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: optimize specific indices #2192
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -197,9 +197,7 @@ def uri(self) -> str: | |
return self._uri | ||
|
||
def list_indices(self) -> List[Dict[str, Any]]: | ||
if getattr(self, "_list_indices_res", None) is None: | ||
self._list_indices_res = self._ds.load_indices() | ||
return self._list_indices_res | ||
return self._ds.load_indices() | ||
|
||
def index_statistics(self, index_name: str) -> Dict[str, Any]: | ||
warnings.warn( | ||
|
@@ -863,8 +861,6 @@ def drop_columns(self, columns: List[str]): | |
2 c | ||
""" | ||
self._ds.drop_columns(columns) | ||
# Indices might have changed | ||
self._list_indices_res = None | ||
|
||
def delete(self, predicate: Union[str, pa.compute.Expression]): | ||
""" | ||
|
@@ -2286,7 +2282,12 @@ def compact_files( | |
) | ||
return Compaction.execute(self._dataset, opts) | ||
|
||
def optimize_indices(self, **kwargs): | ||
def optimize_indices( | ||
self, | ||
merge_indices: bool | int | List[str] = 1, | ||
index_new_data: bool | List[int] = True, | ||
Comment on lines
+2286
to
+2288
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we do |
||
**kwargs, | ||
): | ||
"""Optimizes index performance. | ||
|
||
As new data arrives it is not added to existing indexes automatically. | ||
|
@@ -2298,8 +2299,34 @@ def optimize_indices(self, **kwargs): | |
the new data to existing partitions. This means an update is much quicker | ||
than retraining the entire index but may have less accuracy (especially | ||
if the new data exhibits new patterns, concepts, or trends) | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we update some of this wording so that we also explain what "merging" is and why you would want to do it? |
||
self._dataset._ds.optimize_indices(**kwargs) | ||
|
||
Parameters | ||
---------- | ||
merge_indices: bool | int | List[str] | ||
If True, all indices will be merged. If False, no indices will be | ||
merged and instead a new index delta will be created. If an integer, | ||
that number of indices will be merged. If a list of UUID strings, | ||
those specific indices will be merged. | ||
Comment on lines
+2305
to
+2309
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got a little confused here because I didn't know if you were talking about "pass in UUIDs if you only want to update some columns (e.g. multiple vector embeddings, each with an index, and only update a few)" or if you were talking about "pass in UUIDs here if you have multiple deltas in the same column to merge together" |
||
index_new_data: bool | List[int] | ||
If True, all new data will be indexed. If False, no new data will be | ||
indexed. If a list of fragment ids, those specific fragments will be | ||
indexed. | ||
""" | ||
# legacy parameter. | ||
if "num_indices_to_merge" in kwargs: | ||
warnings.warn( | ||
"num_indices_to_merge is deprecated, use merge_indices instead", | ||
DeprecationWarning, | ||
) | ||
num_indices_to_merge = kwargs.pop("num_indices_to_merge") | ||
if num_indices_to_merge == 0: | ||
merge_indices = False | ||
else: | ||
merge_indices = num_indices_to_merge | ||
|
||
self._dataset._ds.optimize_indices( | ||
merge_indices=merge_indices, index_new_data=index_new_data, **kwargs | ||
) | ||
|
||
|
||
class DatasetStats(TypedDict): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,7 @@ use lance::dataset::{BatchInfo, BatchUDF, NewColumnTransform, UDFCheckpointStore | |
use lance::index::{scalar::ScalarIndexParams, vector::VectorIndexParams}; | ||
use lance_arrow::as_fixed_size_list_array; | ||
use lance_core::datatypes::Schema; | ||
use lance_index::optimize::OptimizeOptions; | ||
use lance_index::optimize::{IndexHandling, NewDataHandling, OptimizeOptions}; | ||
use lance_index::vector::hnsw::builder::HnswBuildParams; | ||
use lance_index::vector::sq::builder::SQBuildParams; | ||
use lance_index::{ | ||
|
@@ -49,6 +49,7 @@ use lance_index::{ | |
}; | ||
use lance_io::object_store::ObjectStoreParams; | ||
use lance_linalg::distance::MetricType; | ||
|
||
use lance_table::format::Fragment; | ||
use lance_table::io::commit::CommitHandler; | ||
use object_store::path::Path; | ||
|
@@ -62,6 +63,7 @@ use pyo3::{ | |
PyObject, PyResult, | ||
}; | ||
use snafu::{location, Location}; | ||
use uuid::Uuid; | ||
|
||
use crate::fragment::{FileFragment, FragmentMetadata}; | ||
use crate::schema::LanceSchema; | ||
|
@@ -867,15 +869,52 @@ impl Dataset { | |
}) | ||
} | ||
|
||
#[pyo3(signature = (**kwargs))] | ||
fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> { | ||
#[pyo3(signature = (merge_indices, index_new_data, **_kwargs))] | ||
fn optimize_indices( | ||
&mut self, | ||
merge_indices: &PyAny, | ||
index_new_data: &PyAny, | ||
_kwargs: Option<&PyDict>, | ||
) -> PyResult<()> { | ||
let mut new_self = self.ds.as_ref().clone(); | ||
let mut options: OptimizeOptions = Default::default(); | ||
if let Some(kwargs) = kwargs { | ||
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? { | ||
options.num_indices_to_merge = num_indices_to_merge.extract()?; | ||
|
||
if let Ok(merge_indices_bool) = merge_indices.extract::<bool>() { | ||
if merge_indices_bool { | ||
options.index_handling = IndexHandling::MergeAll; | ||
} else { | ||
options.index_handling = IndexHandling::NewDelta; | ||
} | ||
} else if let Ok(merge_indices_int) = merge_indices.extract::<usize>() { | ||
options.index_handling = IndexHandling::MergeLatestN(merge_indices_int); | ||
} else if let Ok(merge_indices_ids) = merge_indices.extract::<Vec<String>>() { | ||
let index_ids = merge_indices_ids | ||
.iter() | ||
.map(|id_str| { | ||
Uuid::parse_str(id_str).map_err(|err| PyValueError::new_err(err.to_string())) | ||
}) | ||
.collect::<PyResult<Vec<Uuid>>>()?; | ||
options.index_handling = IndexHandling::MergeIndices(index_ids); | ||
} else { | ||
return Err(PyValueError::new_err( | ||
"merge_indices must be a boolean value, integer, or list of str.", | ||
)); | ||
} | ||
|
||
if let Ok(index_new_data_bool) = index_new_data.extract::<bool>() { | ||
if index_new_data_bool { | ||
options.new_data_handling = NewDataHandling::IndexAll; | ||
} else { | ||
options.new_data_handling = NewDataHandling::Ignore; | ||
} | ||
} else if let Ok(index_new_data_ids) = index_new_data.extract::<Vec<u32>>() { | ||
options.new_data_handling = NewDataHandling::Fragments(index_new_data_ids); | ||
} else { | ||
return Err(PyValueError::new_err( | ||
"index_new_data must be a boolean value or a list of fragment ids.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or a list of fragment ids? |
||
)); | ||
} | ||
|
||
RT.block_on( | ||
None, | ||
new_self | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,77 @@ | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// SPDX-FileCopyrightText: Copyright The Lance Authors | ||
|
||
/// Options for optimizing all indices. | ||
#[derive(Debug)] | ||
pub struct OptimizeOptions { | ||
/// Number of delta indices to merge for one column. Default: 1. | ||
use uuid::Uuid; | ||
|
||
/// How to handle new unindexed data | ||
#[derive(Debug, Clone, PartialEq)] | ||
pub enum NewDataHandling { | ||
/// Do not index new data | ||
Ignore, | ||
/// Index all unindexed data | ||
IndexAll, | ||
/// Index only new data in specified fragments. The fragments are | ||
/// specified by their ids. | ||
Fragments(Vec<u32>), | ||
} | ||
|
||
/// How to merge indices. | ||
#[derive(Debug, Clone, PartialEq)] | ||
pub enum IndexHandling { | ||
/// Put all new data into its own delta index. | ||
/// | ||
/// If `num_indices_to_merge` is 0, a new delta index will be created. | ||
/// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index. | ||
/// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices | ||
/// will be merged into one single index. | ||
/// If NewDataHandling::Ignore is used, this is a no-op. | ||
NewDelta, | ||
/// Merge new data and the latest N indices into a single index. | ||
/// | ||
/// It is up to the caller to decide how many indices to merge / keep. Callers can | ||
/// find out how many indices there are by calling [`Dataset::index_statistics`]. | ||
/// If NewDataHandling::Ignore is used, this just merges the latest N indices. | ||
/// If N=1, this is a no-op. | ||
MergeLatestN(usize), | ||
/// Merge all indices into a single index. | ||
MergeAll, | ||
/// Merge new data and the indices with the specified UUIDs. Only indices with | ||
/// the same name will be merged together. You can pass the UUIDs of the | ||
/// deltas of multiple indices, and they will be merged together into one | ||
/// index per name. | ||
/// | ||
/// A common usage pattern is that the caller keeps a large snapshot of the index of the base version, | ||
/// accumulates a few delta indices, and then merges them into the snapshot. | ||
pub num_indices_to_merge: usize, | ||
/// If NewDataHandling::Ignore is used, this just merges the specified indices. | ||
MergeIndices(Vec<Uuid>), | ||
} | ||
|
||
/// Options for optimizing all indices. | ||
/// | ||
/// To create a delta index with new data, write: | ||
/// | ||
/// ```rust | ||
/// # use lance_index::optimize::{OptimizeOptions, NewDataHandling, IndexHandling}; | ||
/// OptimizeOptions { | ||
/// new_data_handling: NewDataHandling::IndexAll, | ||
/// index_handling: IndexHandling::NewDelta, | ||
/// }; | ||
/// ``` | ||
/// | ||
/// To merge all existing indices without adding new data, write: | ||
/// | ||
/// ```rust | ||
/// # use lance_index::optimize::{OptimizeOptions, NewDataHandling, IndexHandling}; | ||
/// OptimizeOptions { | ||
/// new_data_handling: NewDataHandling::Ignore, | ||
/// index_handling: IndexHandling::MergeAll, | ||
/// }; | ||
/// ``` | ||
#[derive(Debug, Clone, PartialEq)] | ||
pub struct OptimizeOptions { | ||
/// How to handle new unindexed data. | ||
pub new_data_handling: NewDataHandling, | ||
|
||
/// How to merge indices. | ||
pub index_handling: IndexHandling, | ||
} | ||
|
||
impl Default for OptimizeOptions { | ||
fn default() -> Self { | ||
Self { | ||
num_indices_to_merge: 1, | ||
new_data_handling: NewDataHandling::IndexAll, | ||
index_handling: IndexHandling::MergeLatestN(1), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this cache. There was no invalidation so it is often wrong.