Skip to content

Commit

Permalink
feat: add optimizing_threads to function
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <[email protected]>
  • Loading branch information
cutecutecat committed Feb 27, 2024
1 parent e0dd790 commit 38932e5
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 17 deletions.
21 changes: 16 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ toml = "0.8.10"

[dev-dependencies]
pgrx-tests = "0.11.3"
httpmock = "0.7"

[patch.crates-io]
pgrx = { git = "https://github.com/tensorchord/pgrx.git", branch = "v0.11.3-patch" }
Expand Down
13 changes: 13 additions & 0 deletions crates/base/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,16 @@ pub enum StatError {
#[error("Maintenance should be done.")]
Upgrade,
}

#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum SettingError {
#[error("Setting key {key} is not exist.")]
BadKey { key: String },
#[error("Setting key {key} has a wrong value {value}.")]
BadValue { key: String, value: String },
#[error("Index not found.")]
NotExist,
#[error("Maintenance should be done.")]
Upgrade,
}
25 changes: 19 additions & 6 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use crate::distance::*;
use crate::vector::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -88,7 +91,7 @@ impl Default for SegmentsOptions {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_sealing_secs")]
Expand All @@ -101,8 +104,18 @@ pub struct OptimizingOptions {
#[validate(range(min = 0.01, max = 1.00))]
pub delete_threshold: f64,
#[serde(default = "OptimizingOptions::default_optimizing_threads")]
#[validate(range(min = 1, max = 65535))]
pub optimizing_threads: usize,
pub optimizing_threads: AtomicUsize,
}

impl Clone for OptimizingOptions {
fn clone(&self) -> Self {
OptimizingOptions {
sealing_secs: self.sealing_secs,
sealing_size: self.sealing_size,
delete_threshold: self.delete_threshold,
optimizing_threads: AtomicUsize::new(self.optimizing_threads.load(Ordering::Relaxed)),
}
}
}

impl OptimizingOptions {
Expand All @@ -115,10 +128,10 @@ impl OptimizingOptions {
fn default_delete_threshold() -> f64 {
0.2
}
fn default_optimizing_threads() -> usize {
fn default_optimizing_threads() -> AtomicUsize {
match std::thread::available_parallelism() {
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
Ok(threads) => AtomicUsize::new((threads.get() as f64).sqrt() as _),
Err(_) => AtomicUsize::new(1),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub trait WorkerOperations {
fn view_vbase(&self, handle: Handle) -> Result<impl ViewVbaseOperations, VbaseError>;
fn view_list(&self, handle: Handle) -> Result<impl ViewListOperations, ListError>;
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>;
}

pub trait ViewBasicOperations {
Expand Down
21 changes: 21 additions & 0 deletions crates/service/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::cmp::Reverse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
Expand Down Expand Up @@ -170,6 +172,25 @@ impl<S: G> Index<S> {
pub fn view(&self) -> Arc<IndexView<S>> {
self.view.load_full()
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
match key.as_str() {
"optimizing.threads" => {
let parsed = usize::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
self.options
.optimizing
.optimizing_threads
.store(parsed, Ordering::Relaxed);
}
_ => return Err(SettingError::BadKey { key }),
};
std::fs::write(
self.path.join("options"),
serde_json::to_string::<IndexOptions>(&self.options).unwrap(),
)
.unwrap();
Ok(())
}
pub fn refresh(&self) {
let mut protect = self.protect.lock();
if let Some((uuid, write)) = protect.write.clone() {
Expand Down
20 changes: 16 additions & 4 deletions crates/service/src/index/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::index::Index;
use crate::index::SealedSegment;
use crate::prelude::*;
use std::cmp::Reverse;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
Expand All @@ -23,17 +24,28 @@ impl<S: G> OptimizerIndexing<S> {
}
pub fn main(self) {
let index = self.index;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(index.options.optimizing.optimizing_threads)
.build()
.unwrap();
let weak_index = Arc::downgrade(&index);
drop(index);
loop {
{
let Some(index) = weak_index.upgrade() else {
return;
};
let threads = match index
.options
.optimizing
.optimizing_threads
.load(Ordering::Relaxed)
{
0 => OptimizingOptions::default()
.optimizing_threads
.load(Ordering::Relaxed),
threads_limit => threads_limit,
};
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.unwrap();
if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) {
continue;
}
Expand Down
14 changes: 14 additions & 0 deletions crates/service/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ impl Instance {
Instance::Upgrade => None,
}
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
match self {
Instance::Vecf32Cos(x) => x.setting(key, value),
Instance::Vecf32Dot(x) => x.setting(key, value),
Instance::Vecf32L2(x) => x.setting(key, value),
Instance::Vecf16Cos(x) => x.setting(key, value),
Instance::Vecf16Dot(x) => x.setting(key, value),
Instance::Vecf16L2(x) => x.setting(key, value),
Instance::SVecf32L2(x) => x.setting(key, value),
Instance::SVecf32Cos(x) => x.setting(key, value),
Instance::SVecf32Dot(x) => x.setting(key, value),
Instance::Upgrade => Err(SettingError::Upgrade),
}
}
}

pub enum InstanceView {
Expand Down
5 changes: 5 additions & 0 deletions crates/service/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ impl WorkerOperations for Worker {
let stat = instance.stat().ok_or(StatError::Upgrade)?;
Ok(stat)
}
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> {
let view = self.view();
let instance = view.get(handle).ok_or(SettingError::NotExist)?;
instance.setting(key, value)
}
}

pub struct WorkerView {
Expand Down
8 changes: 8 additions & 0 deletions src/bgworker/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ fn session(worker: Arc<Worker>, handler: ServerRpcHandler) -> Result<Infallible,
ServerRpcHandle::Delete { handle, pointer, x } => {
handler = x.leave(worker.delete(handle, pointer))?;
}
ServerRpcHandle::Setting {
handle,
key,
value,
x,
} => {
handler = x.leave(worker.setting(handle, key, value))?;
}
ServerRpcHandle::Stat { handle, x } => {
handler = x.leave(worker.stat(handle))?;
}
Expand Down
1 change: 0 additions & 1 deletion src/embedding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,5 @@ fn _vectors_text2vec_openai(input: String, model: String) -> Vecf32Output {
Ok(emb) => emb.into_iter().map(F32).collect::<Vec<_>>(),
Err(e) => error!("{}", e.to_string()),
};

Vecf32Output::new(Vecf32Borrowed::new(&embedding))
}
15 changes: 15 additions & 0 deletions src/index/views.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
use crate::prelude::*;
use pgrx::error;
use pgrx::pg_sys::{AsPgCStr, RelnameGetRelid};

#[pgrx::pg_extern(volatile, strict)]
fn _vectors_index_setting(index: String, key: String, value: String) {
unsafe {
let oid = RelnameGetRelid(index.as_pg_cstr());
let id = Handle::from_sys(oid);
let mut rpc = check_client(crate::ipc::client());
match rpc.setting(id, key, value) {
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
}
}
}

#[pgrx::pg_extern(volatile, strict)]
fn _vectors_index_stat(
Expand Down
1 change: 1 addition & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,5 @@ defines! {
stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer;
stream list(handle: Handle) -> Pointer;
unary stat(handle: Handle) -> IndexStat;
unary setting(handle: Handle, key: String, value: String) -> ();
}
3 changes: 3 additions & 0 deletions src/sql/finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ BEGIN
END;
$$;

CREATE FUNCTION alter_vector_index(input TEXT, "key" TEXT, "value" TEXT) RETURNS void
STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_index_setting_wrapper';

-- List of casts

CREATE CAST (real[] AS vector)
Expand Down
32 changes: 32 additions & 0 deletions tests/sqllogictest/index_edit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
statement ok
SET search_path TO pg_temp, vectors;

statement ok
CREATE TABLE t (val vector(3));

statement ok
INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);

statement ok
CREATE INDEX hnsw_1 ON t USING vectors (val vector_l2_ops)
WITH (options = "[indexing.hnsw]");

query I
SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2;
----
10

statement error Index not found.
SELECT alter_vector_index('unknown_index', 'optimizing.threads', '1');

statement error Setting key
SELECT alter_vector_index('hnsw_1', 'unknown_key', '1');

statement error wrong value
SELECT alter_vector_index('hnsw_1', 'optimizing.threads', 'unknown_value');

statement ok
SELECT alter_vector_index('hnsw_1', 'optimizing.threads', '1');

statement ok
DROP TABLE t;

0 comments on commit 38932e5

Please sign in to comment.