Skip to content

Commit

Permalink
put flexible at IndexProtect
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <[email protected]>
  • Loading branch information
cutecutecat committed Mar 25, 2024
1 parent 39f93fb commit f6a7a64
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 125 deletions.
32 changes: 5 additions & 27 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::distance::*;
use crate::vector::*;
use serde::{Deserialize, Serialize};
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use thiserror::Error;
use uuid::Uuid;
use validator::{Validate, ValidationError};
Expand Down Expand Up @@ -91,42 +89,22 @@ pub enum SettingError {
NotExist,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct IndexFlexibleOptions {
#[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
pub optimizing_threads: AtomicU16,
pub optimizing_threads: u16,
}

impl IndexFlexibleOptions {
fn default_optimizing_threads() -> AtomicU16 {
pub fn default_optimizing_threads() -> u16 {
match std::thread::available_parallelism() {
Ok(threads) => AtomicU16::new((threads.get() as f64).sqrt() as _),
Err(_) => AtomicU16::new(1),
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
}
}
pub fn optimizing_threads_eq(&self, other: &Self) -> bool {
self.optimizing_threads.load(Ordering::Relaxed)
== other.optimizing_threads.load(Ordering::Relaxed)
}
}

impl Clone for IndexFlexibleOptions {
fn clone(&self) -> Self {
IndexFlexibleOptions {
optimizing_threads: AtomicU16::new(self.optimizing_threads.load(Ordering::Relaxed)),
}
}
}

impl PartialEq for IndexFlexibleOptions {
fn eq(&self, other: &Self) -> bool {
self.optimizing_threads_eq(other)
}
}

impl Eq for IndexFlexibleOptions {}

impl Default for IndexFlexibleOptions {
fn default() -> Self {
Self {
Expand Down
60 changes: 26 additions & 34 deletions crates/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ pub mod indexing;
pub mod optimizing;
pub mod segments;

mod setting;
mod utils;

use self::delete::Delete;
use self::segments::growing::GrowingSegment;
use self::segments::sealed::SealedSegment;
use crate::optimizing::indexing::OptimizerIndexing;
use crate::optimizing::sealing::OptimizerSealing;
use crate::setting::FlexibleOptionSyncing;
use crate::utils::tournament_tree::LoserTree;
use arc_swap::ArcSwap;
pub use base::distance::*;
Expand All @@ -37,7 +35,6 @@ use std::collections::HashSet;
use std::convert::Infallible;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
Expand All @@ -55,15 +52,13 @@ pub struct OutdatedError;
pub struct Index<O: Op> {
path: PathBuf,
options: IndexOptions,
flexible: IndexFlexibleOptions,
delete: Arc<Delete>,
protect: Mutex<IndexProtect<O>>,
view: ArcSwap<IndexView<O>>,
instant_index: AtomicCell<Instant>,
instant_write: AtomicCell<Instant>,
background_indexing: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
background_sealing: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
background_setting: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
_tracker: Arc<IndexTracker>,
}

Expand All @@ -87,14 +82,14 @@ impl<O: Op> Index<O> {
IndexStartup {
sealeds: HashSet::new(),
growings: HashSet::new(),
flexible,
},
);
let delete = Delete::create(path.join("delete"));
sync_dir(&path);
let index = Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible,
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -113,7 +108,6 @@ impl<O: Op> Index<O> {
instant_write: AtomicCell::new(Instant::now()),
background_indexing: Mutex::new(None),
background_sealing: Mutex::new(None),
background_setting: Mutex::new(None),
_tracker: Arc::new(IndexTracker { path }),
});
Ok(index)
Expand All @@ -123,10 +117,6 @@ impl<O: Op> Index<O> {
let options =
serde_json::from_slice::<IndexOptions>(&std::fs::read(path.join("options")).unwrap())
.unwrap();
let flexible = serde_json::from_slice::<IndexFlexibleOptions>(
&std::fs::read(path.join("flexible")).unwrap_or_default(),
)
.unwrap_or_default();
let tracker = Arc::new(IndexTracker { path: path.clone() });
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
clean(
Expand Down Expand Up @@ -174,7 +164,6 @@ impl<O: Op> Index<O> {
Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible,
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -193,7 +182,6 @@ impl<O: Op> Index<O> {
instant_write: AtomicCell::new(Instant::now()),
background_indexing: Mutex::new(None),
background_sealing: Mutex::new(None),
background_setting: Mutex::new(None),
_tracker: tracker,
})
}
Expand All @@ -203,20 +191,23 @@ impl<O: Op> Index<O> {
pub fn view(&self) -> Arc<IndexView<O>> {
self.view.load_full()
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
pub fn setting(self: &Arc<Self>, key: String, value: String) -> Result<(), SettingError> {
let mut protect = self.protect.lock();
match key.as_str() {
"optimizing.threads" => {
let parsed = i32::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
let threads = match parsed {
-1 => IndexFlexibleOptions::default()
.optimizing_threads
.load(Ordering::Relaxed),
let optimizing_threads = match parsed {
-1 => IndexFlexibleOptions::default_optimizing_threads(),
threads_limit => threads_limit as u16,
};
self.flexible
.optimizing_threads
.store(threads, Ordering::Relaxed);
protect.flexible_set(IndexFlexibleOptions { optimizing_threads });
let mut background = self.background_indexing.lock();
if let Some((sender, join_handle)) = background.take() {
drop(sender);
let _ = join_handle.join();
*background = Some(OptimizerIndexing::new(self.clone()).spawn());
}
}
_ => return Err(SettingError::BadKey { key }),
};
Expand Down Expand Up @@ -290,12 +281,6 @@ impl<O: Op> Index<O> {
*background_sealing = Some(OptimizerSealing::new(self.clone()).spawn());
}
}
{
let mut background_setting = self.background_setting.lock();
if background_setting.is_none() {
*background_setting = Some(FlexibleOptionSyncing::new(self.clone()).spawn());
}
}
}
pub fn stop(&self) {
{
Expand All @@ -312,13 +297,6 @@ impl<O: Op> Index<O> {
let _ = join_handle.join();
}
}
{
let mut background_setting = self.background_setting.lock();
if let Some((sender, join_handle)) = background_setting.take() {
drop(sender);
let _ = join_handle.join();
}
}
}
pub fn wait(&self) -> Arc<IndexTracker> {
Arc::clone(&self._tracker)
Expand Down Expand Up @@ -556,6 +534,7 @@ impl<O: Op> IndexView<O> {
struct IndexStartup {
sealeds: HashSet<Uuid>,
growings: HashSet<Uuid>,
flexible: IndexFlexibleOptions,
}

struct IndexProtect<O: Op> {
Expand Down Expand Up @@ -585,7 +564,20 @@ impl<O: Op> IndexProtect<O> {
self.startup.set(IndexStartup {
sealeds: startup_sealeds,
growings: startup_growings,
flexible: self.flexible_get(),
});
swap.swap(view);
}
fn flexible_set(&mut self, flexible: IndexFlexibleOptions) {
let src = self.startup.get();
self.startup.set(IndexStartup {
sealeds: src.sealeds.clone(),
growings: src.sealeds.clone(),
flexible,
});
}
fn flexible_get(&self) -> IndexFlexibleOptions {
let src = self.startup.get();
src.flexible.clone()
}
}
6 changes: 4 additions & 2 deletions crates/index/src/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crossbeam::channel::TryRecvError;
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::cmp::Reverse;
use std::convert::Infallible;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
Expand Down Expand Up @@ -103,7 +102,10 @@ impl<O: Op> OptimizerIndexing<O> {
}
fn main(self, shutdown: Receiver<Infallible>) {
let index = self.index;
let threads = index.flexible.optimizing_threads.load(Ordering::Relaxed);
let threads = {
let protect = index.protect.lock();
protect.flexible_get().optimizing_threads
};
rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build_scoped(|pool| {
Expand Down
60 changes: 0 additions & 60 deletions crates/index/src/setting.rs

This file was deleted.

4 changes: 2 additions & 2 deletions src/index/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use pgrx::error;

#[pgrx::pg_extern(volatile, strict)]
fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) {
let id = get_handle(oid);
let mut rpc = check_client(crate::ipc::client());
let id = from_oid_to_handle(oid);
let mut rpc = check_client(client());
match rpc.setting(id, key, value) {
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
Expand Down
3 changes: 3 additions & 0 deletions tests/crash/create.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET search_path TO pg_temp, vectors;

statement ok
CREATE TABLE t (val vector(3));

Expand Down
3 changes: 3 additions & 0 deletions tests/crash/restore.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET search_path TO pg_temp, vectors;

query I
SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename = 't');
----
Expand Down

0 comments on commit f6a7a64

Please sign in to comment.