Skip to content

Commit 4393629

Browse files
committed
fix by comments
Signed-off-by: cutecutecat <[email protected]>
1 parent 64170ff commit 4393629

File tree

9 files changed

+65
-80
lines changed

9 files changed

+65
-80
lines changed

crates/base/src/index.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ pub enum StatError {
8080

8181
#[must_use]
8282
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
83-
pub enum SettingError {
83+
pub enum AlterError {
8484
#[error("Setting key {key} is not exist.")]
8585
BadKey { key: String },
8686
#[error("Setting key {key} has a wrong value {value}.")]
@@ -89,19 +89,17 @@ pub enum SettingError {
8989
NotExist,
9090
}
9191

92-
#[derive(Debug, Clone, Serialize, Deserialize)]
92+
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
9393
#[serde(deny_unknown_fields)]
9494
pub struct IndexFlexibleOptions {
9595
#[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
96+
#[validate(range(min = 1, max = 65535))]
9697
pub optimizing_threads: u16,
9798
}
9899

99100
impl IndexFlexibleOptions {
100101
pub fn default_optimizing_threads() -> u16 {
101-
match std::thread::available_parallelism() {
102-
Ok(threads) => (threads.get() as f64).sqrt() as _,
103-
Err(_) => 1,
104-
}
102+
1
105103
}
106104
}
107105

crates/base/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub trait WorkerOperations {
1717
fn view_vbase(&self, handle: Handle) -> Result<impl ViewVbaseOperations, VbaseError>;
1818
fn view_list(&self, handle: Handle) -> Result<impl ViewListOperations, ListError>;
1919
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
20-
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>;
20+
fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError>;
2121
}
2222

2323
pub trait ViewBasicOperations {

crates/index/src/lib.rs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ pub struct Index<O: Op> {
6464

6565
impl<O: Op> Index<O> {
6666
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, CreateError> {
67-
let flexible = IndexFlexibleOptions::default();
6867
if let Err(err) = options.validate() {
6968
return Err(CreateError::InvalidIndexOptions {
7069
reason: err.to_string(),
@@ -82,7 +81,7 @@ impl<O: Op> Index<O> {
8281
IndexStartup {
8382
sealeds: HashSet::new(),
8483
growings: HashSet::new(),
85-
flexible,
84+
flexible: IndexFlexibleOptions::default(),
8685
},
8786
);
8887
let delete = Delete::create(path.join("delete"));
@@ -99,6 +98,7 @@ impl<O: Op> Index<O> {
9998
}),
10099
view: ArcSwap::new(Arc::new(IndexView {
101100
options: options.clone(),
101+
flexible: IndexFlexibleOptions::default(),
102102
sealed: HashMap::new(),
103103
growing: HashMap::new(),
104104
delete: delete.clone(),
@@ -119,6 +119,7 @@ impl<O: Op> Index<O> {
119119
.unwrap();
120120
let tracker = Arc::new(IndexTracker { path: path.clone() });
121121
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
122+
let flexible = startup.get().flexible.clone();
122123
clean(
123124
path.join("segments"),
124125
startup
@@ -173,6 +174,7 @@ impl<O: Op> Index<O> {
173174
}),
174175
view: ArcSwap::new(Arc::new(IndexView {
175176
options: options.clone(),
177+
flexible,
176178
delete: delete.clone(),
177179
sealed,
178180
growing,
@@ -191,25 +193,20 @@ impl<O: Op> Index<O> {
191193
pub fn view(&self) -> Arc<IndexView<O>> {
192194
self.view.load_full()
193195
}
194-
pub fn setting(self: &Arc<Self>, key: String, value: String) -> Result<(), SettingError> {
196+
pub fn alter(self: &Arc<Self>, key: String, value: String) -> Result<(), AlterError> {
195197
let mut protect = self.protect.lock();
196198
match key.as_str() {
197199
"optimizing.threads" => {
198200
let parsed = i32::from_str(value.as_str())
199-
.map_err(|_e| SettingError::BadValue { key, value })?;
201+
.map_err(|_e| AlterError::BadValue { key, value })?;
200202
let optimizing_threads = match parsed {
201-
-1 => IndexFlexibleOptions::default_optimizing_threads(),
203+
0 => IndexFlexibleOptions::default_optimizing_threads(),
202204
threads_limit => threads_limit as u16,
203205
};
204206
protect.flexible_set(IndexFlexibleOptions { optimizing_threads });
205-
let mut background = self.background_indexing.lock();
206-
if let Some((sender, join_handle)) = background.take() {
207-
drop(sender);
208-
let _ = join_handle.join();
209-
*background = Some(OptimizerIndexing::new(self.clone()).spawn());
210-
}
207+
protect.maintain(self.options.clone(), self.delete.clone(), &self.view);
211208
}
212-
_ => return Err(SettingError::BadKey { key }),
209+
_ => return Err(AlterError::BadKey { key }),
213210
};
214211
Ok(())
215212
}
@@ -320,6 +317,7 @@ impl Drop for IndexTracker {
320317

321318
pub struct IndexView<O: Op> {
322319
pub options: IndexOptions,
320+
pub flexible: IndexFlexibleOptions,
323321
pub delete: Arc<Delete>,
324322
pub sealed: HashMap<Uuid, Arc<SealedSegment<O>>>,
325323
pub growing: HashMap<Uuid, Arc<GrowingSegment<O>>>,
@@ -545,14 +543,17 @@ struct IndexProtect<O: Op> {
545543
}
546544

547545
impl<O: Op> IndexProtect<O> {
546+
/// Export IndexProtect to IndexView
548547
fn maintain(
549548
&mut self,
550549
options: IndexOptions,
551550
delete: Arc<Delete>,
552551
swap: &ArcSwap<IndexView<O>>,
553552
) {
553+
let old_startup = self.startup.get();
554554
let view = Arc::new(IndexView {
555555
options,
556+
flexible: old_startup.flexible.clone(),
556557
delete,
557558
sealed: self.sealed.clone(),
558559
growing: self.growing.clone(),
@@ -564,7 +565,7 @@ impl<O: Op> IndexProtect<O> {
564565
self.startup.set(IndexStartup {
565566
sealeds: startup_sealeds,
566567
growings: startup_growings,
567-
flexible: self.flexible_get(),
568+
flexible: old_startup.flexible.clone(),
568569
});
569570
swap.swap(view);
570571
}
@@ -576,8 +577,4 @@ impl<O: Op> IndexProtect<O> {
576577
flexible,
577578
});
578579
}
579-
fn flexible_get(&self) -> IndexFlexibleOptions {
580-
let src = self.startup.get();
581-
src.flexible.clone()
582-
}
583580
}

crates/index/src/optimizing/indexing.rs

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use base::operator::Borrowed;
88
pub use base::search::*;
99
pub use base::vector::*;
1010
use crossbeam::channel::RecvError;
11-
use crossbeam::channel::TryRecvError;
1211
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
1312
use std::cmp::Reverse;
1413
use std::convert::Infallible;
@@ -102,38 +101,29 @@ impl<O: Op> OptimizerIndexing<O> {
102101
}
103102
fn main(self, shutdown: Receiver<Infallible>) {
104103
let index = self.index;
105-
let threads = {
106-
let protect = index.protect.lock();
107-
protect.flexible_get().optimizing_threads
108-
};
109-
rayon::ThreadPoolBuilder::new()
110-
.num_threads(threads as usize)
111-
.build_scoped(|pool| {
112-
std::thread::scope(|scope| {
113-
scope.spawn(|| match shutdown.recv() {
114-
Ok(never) => match never {},
115-
Err(RecvError) => {
116-
pool.stop();
117-
}
118-
});
119-
loop {
120-
if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) {
121-
match shutdown.try_recv() {
122-
Ok(never) => match never {},
123-
Err(TryRecvError::Disconnected) => return,
124-
Err(TryRecvError::Empty) => (),
125-
}
126-
continue;
127-
}
128-
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
104+
loop {
105+
let view = index.view();
106+
let threads = view.flexible.optimizing_threads;
107+
rayon::ThreadPoolBuilder::new()
108+
.num_threads(threads as usize)
109+
.build_scoped(|pool| {
110+
std::thread::scope(|scope| {
111+
scope.spawn(|| match shutdown.recv() {
129112
Ok(never) => match never {},
130-
Err(RecvTimeoutError::Disconnected) => return,
131-
Err(RecvTimeoutError::Timeout) => (),
132-
}
133-
}
134-
});
135-
})
136-
.unwrap();
113+
Err(RecvError) => {
114+
pool.stop();
115+
}
116+
});
117+
let _ = pool.install(|| optimizing_indexing(index.clone()));
118+
})
119+
})
120+
.unwrap();
121+
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
122+
Ok(never) => match never {},
123+
Err(RecvTimeoutError::Disconnected) => return,
124+
Err(RecvTimeoutError::Timeout) => (),
125+
}
126+
}
137127
}
138128
}
139129

crates/service/src/instance.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -187,24 +187,24 @@ impl Instance {
187187
Instance::Veci8Dot(x) => x.stat(),
188188
}
189189
}
190-
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
190+
pub fn alter(&self, key: String, value: String) -> Result<(), AlterError> {
191191
match self {
192-
Instance::Vecf32Cos(x) => x.setting(key, value),
193-
Instance::Vecf32Dot(x) => x.setting(key, value),
194-
Instance::Vecf32L2(x) => x.setting(key, value),
195-
Instance::Vecf16Cos(x) => x.setting(key, value),
196-
Instance::Vecf16Dot(x) => x.setting(key, value),
197-
Instance::Vecf16L2(x) => x.setting(key, value),
198-
Instance::SVecf32Cos(x) => x.setting(key, value),
199-
Instance::SVecf32Dot(x) => x.setting(key, value),
200-
Instance::SVecf32L2(x) => x.setting(key, value),
201-
Instance::BVecf32Cos(x) => x.setting(key, value),
202-
Instance::BVecf32Dot(x) => x.setting(key, value),
203-
Instance::BVecf32L2(x) => x.setting(key, value),
204-
Instance::BVecf32Jaccard(x) => x.setting(key, value),
205-
Instance::Veci8L2(x) => x.setting(key, value),
206-
Instance::Veci8Cos(x) => x.setting(key, value),
207-
Instance::Veci8Dot(x) => x.setting(key, value),
192+
Instance::Vecf32Cos(x) => x.alter(key, value),
193+
Instance::Vecf32Dot(x) => x.alter(key, value),
194+
Instance::Vecf32L2(x) => x.alter(key, value),
195+
Instance::Vecf16Cos(x) => x.alter(key, value),
196+
Instance::Vecf16Dot(x) => x.alter(key, value),
197+
Instance::Vecf16L2(x) => x.alter(key, value),
198+
Instance::SVecf32Cos(x) => x.alter(key, value),
199+
Instance::SVecf32Dot(x) => x.alter(key, value),
200+
Instance::SVecf32L2(x) => x.alter(key, value),
201+
Instance::BVecf32Cos(x) => x.alter(key, value),
202+
Instance::BVecf32Dot(x) => x.alter(key, value),
203+
Instance::BVecf32L2(x) => x.alter(key, value),
204+
Instance::BVecf32Jaccard(x) => x.alter(key, value),
205+
Instance::Veci8L2(x) => x.alter(key, value),
206+
Instance::Veci8Cos(x) => x.alter(key, value),
207+
Instance::Veci8Dot(x) => x.alter(key, value),
208208
}
209209
}
210210
pub fn start(&self) {

crates/service/src/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@ impl WorkerOperations for Worker {
176176
let stat = instance.stat();
177177
Ok(stat)
178178
}
179-
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> {
179+
fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError> {
180180
let view = self.view();
181-
let instance = view.get(handle).ok_or(SettingError::NotExist)?;
182-
instance.setting(key, value)
181+
let instance = view.get(handle).ok_or(AlterError::NotExist)?;
182+
instance.alter(key, value)
183183
}
184184
}
185185

src/bgworker/normal.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ fn session(worker: Arc<Worker>, handler: ServerRpcHandler) -> Result<Infallible,
8989
ServerRpcHandle::Stat { handle, x } => {
9090
handler = x.leave(worker.stat(handle))?;
9191
}
92-
ServerRpcHandle::Setting {
92+
ServerRpcHandle::Alter {
9393
handle,
9494
key,
9595
value,
9696
x,
9797
} => {
98-
handler = x.leave(worker.setting(handle, key, value))?;
98+
handler = x.leave(worker.alter(handle, key, value))?;
9999
}
100100
ServerRpcHandle::Basic {
101101
handle,

src/index/views.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use pgrx::error;
88
fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) {
99
let id = from_oid_to_handle(oid);
1010
let mut rpc = check_client(client());
11-
match rpc.setting(id, key, value) {
11+
match rpc.alter(id, key, value) {
1212
Ok(_) => {}
1313
Err(e) => error!("{}", e.to_string()),
1414
}

src/ipc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,5 +329,5 @@ defines! {
329329
stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer;
330330
stream list(handle: Handle) -> Pointer;
331331
unary stat(handle: Handle) -> IndexStat;
332-
unary setting(handle: Handle, key: String, value: String) -> ();
332+
unary alter(handle: Handle, key: String, value: String) -> ();
333333
}

0 commit comments

Comments
 (0)