diff --git a/crates/base/src/index.rs b/crates/base/src/index.rs index 6630f5bee..d8c3a7bf2 100644 --- a/crates/base/src/index.rs +++ b/crates/base/src/index.rs @@ -1,6 +1,8 @@ use crate::distance::*; use crate::vector::*; use serde::{Deserialize, Serialize}; +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; use thiserror::Error; use uuid::Uuid; use validator::{Validate, ValidationError}; @@ -78,6 +80,61 @@ pub enum StatError { NotExist, } +#[must_use] +#[derive(Debug, Clone, Error, Serialize, Deserialize)] +pub enum SettingError { + #[error("Setting key {key} is not exist.")] + BadKey { key: String }, + #[error("Setting key {key} has a wrong value {value}.")] + BadValue { key: String, value: String }, + #[error("Index not found.")] + NotExist, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct IndexFlexibleOptions { + #[serde(default = "IndexFlexibleOptions::default_optimizing_threads")] + pub optimizing_threads: AtomicU16, +} + +impl IndexFlexibleOptions { + fn default_optimizing_threads() -> AtomicU16 { + match std::thread::available_parallelism() { + Ok(threads) => AtomicU16::new((threads.get() as f64).sqrt() as _), + Err(_) => AtomicU16::new(1), + } + } + pub fn optimizing_threads_eq(&self, other: &Self) -> bool { + self.optimizing_threads.load(Ordering::Relaxed) + == other.optimizing_threads.load(Ordering::Relaxed) + } +} + +impl Clone for IndexFlexibleOptions { + fn clone(&self) -> Self { + IndexFlexibleOptions { + optimizing_threads: AtomicU16::new(self.optimizing_threads.load(Ordering::Relaxed)), + } + } +} + +impl PartialEq for IndexFlexibleOptions { + fn eq(&self, other: &Self) -> bool { + self.optimizing_threads_eq(other) + } +} + +impl Eq for IndexFlexibleOptions {} + +impl Default for IndexFlexibleOptions { + fn default() -> Self { + Self { + optimizing_threads: Self::default_optimizing_threads(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] #[validate(schema(function = "IndexOptions::validate_index_options"))] @@ -198,9 +255,6 @@ pub struct OptimizingOptions { #[serde(default = "OptimizingOptions::default_delete_threshold")] #[validate(range(min = 0.01, max = 1.00))] pub delete_threshold: f64, - #[serde(default = "OptimizingOptions::default_optimizing_threads")] - #[validate(range(min = 1, max = 65535))] - pub optimizing_threads: usize, } impl OptimizingOptions { @@ -213,12 +267,6 @@ impl OptimizingOptions { fn default_delete_threshold() -> f64 { 0.2 } - fn default_optimizing_threads() -> usize { - match std::thread::available_parallelism() { - Ok(threads) => (threads.get() as f64).sqrt() as _, - Err(_) => 1, - } - } } impl Default for OptimizingOptions { @@ -227,7 +275,6 @@ impl Default for OptimizingOptions { sealing_secs: Self::default_sealing_secs(), sealing_size: Self::default_sealing_size(), delete_threshold: Self::default_delete_threshold(), - optimizing_threads: Self::default_optimizing_threads(), } } } diff --git a/crates/base/src/worker.rs b/crates/base/src/worker.rs index 94cb866f8..5ffd371c2 100644 --- a/crates/base/src/worker.rs +++ b/crates/base/src/worker.rs @@ -17,6 +17,7 @@ pub trait WorkerOperations { fn view_vbase(&self, handle: Handle) -> Result; fn view_list(&self, handle: Handle) -> Result; fn stat(&self, handle: Handle) -> Result; + fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>; } pub trait ViewBasicOperations { diff --git a/crates/common/src/clean.rs b/crates/common/src/clean.rs index 43b699344..bba9547b4 100644 --- a/crates/common/src/clean.rs +++ b/crates/common/src/clean.rs @@ -8,6 +8,9 @@ pub fn clean(path: impl AsRef, wanted: impl Iterator) { .unwrap(); let wanted = HashSet::::from_iter(wanted); for dir in dirs { + if dir.path().is_file() { + log::info!("Unexpected file {:?}, skip.", dir.path()); + } let filename = dir.file_name(); let filename = filename.to_str().unwrap(); let p = path.as_ref().join(filename); diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index f8129c555..5059853b3 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -6,6 +6,7 @@ pub mod indexing; pub mod optimizing; pub mod segments; +mod setting; mod utils; use self::delete::Delete; @@ -13,6 +14,7 @@ use self::segments::growing::GrowingSegment; use self::segments::sealed::SealedSegment; use crate::optimizing::indexing::OptimizerIndexing; use crate::optimizing::sealing::OptimizerSealing; +use crate::setting::FlexibleOptionSyncing; use crate::utils::tournament_tree::LoserTree; use arc_swap::ArcSwap; pub use base::distance::*; @@ -34,6 +36,8 @@ use std::collections::HashMap; use std::collections::HashSet; use std::convert::Infallible; use std::path::PathBuf; +use std::str::FromStr; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; @@ -51,6 +55,7 @@ pub struct OutdatedError; pub struct Index { path: PathBuf, options: IndexOptions, + flexible: IndexFlexibleOptions, delete: Arc, protect: Mutex>, view: ArcSwap>, @@ -58,11 +63,13 @@ pub struct Index { instant_write: AtomicCell, background_indexing: Mutex, JoinHandle<()>)>>, background_sealing: Mutex, JoinHandle<()>)>>, + background_setting: Mutex, JoinHandle<()>)>>, _tracker: Arc, } impl Index { pub fn create(path: PathBuf, options: IndexOptions) -> Result, CreateError> { + let flexible = IndexFlexibleOptions::default(); if let Err(err) = options.validate() { return Err(CreateError::InvalidIndexOptions { reason: err.to_string(), @@ -87,6 +94,7 @@ impl Index { let index = Arc::new(Index { path: path.clone(), options: options.clone(), + flexible, delete: delete.clone(), protect: Mutex::new(IndexProtect { startup, @@ -105,6 +113,7 @@ impl Index { instant_write: AtomicCell::new(Instant::now()), background_indexing: Mutex::new(None), background_sealing: Mutex::new(None), + background_setting: Mutex::new(None), _tracker: Arc::new(IndexTracker { path }), }); Ok(index) @@ -114,6 +123,10 @@ impl Index { let options = serde_json::from_slice::(&std::fs::read(path.join("options")).unwrap()) .unwrap(); + let flexible = serde_json::from_slice::( + &std::fs::read(path.join("flexible")).unwrap_or_default(), + ) + .unwrap_or_default(); let tracker = Arc::new(IndexTracker { path: path.clone() }); let startup = FileAtomic::::open(path.join("startup")); clean( @@ -161,6 +174,7 @@ impl Index { Arc::new(Index { path: path.clone(), options: options.clone(), + flexible, delete: delete.clone(), protect: Mutex::new(IndexProtect { startup, @@ -179,6 +193,7 @@ impl Index { instant_write: AtomicCell::new(Instant::now()), background_indexing: Mutex::new(None), background_sealing: Mutex::new(None), + background_setting: Mutex::new(None), _tracker: tracker, }) } @@ -188,6 +203,25 @@ impl Index { pub fn view(&self) -> Arc> { self.view.load_full() } + pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> { + match key.as_str() { + "optimizing.threads" => { + let parsed = i32::from_str(value.as_str()) + .map_err(|_e| SettingError::BadValue { key, value })?; + let threads = match parsed { + -1 => IndexFlexibleOptions::default() + .optimizing_threads + .load(Ordering::Relaxed), + threads_limit => threads_limit as u16, + }; + self.flexible + .optimizing_threads + .store(threads, Ordering::Relaxed); + } + _ => return Err(SettingError::BadKey { key }), + }; + Ok(()) + } pub fn refresh(&self) { let mut protect = self.protect.lock(); if let Some((uuid, write)) = protect.write.clone() { @@ -256,6 +290,12 @@ impl Index { *background_sealing = Some(OptimizerSealing::new(self.clone()).spawn()); } } + { + let mut background_setting = self.background_setting.lock(); + if background_setting.is_none() { + *background_setting = Some(FlexibleOptionSyncing::new(self.clone()).spawn()); + } + } } pub fn stop(&self) { { @@ -272,6 +312,13 @@ impl Index { let _ = join_handle.join(); } } + { + let mut background_setting = self.background_setting.lock(); + if let Some((sender, join_handle)) = background_setting.take() { + drop(sender); + let _ = join_handle.join(); + } + } } pub fn wait(&self) -> Arc { Arc::clone(&self._tracker) diff --git a/crates/index/src/optimizing/indexing.rs b/crates/index/src/optimizing/indexing.rs index 5b214f6c4..cfaa8ae18 100644 --- a/crates/index/src/optimizing/indexing.rs +++ b/crates/index/src/optimizing/indexing.rs @@ -12,6 +12,7 @@ use crossbeam::channel::TryRecvError; use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; use std::cmp::Reverse; use std::convert::Infallible; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; @@ -102,8 +103,9 @@ impl OptimizerIndexing { } fn main(self, shutdown: Receiver) { let index = self.index; + let threads = index.flexible.optimizing_threads.load(Ordering::Relaxed); rayon::ThreadPoolBuilder::new() - .num_threads(index.options.optimizing.optimizing_threads) + .num_threads(threads as usize) .build_scoped(|pool| { std::thread::scope(|scope| { scope.spawn(|| match shutdown.recv() { diff --git a/crates/index/src/setting.rs b/crates/index/src/setting.rs new file mode 100644 index 000000000..fe4c51dd3 --- /dev/null +++ b/crates/index/src/setting.rs @@ -0,0 +1,60 @@ +use super::Index; +use crate::optimizing::indexing::OptimizerIndexing; +use crate::Op; +use base::index::IndexFlexibleOptions; +use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; +use std::convert::Infallible; +use std::sync::Arc; +use std::thread::JoinHandle; +use std::time::Duration; + +pub struct FlexibleOptionSyncing { + index: Arc>, +} + +impl FlexibleOptionSyncing { + pub fn new(index: Arc>) -> Self { + Self { index } + } + pub fn spawn(self) -> (Sender, JoinHandle<()>) { + let (tx, rx) = bounded(1); + ( + tx, + std::thread::spawn(move || { + self.main(rx); + }), + ) + } + pub fn main(self, shutdown: Receiver) { + let index = &self.index; + let mut old_options = index.flexible.clone(); + let dur = Duration::from_secs(5); + loop { + let new_options = index.flexible.clone(); + if new_options != old_options { + std::fs::write( + index.path.join("flexible"), + serde_json::to_string::(&new_options).unwrap(), + ) + .unwrap(); + old_options = new_options.clone(); + } + self.update_optimizing_threads(new_options.clone(), old_options.clone()); + match shutdown.recv_timeout(dur) { + Ok(never) => match never {}, + Err(RecvTimeoutError::Disconnected) => return, + Err(RecvTimeoutError::Timeout) => continue, + } + } + } + fn update_optimizing_threads(&self, new: IndexFlexibleOptions, old: IndexFlexibleOptions) { + if !new.optimizing_threads_eq(&old) { + let mut background = self.index.background_indexing.lock(); + if let Some((sender, join_handle)) = background.take() { + drop(sender); + let _ = join_handle.join(); + *background = Some(OptimizerIndexing::new(self.index.clone()).spawn()); + } + } + } +} diff --git a/crates/service/src/instance.rs b/crates/service/src/instance.rs index 3831ef44d..20ca8879b 100644 --- a/crates/service/src/instance.rs +++ b/crates/service/src/instance.rs @@ -187,6 +187,26 @@ impl Instance { Instance::Veci8Dot(x) => x.stat(), } } + pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> { + match self { + Instance::Vecf32Cos(x) => x.setting(key, value), + Instance::Vecf32Dot(x) => x.setting(key, value), + Instance::Vecf32L2(x) => x.setting(key, value), + Instance::Vecf16Cos(x) => x.setting(key, value), + Instance::Vecf16Dot(x) => x.setting(key, value), + Instance::Vecf16L2(x) => x.setting(key, value), + Instance::SVecf32Cos(x) => x.setting(key, value), + Instance::SVecf32Dot(x) => x.setting(key, value), + Instance::SVecf32L2(x) => x.setting(key, value), + Instance::BVecf32Cos(x) => x.setting(key, value), + Instance::BVecf32Dot(x) => x.setting(key, value), + Instance::BVecf32L2(x) => x.setting(key, value), + Instance::BVecf32Jaccard(x) => x.setting(key, value), + Instance::Veci8L2(x) => x.setting(key, value), + Instance::Veci8Cos(x) => x.setting(key, value), + Instance::Veci8Dot(x) => x.setting(key, value), + } + } pub fn start(&self) { match self { Instance::Vecf32Cos(x) => x.start(), diff --git a/crates/service/src/worker.rs b/crates/service/src/worker.rs index 473b89a08..703dfe823 100644 --- a/crates/service/src/worker.rs +++ b/crates/service/src/worker.rs @@ -176,6 +176,11 @@ impl WorkerOperations for Worker { let stat = instance.stat(); Ok(stat) } + fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> { + let view = self.view(); + let instance = view.get(handle).ok_or(SettingError::NotExist)?; + instance.setting(key, value) + } } pub struct WorkerView { diff --git a/src/bgworker/normal.rs b/src/bgworker/normal.rs index 0caae237e..32656ca8b 100644 --- a/src/bgworker/normal.rs +++ b/src/bgworker/normal.rs @@ -89,6 +89,14 @@ fn session(worker: Arc, handler: ServerRpcHandler) -> Result { handler = x.leave(worker.stat(handle))?; } + ServerRpcHandle::Setting { + handle, + key, + value, + x, + } => { + handler = x.leave(worker.setting(handle, key, value))?; + } ServerRpcHandle::Basic { handle, vector, diff --git a/src/index/views.rs b/src/index/views.rs index 5f85a7a2a..e3780b3ea 100644 --- a/src/index/views.rs +++ b/src/index/views.rs @@ -2,6 +2,17 @@ use crate::error::*; use crate::index::utils::from_oid_to_handle; use crate::ipc::client; use base::index::*; +use pgrx::error; + +#[pgrx::pg_extern(volatile, strict)] +fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) { + let id = get_handle(oid); + let mut rpc = check_client(crate::ipc::client()); + match rpc.setting(id, key, value) { + Ok(_) => {} + Err(e) => error!("{}", e.to_string()), + } +} #[pgrx::pg_extern(volatile, strict, parallel_safe)] fn _vectors_index_stat( diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index a746fb574..28239668a 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -329,4 +329,5 @@ defines! { stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer; stream list(handle: Handle) -> Pointer; unary stat(handle: Handle) -> IndexStat; + unary setting(handle: Handle, key: String, value: String) -> (); } diff --git a/src/sql/finalize.sql b/src/sql/finalize.sql index 3dc8e9c63..82f65ac94 100644 --- a/src/sql/finalize.sql +++ b/src/sql/finalize.sql @@ -590,6 +590,9 @@ BEGIN END; $$; +CREATE FUNCTION alter_vector_index("index" OID, "key" TEXT, "value" TEXT) RETURNS void +STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_alter_vector_index_wrapper'; + -- List of casts CREATE CAST (real[] AS vector) diff --git a/tests/sqllogictest/index_edit.slt b/tests/sqllogictest/index_edit.slt new file mode 100644 index 000000000..ea74b226e --- /dev/null +++ b/tests/sqllogictest/index_edit.slt @@ -0,0 +1,37 @@ +statement ok +SET search_path TO pg_temp, vectors; + +statement ok +CREATE TABLE t (val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +CREATE INDEX hnsw_1 ON t USING vectors (val vector_l2_ops) +WITH (options = "[indexing.hnsw]"); + +query I +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2; +---- +10 + +statement error does not exist +SELECT alter_vector_index('unknown_index'::regclass::oid, 'optimizing.threads', '1'); + +statement error Setting key +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'unknown_key', '1'); + +statement error wrong value +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', 'unknown_value'); + +statement ok +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', '1'); + +query I +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2; +---- +10 + +statement ok +DROP TABLE t; \ No newline at end of file