From f6a7a64c043418ccc8b8bf02c7c69d637288c38d Mon Sep 17 00:00:00 2001 From: cutecutecat Date: Mon, 25 Mar 2024 08:59:18 +0800 Subject: [PATCH] put flexible at IndexProtect Signed-off-by: cutecutecat --- crates/base/src/index.rs | 32 +++---------- crates/index/src/lib.rs | 60 +++++++++++-------------- crates/index/src/optimizing/indexing.rs | 6 ++- crates/index/src/setting.rs | 60 ------------------------- src/index/views.rs | 4 +- tests/crash/create.slt | 3 ++ tests/crash/restore.slt | 3 ++ 7 files changed, 43 insertions(+), 125 deletions(-) delete mode 100644 crates/index/src/setting.rs diff --git a/crates/base/src/index.rs b/crates/base/src/index.rs index d8c3a7bf2..4211bef89 100644 --- a/crates/base/src/index.rs +++ b/crates/base/src/index.rs @@ -1,8 +1,6 @@ use crate::distance::*; use crate::vector::*; use serde::{Deserialize, Serialize}; -use std::sync::atomic::AtomicU16; -use std::sync::atomic::Ordering; use thiserror::Error; use uuid::Uuid; use validator::{Validate, ValidationError}; @@ -91,42 +89,22 @@ pub enum SettingError { NotExist, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct IndexFlexibleOptions { #[serde(default = "IndexFlexibleOptions::default_optimizing_threads")] - pub optimizing_threads: AtomicU16, + pub optimizing_threads: u16, } impl IndexFlexibleOptions { - fn default_optimizing_threads() -> AtomicU16 { + pub fn default_optimizing_threads() -> u16 { match std::thread::available_parallelism() { - Ok(threads) => AtomicU16::new((threads.get() as f64).sqrt() as _), - Err(_) => AtomicU16::new(1), + Ok(threads) => (threads.get() as f64).sqrt() as _, + Err(_) => 1, } } - pub fn optimizing_threads_eq(&self, other: &Self) -> bool { - self.optimizing_threads.load(Ordering::Relaxed) - == other.optimizing_threads.load(Ordering::Relaxed) - } } -impl Clone for IndexFlexibleOptions { - fn clone(&self) -> Self { - IndexFlexibleOptions { - optimizing_threads: AtomicU16::new(self.optimizing_threads.load(Ordering::Relaxed)), - } - } -} - -impl PartialEq for IndexFlexibleOptions { - fn eq(&self, other: &Self) -> bool { - self.optimizing_threads_eq(other) - } -} - -impl Eq for IndexFlexibleOptions {} - impl Default for IndexFlexibleOptions { fn default() -> Self { Self { diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index 5059853b3..1fb1a5db6 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -6,7 +6,6 @@ pub mod indexing; pub mod optimizing; pub mod segments; -mod setting; mod utils; use self::delete::Delete; @@ -14,7 +13,6 @@ use self::segments::growing::GrowingSegment; use self::segments::sealed::SealedSegment; use crate::optimizing::indexing::OptimizerIndexing; use crate::optimizing::sealing::OptimizerSealing; -use crate::setting::FlexibleOptionSyncing; use crate::utils::tournament_tree::LoserTree; use arc_swap::ArcSwap; pub use base::distance::*; @@ -37,7 +35,6 @@ use std::collections::HashSet; use std::convert::Infallible; use std::path::PathBuf; use std::str::FromStr; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; @@ -55,7 +52,6 @@ pub struct OutdatedError; pub struct Index { path: PathBuf, options: IndexOptions, - flexible: IndexFlexibleOptions, delete: Arc, protect: Mutex>, view: ArcSwap>, @@ -63,7 +59,6 @@ pub struct Index { instant_write: AtomicCell, background_indexing: Mutex, JoinHandle<()>)>>, background_sealing: Mutex, JoinHandle<()>)>>, - background_setting: Mutex, JoinHandle<()>)>>, _tracker: Arc, } @@ -87,6 +82,7 @@ impl Index { IndexStartup { sealeds: HashSet::new(), growings: HashSet::new(), + flexible, }, ); let delete = Delete::create(path.join("delete")); @@ -94,7 +90,6 @@ impl Index { let index = Arc::new(Index { path: path.clone(), options: options.clone(), - flexible, delete: delete.clone(), protect: Mutex::new(IndexProtect { startup, @@ -113,7 +108,6 @@ impl Index { instant_write: AtomicCell::new(Instant::now()), background_indexing: Mutex::new(None), background_sealing: Mutex::new(None), - background_setting: Mutex::new(None), _tracker: Arc::new(IndexTracker { path }), }); Ok(index) @@ -123,10 +117,6 @@ impl Index { let options = serde_json::from_slice::(&std::fs::read(path.join("options")).unwrap()) .unwrap(); - let flexible = serde_json::from_slice::( - &std::fs::read(path.join("flexible")).unwrap_or_default(), - ) - .unwrap_or_default(); let tracker = Arc::new(IndexTracker { path: path.clone() }); let startup = FileAtomic::::open(path.join("startup")); clean( @@ -174,7 +164,6 @@ impl Index { Arc::new(Index { path: path.clone(), options: options.clone(), - flexible, delete: delete.clone(), protect: Mutex::new(IndexProtect { startup, @@ -193,7 +182,6 @@ impl Index { instant_write: AtomicCell::new(Instant::now()), background_indexing: Mutex::new(None), background_sealing: Mutex::new(None), - background_setting: Mutex::new(None), _tracker: tracker, }) } @@ -203,20 +191,23 @@ impl Index { pub fn view(&self) -> Arc> { self.view.load_full() } - pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> { + pub fn setting(self: &Arc, key: String, value: String) -> Result<(), SettingError> { + let mut protect = self.protect.lock(); match key.as_str() { "optimizing.threads" => { let parsed = i32::from_str(value.as_str()) .map_err(|_e| SettingError::BadValue { key, value })?; - let threads = match parsed { - -1 => IndexFlexibleOptions::default() - .optimizing_threads - .load(Ordering::Relaxed), + let optimizing_threads = match parsed { + -1 => IndexFlexibleOptions::default_optimizing_threads(), threads_limit => threads_limit as u16, }; - self.flexible - .optimizing_threads - .store(threads, Ordering::Relaxed); + protect.flexible_set(IndexFlexibleOptions { optimizing_threads }); + let mut background = self.background_indexing.lock(); + if let Some((sender, join_handle)) = background.take() { + drop(sender); + let _ = join_handle.join(); + *background = Some(OptimizerIndexing::new(self.clone()).spawn()); + } } _ => return Err(SettingError::BadKey { key }), }; @@ -290,12 +281,6 @@ impl Index { *background_sealing = Some(OptimizerSealing::new(self.clone()).spawn()); } } - { - let mut background_setting = self.background_setting.lock(); - if background_setting.is_none() { - *background_setting = Some(FlexibleOptionSyncing::new(self.clone()).spawn()); - } - } } pub fn stop(&self) { { @@ -312,13 +297,6 @@ impl Index { let _ = join_handle.join(); } } - { - let mut background_setting = self.background_setting.lock(); - if let Some((sender, join_handle)) = background_setting.take() { - drop(sender); - let _ = join_handle.join(); - } - } } pub fn wait(&self) -> Arc { Arc::clone(&self._tracker) @@ -556,6 +534,7 @@ impl IndexView { struct IndexStartup { sealeds: HashSet, growings: HashSet, + flexible: IndexFlexibleOptions, } struct IndexProtect { @@ -585,7 +564,20 @@ impl IndexProtect { self.startup.set(IndexStartup { sealeds: startup_sealeds, growings: startup_growings, + flexible: self.flexible_get(), }); swap.swap(view); } + fn flexible_set(&mut self, flexible: IndexFlexibleOptions) { + let src = self.startup.get(); + self.startup.set(IndexStartup { + sealeds: src.sealeds.clone(), + growings: src.sealeds.clone(), + flexible, + }); + } + fn flexible_get(&self) -> IndexFlexibleOptions { + let src = self.startup.get(); + src.flexible.clone() + } } diff --git a/crates/index/src/optimizing/indexing.rs b/crates/index/src/optimizing/indexing.rs index cfaa8ae18..53d07a548 100644 --- a/crates/index/src/optimizing/indexing.rs +++ b/crates/index/src/optimizing/indexing.rs @@ -12,7 +12,6 @@ use crossbeam::channel::TryRecvError; use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; use std::cmp::Reverse; use std::convert::Infallible; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; @@ -103,7 +102,10 @@ impl OptimizerIndexing { } fn main(self, shutdown: Receiver) { let index = self.index; - let threads = index.flexible.optimizing_threads.load(Ordering::Relaxed); + let threads = { + let protect = index.protect.lock(); + protect.flexible_get().optimizing_threads + }; rayon::ThreadPoolBuilder::new() .num_threads(threads as usize) .build_scoped(|pool| { diff --git a/crates/index/src/setting.rs b/crates/index/src/setting.rs deleted file mode 100644 index fe4c51dd3..000000000 --- a/crates/index/src/setting.rs +++ /dev/null @@ -1,60 +0,0 @@ -use super::Index; -use crate::optimizing::indexing::OptimizerIndexing; -use crate::Op; -use base::index::IndexFlexibleOptions; -use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; -use std::convert::Infallible; -use std::sync::Arc; -use std::thread::JoinHandle; -use std::time::Duration; - -pub struct FlexibleOptionSyncing { - index: Arc>, -} - -impl FlexibleOptionSyncing { - pub fn new(index: Arc>) -> Self { - Self { index } - } - pub fn spawn(self) -> (Sender, JoinHandle<()>) { - let (tx, rx) = bounded(1); - ( - tx, - std::thread::spawn(move || { - self.main(rx); - }), - ) - } - pub fn main(self, shutdown: Receiver) { - let index = &self.index; - let mut old_options = index.flexible.clone(); - let dur = Duration::from_secs(5); - loop { - let new_options = index.flexible.clone(); - if new_options != old_options { - std::fs::write( - index.path.join("flexible"), - serde_json::to_string::(&new_options).unwrap(), - ) - .unwrap(); - old_options = new_options.clone(); - } - self.update_optimizing_threads(new_options.clone(), old_options.clone()); - match shutdown.recv_timeout(dur) { - Ok(never) => match never {}, - Err(RecvTimeoutError::Disconnected) => return, - Err(RecvTimeoutError::Timeout) => continue, - } - } - } - fn update_optimizing_threads(&self, new: IndexFlexibleOptions, old: IndexFlexibleOptions) { - if !new.optimizing_threads_eq(&old) { - let mut background = self.index.background_indexing.lock(); - if let Some((sender, join_handle)) = background.take() { - drop(sender); - let _ = join_handle.join(); - *background = Some(OptimizerIndexing::new(self.index.clone()).spawn()); - } - } - } -} diff --git a/src/index/views.rs b/src/index/views.rs index e3780b3ea..d1a016e42 100644 --- a/src/index/views.rs +++ b/src/index/views.rs @@ -6,8 +6,8 @@ use pgrx::error; #[pgrx::pg_extern(volatile, strict)] fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) { - let id = get_handle(oid); - let mut rpc = check_client(crate::ipc::client()); + let id = from_oid_to_handle(oid); + let mut rpc = check_client(client()); match rpc.setting(id, key, value) { Ok(_) => {} Err(e) => error!("{}", e.to_string()), diff --git a/tests/crash/create.slt b/tests/crash/create.slt index f396567af..240afd25b 100644 --- a/tests/crash/create.slt +++ b/tests/crash/create.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + statement ok CREATE TABLE t (val vector(3)); diff --git a/tests/crash/restore.slt b/tests/crash/restore.slt index 9bb1f0b9d..1f66757a2 100644 --- a/tests/crash/restore.slt +++ b/tests/crash/restore.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + query I SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename = 't'); ----