diff --git a/Cargo.toml b/Cargo.toml index 0ff221598..68c2912c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ service = { path = "crates/service" } pgrx = { git = "https://github.com/tensorchord/pgrx.git", branch = "v0.12.0-alpha.1-patch" } [lints] -rust.unsafe_op_in_unsafe_fn = "deny" +rust.unsafe_op_in_unsafe_fn = "forbid" rust.unused_lifetimes = "warn" rust.unused_qualifications = "warn" diff --git a/src/datatype/memory_vecf16.rs b/src/datatype/memory_vecf16.rs index 9f9497168..17e515181 100644 --- a/src/datatype/memory_vecf16.rs +++ b/src/datatype/memory_vecf16.rs @@ -36,10 +36,6 @@ impl Vecf16Header { self.dims as usize } pub fn slice(&self) -> &[F16] { - debug_assert_eq!(self.varlena & 3, 0); - // TODO: force checking it in the future - // debug_assert_eq!(self.kind, 1); - // debug_assert_eq!(self.reserved, 0); unsafe { std::slice::from_raw_parts(self.phantom.as_ptr(), self.dims as usize) } } pub fn for_borrow(&self) -> Vecf16Borrowed<'_> { diff --git a/src/datatype/memory_veci8.rs b/src/datatype/memory_veci8.rs index 386ba3088..2afc4bf6e 100644 --- a/src/datatype/memory_veci8.rs +++ b/src/datatype/memory_veci8.rs @@ -87,8 +87,6 @@ impl Veci8Header { } pub fn data(&self) -> &[I8] { - debug_assert_eq!(self.varlena & 3, 0); - debug_assert_eq!(self.kind, VECI8_KIND); unsafe { std::slice::from_raw_parts(self.phantom.as_ptr(), self.len as usize) } } diff --git a/src/index/am.rs b/src/index/am.rs index feb34274c..e008efd23 100644 --- a/src/index/am.rs +++ b/src/index/am.rs @@ -1,29 +1,32 @@ -#![allow(unsafe_op_in_unsafe_fn)] - -use super::am_build; +use super::am_options; use super::am_scan; -use super::am_setup; -use super::am_update; +use crate::error::*; use crate::gucs::planning::ENABLE_INDEX; -use crate::index::utils::{from_datum, get_handle}; +use crate::index::am_scan::Scanner; +use crate::index::catalog::{on_index_build, on_index_write}; +use crate::index::utils::{ctid_to_pointer, pointer_to_ctid}; +use crate::index::utils::{from_datum_to_vector, from_oid_to_handle}; +use crate::ipc::{client, ClientRpc}; use crate::utils::cells::PgCell; -use crate::utils::sys::IntoSys; +use am_options::Reloption; +use base::index::*; use pgrx::datum::Internal; use pgrx::pg_sys::Datum; -static RELOPT_KIND: PgCell = unsafe { PgCell::new(0) }; +static RELOPT_KIND_VECTORS: PgCell = unsafe { PgCell::new(0) }; pub unsafe fn init() { - use pgrx::pg_sys::AsPgCStr; - RELOPT_KIND.set(pgrx::pg_sys::add_reloption_kind()); - pgrx::pg_sys::add_string_reloption( - RELOPT_KIND.get(), - "options".as_pg_cstr(), - "".as_pg_cstr(), - "".as_pg_cstr(), - None, - pgrx::pg_sys::AccessExclusiveLock as pgrx::pg_sys::LOCKMODE, - ); + unsafe { + RELOPT_KIND_VECTORS.set(pgrx::pg_sys::add_reloption_kind()); + pgrx::pg_sys::add_string_reloption( + RELOPT_KIND_VECTORS.get(), + c"options".as_ptr(), + c"Vector index options, represented as a TOML string.".as_ptr(), + c"".as_ptr(), + None, + pgrx::pg_sys::AccessExclusiveLock as pgrx::pg_sys::LOCKMODE, + ); + } } #[pgrx::pg_extern(sql = "\ @@ -62,21 +65,20 @@ const AM_HANDLER: pgrx::pg_sys::IndexAmRoutine = { am_routine.ambuild = Some(ambuild); am_routine.ambuildempty = Some(ambuildempty); am_routine.aminsert = Some(aminsert); + am_routine.ambulkdelete = Some(ambulkdelete); + am_routine.amvacuumcleanup = Some(amvacuumcleanup); am_routine.ambeginscan = Some(ambeginscan); am_routine.amrescan = Some(amrescan); am_routine.amgettuple = Some(amgettuple); am_routine.amendscan = Some(amendscan); - am_routine.ambulkdelete = Some(ambulkdelete); - am_routine.amvacuumcleanup = Some(amvacuumcleanup); - am_routine }; #[pgrx::pg_guard] pub unsafe extern "C" fn amvalidate(opclass_oid: pgrx::pg_sys::Oid) -> bool { - if am_setup::convert_opclass_to_vd(opclass_oid).is_some() { + if am_options::convert_opclass_to_vd(opclass_oid).is_some() { pgrx::info!("Vector indexes can only be built on built-in operator classes."); true } else { @@ -86,21 +88,16 @@ pub unsafe extern "C" fn amvalidate(opclass_oid: pgrx::pg_sys::Oid) -> bool { #[pgrx::pg_guard] pub unsafe extern "C" fn amoptions(reloptions: Datum, validate: bool) -> *mut pgrx::pg_sys::bytea { - use pgrx::pg_sys::AsPgCStr; - - let tab: &[pgrx::pg_sys::relopt_parse_elt] = &[pgrx::pg_sys::relopt_parse_elt { - optname: "options".as_pg_cstr(), - opttype: pgrx::pg_sys::relopt_type_RELOPT_TYPE_STRING, - offset: am_setup::helper_offset() as i32, - }]; - let rdopts = pgrx::pg_sys::build_reloptions( - reloptions, - validate, - RELOPT_KIND.get(), - am_setup::helper_size(), - tab.as_ptr(), - tab.len() as _, - ); + let rdopts = unsafe { + pgrx::pg_sys::build_reloptions( + reloptions, + validate, + RELOPT_KIND_VECTORS.get(), + std::mem::size_of::(), + Reloption::TAB.as_ptr(), + Reloption::TAB.len() as _, + ) + }; rdopts as *mut pgrx::pg_sys::bytea } @@ -115,127 +112,182 @@ pub unsafe extern "C" fn amcostestimate( index_correlation: *mut f64, index_pages: *mut f64, ) { - if (*path).indexorderbys.is_null() || !ENABLE_INDEX.get() { - *index_startup_cost = f64::MAX; - *index_total_cost = f64::MAX; - *index_selectivity = 0.0; - *index_correlation = 0.0; + unsafe { + if (*path).indexorderbys.is_null() || !ENABLE_INDEX.get() { + *index_startup_cost = f64::MAX; + *index_total_cost = f64::MAX; + *index_selectivity = 0.0; + *index_correlation = 0.0; + *index_pages = 0.0; + return; + } + *index_startup_cost = 0.0; + *index_total_cost = 0.0; + *index_selectivity = 1.0; + *index_correlation = 1.0; *index_pages = 0.0; - return; } - *index_startup_cost = 0.0; - *index_total_cost = 0.0; - *index_selectivity = 1.0; - *index_correlation = 1.0; - *index_pages = 0.0; } #[pgrx::pg_guard] pub unsafe extern "C" fn ambuild( - heap_relation: pgrx::pg_sys::Relation, - index_relation: pgrx::pg_sys::Relation, + heap: pgrx::pg_sys::Relation, + index: pgrx::pg_sys::Relation, index_info: *mut pgrx::pg_sys::IndexInfo, ) -> *mut pgrx::pg_sys::IndexBuildResult { - let result = pgrx::PgBox::::alloc0(); - am_build::build( - index_relation, - Some((heap_relation, index_info, result.as_ptr())), - ); - make_well_formed(index_relation); - result.into_pg() -} - -#[pgrx::pg_guard] -pub unsafe extern "C" fn ambuildempty(_index: pgrx::pg_sys::Relation) {} - -#[repr(C)] -struct VectorsPageOpaqueData { - _reserved: [u8; 2048], -} - -const _: () = assert!(std::mem::size_of::() == 2048); - -unsafe fn make_well_formed(index_relation: pgrx::pg_sys::Relation) { + pub struct Builder { + pub rpc: ClientRpc, + pub heap: *mut pgrx::pg_sys::RelationData, + pub index_info: *mut pgrx::pg_sys::IndexInfo, + pub result: *mut pgrx::pg_sys::IndexBuildResult, + } + let oid = unsafe { (*index).rd_id }; + let handle = from_oid_to_handle(oid); + let options = unsafe { am_options::options(index) }; + let mut rpc = check_client(client()); + match rpc.create(handle, options) { + Ok(()) => (), + Err(CreateError::InvalidIndexOptions { reason }) => { + bad_service_invalid_index_options(&reason); + } + } + on_index_build(handle); + let result = unsafe { pgrx::PgBox::::alloc0() }; + let mut builder = Builder { + rpc, + heap, + index_info, + result: result.as_ptr(), + }; + let table_am = unsafe { &*(*heap).rd_tableam }; unsafe { - let meta_buffer = pgrx::pg_sys::ReadBuffer(index_relation, 0xFFFFFFFF /* P_NEW */); - pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _); - assert!(pgrx::pg_sys::BufferGetBlockNumber(meta_buffer) == 0); - let state = pgrx::pg_sys::GenericXLogStart(index_relation); - let meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer( - state, - meta_buffer, - pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _, - ); - pgrx::pg_sys::PageInit( - meta_page, - pgrx::pg_sys::BLCKSZ as usize, - std::mem::size_of::(), + table_am.index_build_range_scan.unwrap()( + heap, + index, + index_info, + true, + false, + true, + 0, + pgrx::pg_sys::InvalidBlockNumber, + Some(callback), + (&mut builder) as *mut Builder as *mut std::os::raw::c_void, + std::ptr::null_mut(), ); - pgrx::pg_sys::GenericXLogFinish(state); - pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer); } -} - -unsafe fn check_well_formed(index_relation: pgrx::pg_sys::Relation) { - if !test_well_formed(index_relation) { - am_build::build(index_relation, None); - make_well_formed(index_relation); + #[pgrx::pg_guard] + unsafe extern "C" fn callback( + index: pgrx::pg_sys::Relation, + ctid: pgrx::pg_sys::ItemPointer, + values: *mut Datum, + is_null: *mut bool, + _tuple_is_alive: bool, + state: *mut std::os::raw::c_void, + ) { + let state = unsafe { &mut *state.cast::() }; + let vector = unsafe { from_datum_to_vector(*values.add(0), *is_null.add(0)) }; + if let Some(vector) = vector { + let oid = unsafe { (*index).rd_id }; + let handle = from_oid_to_handle(oid); + let pointer = ctid_to_pointer(unsafe { ctid.read() }); + match state.rpc.insert(handle, vector, pointer) { + Ok(()) => (), + Err(InsertError::NotExist) => bad_service_not_exist(), + Err(InsertError::InvalidVector) => bad_service_invalid_vector(), + } + unsafe { + (*state.result).index_tuples += 1.0; + } + } + unsafe { + (*state.result).heap_tuples += 1.0; + } } + result.into_pg() } -unsafe fn test_well_formed(index_relation: pgrx::pg_sys::Relation) -> bool { - pgrx::pg_sys::RelationGetNumberOfBlocksInFork( - index_relation, - pgrx::pg_sys::ForkNumber_MAIN_FORKNUM, - ) == 1 +#[pgrx::pg_guard] +pub unsafe extern "C" fn ambuildempty(_index: pgrx::pg_sys::Relation) { + pgrx::error!("Unlogged indexes are not supported."); } #[pgrx::pg_guard] pub unsafe extern "C" fn aminsert( - index_relation: pgrx::pg_sys::Relation, + index: pgrx::pg_sys::Relation, values: *mut Datum, is_null: *mut bool, heap_tid: pgrx::pg_sys::ItemPointer, - _heap_relation: pgrx::pg_sys::Relation, + _heap: pgrx::pg_sys::Relation, _check_unique: pgrx::pg_sys::IndexUniqueCheck, _index_unchanged: bool, _index_info: *mut pgrx::pg_sys::IndexInfo, ) -> bool { - check_well_formed(index_relation); - let oid = (*index_relation).rd_id; - let id = get_handle(oid); - let vector = from_datum(*values.add(0), *is_null.add(0)); - if let Some(v) = vector { - am_update::update_insert(id, v, *heap_tid); + let oid = unsafe { (*index).rd_id }; + let handle = from_oid_to_handle(oid); + let vector = unsafe { from_datum_to_vector(*values.add(0), *is_null.add(0)) }; + if let Some(vector) = vector { + let pointer = ctid_to_pointer(unsafe { heap_tid.read() }); + + on_index_write(handle); + + let mut rpc = check_client(client()); + + match rpc.insert(handle, vector, pointer) { + Ok(()) => (), + Err(InsertError::NotExist) => bad_service_not_exist(), + Err(InsertError::InvalidVector) => bad_service_invalid_vector(), + } } false } #[pgrx::pg_guard] pub unsafe extern "C" fn ambeginscan( - index_relation: pgrx::pg_sys::Relation, + index: pgrx::pg_sys::Relation, n_keys: std::os::raw::c_int, n_orderbys: std::os::raw::c_int, ) -> pgrx::pg_sys::IndexScanDesc { - check_well_formed(index_relation); - assert!(n_keys == 0); - assert!(n_orderbys == 1); - am_scan::make_scan(index_relation) + use pgrx::PgMemoryContexts::CurrentMemoryContext; + let scan = unsafe { pgrx::pg_sys::RelationGetIndexScan(index, n_keys, n_orderbys) }; + unsafe { + let scanner = am_scan::scan_make(None); + (*scan).opaque = CurrentMemoryContext.leak_and_drop_on_delete(scanner).cast(); + } + scan } #[pgrx::pg_guard] pub unsafe extern "C" fn amrescan( scan: pgrx::pg_sys::IndexScanDesc, - _keys: pgrx::pg_sys::ScanKey, - n_keys: std::os::raw::c_int, + keys: pgrx::pg_sys::ScanKey, + _n_keys: std::os::raw::c_int, orderbys: pgrx::pg_sys::ScanKey, - n_orderbys: std::os::raw::c_int, + _n_orderbys: std::os::raw::c_int, ) { - assert!((*scan).numberOfKeys == n_keys); - assert!((*scan).numberOfOrderBys == n_orderbys); - assert!(n_keys == 0); - assert!(n_orderbys == 1); - am_scan::start_scan(scan, orderbys); + unsafe { + if (*scan).numberOfKeys != 0 { + pgrx::error!("vector search with attributes is not supported"); + } + if (*scan).numberOfOrderBys == 0 { + pgrx::error!("vector search without a ORDER BY clause is not supported"); + } + if (*scan).numberOfOrderBys != 1 { + pgrx::error!("vector search with too many ORDER BY clauses is not supported"); + } + if !keys.is_null() && (*scan).numberOfKeys > 0 { + std::ptr::copy(keys, (*scan).keyData, (*scan).numberOfKeys as _); + } + if !orderbys.is_null() && (*scan).numberOfOrderBys > 0 { + std::ptr::copy(orderbys, (*scan).orderByData, (*scan).numberOfOrderBys as _); + } + let main = (*scan).orderByData.add(0); + let value = (*main).sk_argument; + let is_null = ((*main).sk_flags & pgrx::pg_sys::SK_ISNULL as i32) != 0; + let vector = from_datum_to_vector(value, is_null); + let scanner = (*scan).opaque.cast::().as_mut().unwrap_unchecked(); + let scanner = std::mem::replace(scanner, am_scan::scan_make(vector)); + am_scan::scan_release(scanner); + } } #[pgrx::pg_guard] @@ -243,47 +295,82 @@ pub unsafe extern "C" fn amgettuple( scan: pgrx::pg_sys::IndexScanDesc, direction: pgrx::pg_sys::ScanDirection, ) -> bool { - assert!(direction == pgrx::pg_sys::ScanDirection_ForwardScanDirection); - am_scan::next_scan(scan) + if direction != pgrx::pg_sys::ScanDirection_ForwardScanDirection { + pgrx::error!("vector search without a forward scan direction is not supported"); + } + // https://www.postgresql.org/docs/current/index-locking.html + // If heap entries referenced physical pointers are deleted before + // they are consumed by PostgreSQL, PostgreSQL will received wrong + // physical pointers: no rows or irreverent rows are referenced. + if unsafe { (*(*scan).xs_snapshot).snapshot_type } != pgrx::pg_sys::SnapshotType_SNAPSHOT_MVCC { + pgrx::error!("scanning with a non-MVCC-compliant snapshot is not supported"); + } + let scanner = unsafe { (*scan).opaque.cast::().as_mut().unwrap_unchecked() }; + let oid = unsafe { (*(*scan).indexRelation).rd_id }; + let handle = from_oid_to_handle(oid); + if let Some(pointer) = am_scan::scan_next(scanner, handle) { + let ctid = pointer_to_ctid(pointer); + unsafe { + (*scan).xs_heaptid = ctid; + (*scan).xs_recheck = false; + (*scan).xs_recheckorderby = false; + } + true + } else { + false + } } #[pgrx::pg_guard] pub unsafe extern "C" fn amendscan(scan: pgrx::pg_sys::IndexScanDesc) { - am_scan::end_scan(scan); + unsafe { + let scanner = (*scan).opaque.cast::().as_mut().unwrap_unchecked(); + let scanner = std::mem::replace(scanner, am_scan::scan_make(None)); + am_scan::scan_release(scanner); + } } #[pgrx::pg_guard] pub unsafe extern "C" fn ambulkdelete( info: *mut pgrx::pg_sys::IndexVacuumInfo, - _stats: *mut pgrx::pg_sys::IndexBulkDeleteResult, + stats: *mut pgrx::pg_sys::IndexBulkDeleteResult, callback: pgrx::pg_sys::IndexBulkDeleteCallback, callback_state: *mut std::os::raw::c_void, ) -> *mut pgrx::pg_sys::IndexBulkDeleteResult { - if !test_well_formed((*info).index) { - pgrx::warning!("The vector index is not initialized."); + let mut stats = stats; + if stats.is_null() { + stats = unsafe { + pgrx::pg_sys::palloc0(std::mem::size_of::()).cast() + }; } - let oid = (*(*info).index).rd_id; - let id = get_handle(oid); + let oid = unsafe { (*(*info).index).rd_id }; + let handle = from_oid_to_handle(oid); if let Some(callback) = callback { - am_update::update_delete(id, |pointer| { - callback( - &mut pointer.into_sys() as *mut pgrx::pg_sys::ItemPointerData, - callback_state, - ) - }); + on_index_write(handle); + + let mut x = match check_client(client()).list(handle) { + Ok(x) => x, + Err((_, ListError::NotExist)) => bad_service_not_exist(), + }; + let mut y = check_client(client()); + while let Some(pointer) = x.next() { + let mut ctid = pointer_to_ctid(pointer); + if unsafe { callback(&mut ctid, callback_state) } { + match y.delete(handle, pointer) { + Ok(()) => (), + Err(DeleteError::NotExist) => (), + } + } + } + x.leave(); } - let result = pgrx::PgBox::::alloc0(); - result.into_pg() + stats } #[pgrx::pg_guard] pub unsafe extern "C" fn amvacuumcleanup( - info: *mut pgrx::pg_sys::IndexVacuumInfo, + _info: *mut pgrx::pg_sys::IndexVacuumInfo, _stats: *mut pgrx::pg_sys::IndexBulkDeleteResult, ) -> *mut pgrx::pg_sys::IndexBulkDeleteResult { - if !test_well_formed((*info).index) { - pgrx::warning!("The vector index is not initialized."); - } - let result = pgrx::PgBox::::alloc0(); - result.into_pg() + std::ptr::null_mut() } diff --git a/src/index/am_build.rs b/src/index/am_build.rs deleted file mode 100644 index 2aff0ff7d..000000000 --- a/src/index/am_build.rs +++ /dev/null @@ -1,80 +0,0 @@ -#![allow(unsafe_op_in_unsafe_fn)] - -use crate::error::*; -use crate::index::am_setup::options; -use crate::index::utils::{from_datum, get_handle}; -use crate::ipc::ClientRpc; -use crate::utils::sys::FromSys; -use base::index::*; -use base::search::*; -use pgrx::pg_sys::{IndexBuildResult, IndexInfo, RelationData}; - -pub struct Builder { - pub rpc: ClientRpc, - pub heap_relation: *mut RelationData, - pub index_info: *mut IndexInfo, - pub result: *mut IndexBuildResult, -} - -pub unsafe fn build( - index: pgrx::pg_sys::Relation, - data: Option<(*mut RelationData, *mut IndexInfo, *mut IndexBuildResult)>, -) { - let oid = (*index).rd_id; - let id = get_handle(oid); - let options = options(index); - let mut rpc = check_client(crate::ipc::client()); - match rpc.create(id, options) { - Ok(()) => (), - Err(CreateError::InvalidIndexOptions { reason }) => { - bad_service_invalid_index_options(&reason); - } - } - super::hook_maintain::maintain_index_in_index_create(id); - if let Some((heap_relation, index_info, result)) = data { - let mut builder = Builder { - rpc, - heap_relation, - index_info, - result, - }; - pgrx::pg_sys::IndexBuildHeapScan( - heap_relation, - index, - index_info, - Some(callback), - &mut builder, - ); - } -} - -#[pgrx::pg_guard] -unsafe extern "C" fn callback( - index_relation: pgrx::pg_sys::Relation, - ctid: pgrx::pg_sys::ItemPointer, - values: *mut pgrx::pg_sys::Datum, - is_null: *mut bool, - _tuple_is_alive: bool, - state: *mut std::os::raw::c_void, -) { - let state = &mut *(state as *mut Builder); - if *is_null.add(0) { - (*state.result).heap_tuples += 1.0; - return; - } - let oid = (*index_relation).rd_id; - let id = get_handle(oid); - let vector = from_datum(*values.add(0), *is_null.add(0)); - let vector = match vector { - Some(v) => v, - None => unreachable!(), - }; - let pointer = Pointer::from_sys(*ctid); - match state.rpc.insert(id, vector, pointer) { - Ok(()) => (), - Err(InsertError::NotExist) => bad_service_not_exist(), - Err(InsertError::InvalidVector) => bad_service_invalid_vector(), - } - (*state.result).heap_tuples += 1.0; - (*state.result).index_tuples += 1.0; -} diff --git a/src/index/am_setup.rs b/src/index/am_options.rs similarity index 78% rename from src/index/am_setup.rs rename to src/index/am_options.rs index 3e91bb25e..f9634103f 100644 --- a/src/index/am_setup.rs +++ b/src/index/am_options.rs @@ -8,17 +8,25 @@ use std::ffi::CStr; #[derive(Copy, Clone, Debug, Default)] #[repr(C)] -pub struct Helper { - pub vl_len_: i32, - pub offset: i32, +pub struct Reloption { + vl_len_: i32, + pub options: i32, } -pub fn helper_offset() -> usize { - std::mem::offset_of!(Helper, offset) -} - -pub fn helper_size() -> usize { - std::mem::size_of::() +impl Reloption { + pub const TAB: &'static [pgrx::pg_sys::relopt_parse_elt] = &[pgrx::pg_sys::relopt_parse_elt { + optname: c"options".as_ptr(), + opttype: pgrx::pg_sys::relopt_type_RELOPT_TYPE_STRING, + offset: std::mem::offset_of!(Reloption, options) as i32, + }]; + unsafe fn options(&self) -> &CStr { + unsafe { + let ptr = std::ptr::addr_of!(*self) + .cast::() + .offset(self.options as _); + CStr::from_ptr(ptr) + } + } } pub fn convert_opclass_to_vd(opclass_oid: pgrx::pg_sys::Oid) -> Option<(VectorKind, DistanceKind)> { @@ -75,8 +83,8 @@ fn convert_name_to_vd(name: &str) -> Option<(VectorKind, DistanceKind)> { } } -unsafe fn convert_varlena_to_soi( - varlena: *const pgrx::pg_sys::varlena, +unsafe fn convert_reloptions_to_options( + reloptions: *const pgrx::pg_sys::varlena, ) -> (SegmentsOptions, OptimizingOptions, IndexingOptions) { #[derive(Debug, Clone, Deserialize, Default)] #[serde(deny_unknown_fields)] @@ -88,21 +96,20 @@ unsafe fn convert_varlena_to_soi( #[serde(default)] indexing: IndexingOptions, } - let helper = varlena as *const Helper; - if helper.is_null() || unsafe { (*helper).offset == 0 } { + let reloption = reloptions as *const Reloption; + if reloption.is_null() || unsafe { (*reloption).options == 0 } { return Default::default(); } - let ptr = unsafe { (helper as *const libc::c_char).offset((*helper).offset as isize) }; - let s = unsafe { CStr::from_ptr(ptr) }.to_string_lossy().to_string(); + let s = unsafe { (*reloption).options() }.to_string_lossy(); match toml::from_str::(&s) { Ok(p) => (p.segment, p.optimizing, p.indexing), Err(e) => pgrx::error!("failed to parse options: {}", e), } } -pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions { - let opfamily = unsafe { (*index_relation).rd_opfamily.read() }; - let att = unsafe { &mut *(*index_relation).rd_att }; +pub unsafe fn options(index: pgrx::pg_sys::Relation) -> IndexOptions { + let opfamily = unsafe { (*index).rd_opfamily.read() }; + let att = unsafe { &mut *(*index).rd_att }; let atts = unsafe { att.attrs.as_slice(att.natts as _) }; if atts.is_empty() { pgrx::error!("indexing on no columns is not supported"); @@ -117,7 +124,7 @@ pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions { let (v, d) = convert_opfamily_to_vd(opfamily).unwrap(); // get segment, optimizing, indexing let (segment, optimizing, indexing) = - unsafe { convert_varlena_to_soi((*index_relation).rd_options) }; + unsafe { convert_reloptions_to_options((*index).rd_options) }; IndexOptions { vector: VectorOptions { dims, v, d }, segment, diff --git a/src/index/am_scan.rs b/src/index/am_scan.rs index cf762b03a..394a5409b 100644 --- a/src/index/am_scan.rs +++ b/src/index/am_scan.rs @@ -1,15 +1,11 @@ -#![allow(unsafe_op_in_unsafe_fn)] - use crate::error::*; use crate::gucs::executing::search_options; use crate::gucs::planning::Mode; use crate::gucs::planning::SEARCH_MODE; -use crate::index::utils::{from_datum, get_handle}; -use crate::ipc::{ClientBasic, ClientVbase}; -use crate::utils::sys::IntoSys; +use crate::ipc::{client, ClientBasic, ClientVbase}; use base::index::*; +use base::search::*; use base::vector::*; -use pgrx::pg_sys::SK_ISNULL; pub enum Scanner { Initial { vector: Option }, @@ -18,69 +14,19 @@ pub enum Scanner { Empty {}, } -pub unsafe fn make_scan(index_relation: pgrx::pg_sys::Relation) -> pgrx::pg_sys::IndexScanDesc { - use pgrx::PgMemoryContexts; - - let scan = pgrx::pg_sys::RelationGetIndexScan(index_relation, 0, 1); - - (*scan).xs_recheck = false; - (*scan).xs_recheckorderby = false; - - (*scan).opaque = PgMemoryContexts::CurrentMemoryContext - .leak_and_drop_on_delete(Scanner::Initial { vector: None }) as _; - - (*scan).xs_orderbyvals = pgrx::pg_sys::palloc0(std::mem::size_of::()) as _; - - (*scan).xs_orderbynulls = { - let data = pgrx::pg_sys::palloc(std::mem::size_of::()) as *mut bool; - data.write_bytes(1, 1); - data - }; - - scan +pub fn scan_make(vector: Option) -> Scanner { + Scanner::Initial { vector } } -pub unsafe fn start_scan(scan: pgrx::pg_sys::IndexScanDesc, orderbys: pgrx::pg_sys::ScanKey) { - std::ptr::copy(orderbys, (*scan).orderByData, 1); - let is_null = (SK_ISNULL & (*orderbys.add(0)).sk_flags as u32) != 0; - let vector = from_datum((*orderbys.add(0)).sk_argument, is_null); - - let scanner = &mut *((*scan).opaque as *mut Scanner); - let scanner = std::mem::replace(scanner, Scanner::Initial { vector }); - - match scanner { - Scanner::Initial { .. } => {} - Scanner::Basic { basic, .. } => { - basic.leave(); - } - Scanner::Vbase { vbase, .. } => { - vbase.leave(); - } - Scanner::Empty {} => {} - } -} - -pub unsafe fn next_scan(scan: pgrx::pg_sys::IndexScanDesc) -> bool { - let scanner = &mut *((*scan).opaque as *mut Scanner); +pub fn scan_next(scanner: &mut Scanner, handle: Handle) -> Option { if let Scanner::Initial { vector } = scanner { if let Some(vector) = vector.as_ref() { - // https://www.postgresql.org/docs/current/index-locking.html - // If heap entries referenced physical pointers are deleted before - // they are consumed by PostgreSQL, PostgreSQL will received wrong - // physical pointers: no rows or irreverent rows are referenced. - if (*(*scan).xs_snapshot).snapshot_type != pgrx::pg_sys::SnapshotType_SNAPSHOT_MVCC { - pgrx::error!("scanning with a non-MVCC-compliant snapshot is not supported"); - } - - let oid = (*(*scan).indexRelation).rd_id; - let id = get_handle(oid); - - let rpc = check_client(crate::ipc::client()); + let rpc = check_client(client()); match SEARCH_MODE.get() { Mode::basic => { let opts = search_options(); - let basic = match rpc.basic(id, vector.clone(), opts) { + let basic = match rpc.basic(handle, vector.clone(), opts) { Ok(x) => x, Err((_, BasicError::NotExist)) => bad_service_not_exist(), Err((_, BasicError::InvalidVector)) => bad_service_invalid_vector(), @@ -90,7 +36,7 @@ pub unsafe fn next_scan(scan: pgrx::pg_sys::IndexScanDesc) -> bool { } Mode::vbase => { let opts = search_options(); - let vbase = match rpc.vbase(id, vector.clone(), opts) { + let vbase = match rpc.vbase(handle, vector.clone(), opts) { Ok(x) => x, Err((_, VbaseError::NotExist)) => bad_service_not_exist(), Err((_, VbaseError::InvalidVector)) => bad_service_invalid_vector(), @@ -105,30 +51,13 @@ pub unsafe fn next_scan(scan: pgrx::pg_sys::IndexScanDesc) -> bool { } match scanner { Scanner::Initial { .. } => unreachable!(), - Scanner::Basic { basic, .. } => { - if let Some(p) = basic.next() { - (*scan).xs_heaptid = p.into_sys(); - true - } else { - false - } - } - Scanner::Vbase { vbase, .. } => { - if let Some(p) = vbase.next() { - (*scan).xs_heaptid = p.into_sys(); - true - } else { - false - } - } - Scanner::Empty {} => false, + Scanner::Basic { basic, .. } => basic.next(), + Scanner::Vbase { vbase, .. } => vbase.next(), + Scanner::Empty {} => None, } } -pub unsafe fn end_scan(scan: pgrx::pg_sys::IndexScanDesc) { - let scanner = &mut *((*scan).opaque as *mut Scanner); - let scanner = std::mem::replace(scanner, Scanner::Initial { vector: None }); - +pub fn scan_release(scanner: Scanner) { match scanner { Scanner::Initial { .. } => {} Scanner::Basic { basic, .. } => { diff --git a/src/index/am_update.rs b/src/index/am_update.rs deleted file mode 100644 index 4c661713b..000000000 --- a/src/index/am_update.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::error::*; -use crate::index::hook_maintain::maintain_index_in_index_access; -use crate::utils::sys::FromSys; -use base::index::*; -use base::search::*; -use base::vector::*; - -pub fn update_insert(handle: Handle, vector: OwnedVector, tid: pgrx::pg_sys::ItemPointerData) { - maintain_index_in_index_access(handle); - - let pointer = Pointer::from_sys(tid); - let mut rpc = check_client(crate::ipc::client()); - - match rpc.insert(handle, vector, pointer) { - Ok(()) => (), - Err(InsertError::NotExist) => bad_service_not_exist(), - Err(InsertError::InvalidVector) => bad_service_invalid_vector(), - } -} - -pub fn update_delete(handle: Handle, f: impl Fn(Pointer) -> bool) { - maintain_index_in_index_access(handle); - - let mut rpc_list = match check_client(crate::ipc::client()).list(handle) { - Ok(x) => x, - Err((_, ListError::NotExist)) => bad_service_not_exist(), - }; - let mut rpc = check_client(crate::ipc::client()); - while let Some(p) = rpc_list.next() { - if f(p) { - match rpc.delete(handle, p) { - Ok(()) => (), - Err(DeleteError::NotExist) => (), - } - } - } - rpc_list.leave(); -} diff --git a/src/index/hook_maintain.rs b/src/index/catalog.rs similarity index 93% rename from src/index/hook_maintain.rs rename to src/index/catalog.rs index cd28d5137..4697d37bd 100644 --- a/src/index/hook_maintain.rs +++ b/src/index/catalog.rs @@ -1,5 +1,5 @@ use crate::error::*; -use crate::index::utils::get_handle; +use crate::index::utils::from_oid_to_handle; use crate::ipc::client; use crate::utils::cells::PgRefCell; use base::search::Handle; @@ -29,7 +29,7 @@ impl Transaction { static TRANSACTION: PgRefCell = unsafe { PgRefCell::new(Transaction::new()) }; -pub fn maintain_index_in_index_create(handle: Handle) { +pub fn on_index_build(handle: Handle) { let mut t = TRANSACTION.borrow_mut(); match t.index.get(&handle) { Some(TransactionIndex::Create) => { @@ -39,7 +39,7 @@ pub fn maintain_index_in_index_create(handle: Handle) { // It's a reindex t.index.insert(handle, TransactionIndex::Create); } - Some(TransactionIndex::Drop) => panic!("Reuse handle in a transaction."), + Some(TransactionIndex::Drop) => unreachable!("reused oid in a transaction."), None => { // It's an index or reindex t.index.insert(handle, TransactionIndex::Create); @@ -47,7 +47,7 @@ pub fn maintain_index_in_index_create(handle: Handle) { } } -pub fn maintain_index_in_index_access(handle: Handle) { +pub fn on_index_write(handle: Handle) { let mut t = TRANSACTION.borrow_mut(); match t.index.get(&handle) { Some(TransactionIndex::Create) => (), @@ -60,7 +60,7 @@ pub fn maintain_index_in_index_access(handle: Handle) { } } -pub unsafe fn maintain_index_in_object_access( +pub unsafe fn on_object_access( access: pgrx::pg_sys::ObjectAccessType, class_id: Oid, object_id: Oid, @@ -81,12 +81,12 @@ pub unsafe fn maintain_index_in_object_access( let search = pgrx::pg_catalog::PgClass::search_reloid(object_id).unwrap(); if let Some(pg_class) = search.get() { if let Some(()) = check_vector_index(pg_class) { - let handle = get_handle(object_id); + let handle = from_oid_to_handle(object_id); let mut t = TRANSACTION.borrow_mut(); match t.index.get(&handle) { Some(TransactionIndex::Create) => { // It's created in this transaction, so drop it immediately - let handle = get_handle(object_id); + let handle = from_oid_to_handle(object_id); let mut rpc = check_client(client()); if let Err(e) = rpc.drop(handle) { pgrx::warning!("Failed to drop {handle} for abortting: {e}."); @@ -161,7 +161,7 @@ fn check_vector_index_slow_path(pg_am: pgrx::pg_catalog::PgAm<'_>) -> Option<()> Some(()) } -pub unsafe fn maintain_index_for_commit() { +pub unsafe fn on_commit() { let t = std::mem::replace(&mut *TRANSACTION.borrow_mut(), Transaction::new()); if let Err(e) = std::panic::catch_unwind(|| { if t.index.is_empty() { @@ -193,7 +193,7 @@ pub unsafe fn maintain_index_for_commit() { } } -pub unsafe fn maintain_index_for_abort() { +pub unsafe fn on_abort() { let t = std::mem::replace(&mut *TRANSACTION.borrow_mut(), Transaction::new()); if let Err(e) = std::panic::catch_unwind(|| { if t.index.is_empty() { diff --git a/src/index/hook_compat.rs b/src/index/compatibility.rs similarity index 98% rename from src/index/hook_compat.rs rename to src/index/compatibility.rs index 4a357b145..1db7687e1 100644 --- a/src/index/hook_compat.rs +++ b/src/index/compatibility.rs @@ -15,7 +15,7 @@ unsafe fn swap_destroy(target: &mut *mut T, value: *mut T) { } } -pub unsafe fn pgvector_stmt_rewrite(pstmt: *mut pgrx::pg_sys::PlannedStmt) { +pub unsafe fn on_process_utility(pstmt: *mut pgrx::pg_sys::PlannedStmt) { let enabled = ENABLE_PGVECTOR_COMPATIBILITY.get(); if !enabled { return; diff --git a/src/index/functions.rs b/src/index/functions.rs index 701fa1b89..4bb7f492d 100644 --- a/src/index/functions.rs +++ b/src/index/functions.rs @@ -1,4 +1,7 @@ #[pgrx::pg_extern(volatile, strict, parallel_safe)] fn _vectors_pgvectors_upgrade() { + if crate::bgworker::is_started() { + return; + } let _ = std::fs::remove_dir_all("pg_vectors"); } diff --git a/src/index/hooks.rs b/src/index/hooks.rs index e40f067c4..e1f2f14d2 100644 --- a/src/index/hooks.rs +++ b/src/index/hooks.rs @@ -28,7 +28,7 @@ unsafe extern "C" fn vectors_process_utility( completion_tag: *mut pgrx::pg_sys::QueryCompletion, ) { unsafe { - super::hook_compat::pgvector_stmt_rewrite(pstmt); + super::compatibility::on_process_utility(pstmt); } unsafe { if let Some(prev_process_utility) = PREV_PROCESS_UTILITY { @@ -66,9 +66,7 @@ unsafe extern "C" fn vectors_object_access( arg: *mut libc::c_void, ) { unsafe { - super::hook_maintain::maintain_index_in_object_access( - access, class_id, object_id, sub_id, arg, - ); + super::catalog::on_object_access(access, class_id, object_id, sub_id, arg); if let Some(next_object_access) = NEXT_OBJECT_ACCESS_HOOK { next_object_access(access, class_id, object_id, sub_id, arg); } @@ -80,11 +78,11 @@ unsafe extern "C" fn xact_callback(event: pgrx::pg_sys::XactEvent, _data: pgrx:: match event { pgrx::pg_sys::XactEvent_XACT_EVENT_PRE_COMMIT | pgrx::pg_sys::XactEvent_XACT_EVENT_PARALLEL_PRE_COMMIT => unsafe { - super::hook_maintain::maintain_index_for_commit(); + super::catalog::on_commit(); }, pgrx::pg_sys::XactEvent_XACT_EVENT_ABORT | pgrx::pg_sys::XactEvent_XACT_EVENT_PARALLEL_ABORT => unsafe { - super::hook_maintain::maintain_index_for_abort(); + super::catalog::on_abort(); }, _ => {} } diff --git a/src/index/mod.rs b/src/index/mod.rs index 35a4f375b..3f196e33c 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,11 +1,9 @@ mod am; -mod am_build; +mod am_options; mod am_scan; -mod am_setup; -mod am_update; +mod catalog; +mod compatibility; mod functions; -mod hook_compat; -mod hook_maintain; mod hooks; mod utils; mod views; diff --git a/src/index/utils.rs b/src/index/utils.rs index 05a28bd8a..2af00fcfa 100644 --- a/src/index/utils.rs +++ b/src/index/utils.rs @@ -1,5 +1,3 @@ -#![allow(unsafe_op_in_unsafe_fn)] - use crate::datatype::memory_bvecf32::BVecf32Header; use crate::datatype::memory_svecf32::SVecf32Header; use crate::datatype::memory_vecf16::Vecf16Header; @@ -9,49 +7,54 @@ use crate::utils::cells::PgCell; use base::search::*; use base::vector::*; -#[repr(C, align(8))] -struct Header { - varlena: u32, - dims: u16, - kind: u16, -} - -pub unsafe fn from_datum(values: pgrx::pg_sys::Datum, is_null: bool) -> Option { +pub unsafe fn from_datum_to_vector( + value: pgrx::pg_sys::Datum, + is_null: bool, +) -> Option { + #[repr(C, align(8))] + struct Header { + varlena: u32, + _reserved: u16, + kind: u16, + } if is_null { return None; } - let p = values.cast_mut_ptr::(); - let q = pgrx::pg_sys::pg_detoast_datum(p); - let vector = match (*q.cast::
()).kind { + let p = value.cast_mut_ptr::(); + let q = scopeguard::guard(unsafe { pgrx::pg_sys::pg_detoast_datum(p) }, |q| { + if p != q { + unsafe { + pgrx::pg_sys::pfree(q.cast()); + } + } + }); + let vector = match unsafe { (*q.cast::
()).kind } { 0 => { - let v = &*q.cast::(); + let v = unsafe { &*q.cast::() }; Some(OwnedVector::Vecf32(v.for_borrow().for_own())) } 1 => { - let v = &*q.cast::(); + let v = unsafe { &*q.cast::() }; Some(OwnedVector::Vecf16(v.for_borrow().for_own())) } 2 => { - let v = &*q.cast::(); + let v = unsafe { &*q.cast::() }; Some(OwnedVector::SVecf32(v.for_borrow().for_own())) } 3 => { - let v = &*q.cast::(); + let v = unsafe { &*q.cast::() }; Some(OwnedVector::BVecf32(v.for_borrow().for_own())) } 4 => { - let v = &*q.cast::(); + let v = unsafe { &*q.cast::() }; Some(OwnedVector::Veci8(v.for_borrow().for_own())) } _ => unreachable!(), }; - if p != q { - pgrx::pg_sys::pfree(q.cast()); - } vector } -pub fn get_handle(oid: pgrx::pg_sys::Oid) -> Handle { +pub fn from_oid_to_handle(oid: pgrx::pg_sys::Oid) -> Handle { static SYSTEM_IDENTIFIER: PgCell = unsafe { PgCell::new(0) }; if SYSTEM_IDENTIFIER.get() == 0 { SYSTEM_IDENTIFIER.set(unsafe { pgrx::pg_sys::GetSystemIdentifier() }); @@ -61,3 +64,22 @@ pub fn get_handle(oid: pgrx::pg_sys::Oid) -> Handle { let c = oid.as_u32() as u128; Handle::new(a << 96 | b << 32 | c) } + +pub fn pointer_to_ctid(pointer: Pointer) -> pgrx::pg_sys::ItemPointerData { + let value = pointer.as_u64(); + pgrx::pg_sys::ItemPointerData { + ip_blkid: pgrx::pg_sys::BlockIdData { + bi_hi: ((value >> 32) & 0xffff) as u16, + bi_lo: ((value >> 16) & 0xffff) as u16, + }, + ip_posid: (value & 0xffff) as u16, + } +} + +pub fn ctid_to_pointer(ctid: pgrx::pg_sys::ItemPointerData) -> Pointer { + let mut value = 0; + value |= (ctid.ip_blkid.bi_hi as u64) << 32; + value |= (ctid.ip_blkid.bi_lo as u64) << 16; + value |= ctid.ip_posid as u64; + Pointer::new(value) +} diff --git a/src/index/views.rs b/src/index/views.rs index 63495ef25..5f85a7a2a 100644 --- a/src/index/views.rs +++ b/src/index/views.rs @@ -1,5 +1,6 @@ use crate::error::*; -use crate::index::utils::get_handle; +use crate::index::utils::from_oid_to_handle; +use crate::ipc::client; use base::index::*; #[pgrx::pg_extern(volatile, strict, parallel_safe)] @@ -7,10 +8,10 @@ fn _vectors_index_stat( oid: pgrx::pg_sys::Oid, ) -> pgrx::composite_type!('static, "vectors.vector_index_stat") { use pgrx::heap_tuple::PgHeapTuple; - let id = get_handle(oid); + let handle = from_oid_to_handle(oid); let mut res = PgHeapTuple::new_composite_type("vectors.vector_index_stat").unwrap(); - let mut rpc = check_client(crate::ipc::client()); - let stat = rpc.stat(id); + let mut rpc = check_client(client()); + let stat = rpc.stat(handle); match stat { Ok(IndexStat { indexing, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 94d64f557..967246145 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,2 @@ pub mod cells; pub mod parse; -pub mod sys; diff --git a/src/utils/sys.rs b/src/utils/sys.rs deleted file mode 100644 index e35b11366..000000000 --- a/src/utils/sys.rs +++ /dev/null @@ -1,32 +0,0 @@ -use base::search::*; - -pub trait FromSys { - fn from_sys(sys: T) -> Self; -} - -impl FromSys for Pointer { - fn from_sys(sys: pgrx::pg_sys::ItemPointerData) -> Self { - let mut newtype = 0; - newtype |= (sys.ip_blkid.bi_hi as u64) << 32; - newtype |= (sys.ip_blkid.bi_lo as u64) << 16; - newtype |= sys.ip_posid as u64; - Self::new(newtype) - } -} - -pub trait IntoSys { - fn into_sys(self) -> T; -} - -impl IntoSys for Pointer { - fn into_sys(self) -> pgrx::pg_sys::ItemPointerData { - let x = self.as_u64(); - pgrx::pg_sys::ItemPointerData { - ip_blkid: pgrx::pg_sys::BlockIdData { - bi_hi: ((x >> 32) & 0xffff) as u16, - bi_lo: ((x >> 16) & 0xffff) as u16, - }, - ip_posid: (x & 0xffff) as u16, - } - } -}