Skip to content

Commit

Permalink
libsql-wal
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Apr 25, 2024
1 parent 7770121 commit 42784f1
Show file tree
Hide file tree
Showing 8,317 changed files with 163,201 additions and 130 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
330 changes: 247 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"vendored/sqlite3-parser",

"xtask", "libsql-hrana",
"libsql-wal"
]

exclude = [
Expand Down
6 changes: 3 additions & 3 deletions libsql-ffi/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ fn build_multiple_ciphers(out_path: &Path) {
let cxx = env("CXX");

let toolchain_path = sqlite3mc_build_dir.join("toolchain.cmake");
let cmake_toolchain_opt = format!("-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake");
let cmake_toolchain_opt = "-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake".to_string();

let mut toolchain_file = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -363,8 +363,8 @@ fn build_multiple_ciphers(out_path: &Path) {

let mut make = Command::new("cmake");
make.current_dir(sqlite3mc_build_dir.clone());
make.args(&["--build", "."]);
make.args(&["--config", "Release"]);
make.args(["--build", "."]);
make.args(["--config", "Release"]);
if !make.status().unwrap().success() {
panic!("Failed to run make");
}
Expand Down
1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ itertools = "0.10.5"
jsonwebtoken = "9"
libsql = { path = "../libsql/", optional = true }
libsql_replication = { path = "../libsql-replication" }
libsql-wal = { path = "../libsql-wal/" }
metrics = "0.21.1"
metrics-util = "0.15"
metrics-exporter-prometheus = "0.12.2"
Expand Down
25 changes: 15 additions & 10 deletions libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::time::{Duration, Instant};
use crossbeam::deque::Steal;
use crossbeam::sync::{Parker, Unparker};
use hashbrown::HashMap;

Check failure on line 8 in libsql-server/src/connection/connection_manager.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/connection_manager.rs
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal, Sqlite3WalManager};
use libsql_wal::fs::StdFs;
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
use metrics::atomics::AtomicU64;
use parking_lot::{Mutex, MutexGuard};
use rusqlite::ErrorCode;
Expand All @@ -17,7 +20,9 @@ use super::TXN_TIMEOUT;

pub type ConnId = u64;

pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, Sqlite3Wal>;
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdFs>>;
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdFs>>;
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;

#[derive(Copy, Clone, Debug)]
struct Slot {
Expand Down Expand Up @@ -356,9 +361,9 @@ impl SlotState {
}
}

impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn begin_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
tracing::debug!("begin write");
self.acquire()?;
match wrapped.begin_write_txn() {
Expand Down Expand Up @@ -387,7 +392,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
#[tracing::instrument(skip_all, fields(id = self.id))]
fn checkpoint(
&mut self,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
mode: libsql_sys::wal::CheckpointMode,
busy_handler: Option<&mut dyn libsql_sys::wal::BusyHandler>,
Expand Down Expand Up @@ -438,13 +443,13 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_read_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<bool> {
fn begin_read_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<bool> {
tracing::debug!("begin read txn");
wrapped.begin_read_txn()
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_read_txn(&mut self, wrapped: &mut Sqlite3Wal) {
fn end_read_txn(&mut self, wrapped: &mut InnerWal) {
wrapped.end_read_txn();
{
let current = self.manager.current.lock();
Expand All @@ -467,7 +472,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn end_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
wrapped.end_write_txn()?;
tracing::debug!("end write txn");
self.release();
Expand All @@ -476,10 +481,10 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn close<M: libsql_sys::wal::WalManager<Wal = Sqlite3Wal>>(
fn close<M: libsql_sys::wal::WalManager<Wal = InnerWal>>(
&mut self,
manager: &M,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
sync_flags: std::ffi::c_int,
_scratch: Option<&mut [u8]>,
Expand Down
23 changes: 19 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Sqlite3WalManager, Wal, WalManager};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager};
use libsql_sys::EncryptionConfig;
use metrics::histogram;
use parking_lot::Mutex;
Expand All @@ -25,7 +25,7 @@ use crate::stats::{Stats, StatsUpdateMessage};
use crate::{Result, BLOCKING_RT};

Check failure on line 25 in libsql-server/src/connection/libsql.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/libsql.rs

use super::connection_manager::{
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper, InnerWalManager,
};
use super::program::{
check_describe_auth, check_program_auth, DescribeCol, DescribeParam, DescribeResponse, Vm,
Expand All @@ -48,6 +48,7 @@ pub struct MakeLibSqlConn<W> {
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

impl<W> MakeLibSqlConn<W>
Expand All @@ -68,6 +69,7 @@ where
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> Result<Self> {
let mut this = Self {
db_path,
Expand All @@ -84,6 +86,7 @@ where
block_writes,
resolve_attach_path,
connection_manager: ConnectionManager::default(),
make_wal_manager,
};

let db = this.try_create_db().await?;
Expand Down Expand Up @@ -138,6 +141,7 @@ where
self.block_writes.clone(),
self.resolve_attach_path.clone(),
self.connection_manager.clone(),
self.make_wal_manager.clone(),
)
.await
}
Expand All @@ -162,6 +166,9 @@ pub struct LibSqlConnection<T> {
#[cfg(test)]
impl LibSqlConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
pub async fn new_test(path: &Path) -> Self {
use libsql_sys::wal::either::Either;
use libsql_sys::wal::Sqlite3WalManager;

Self::new(
path.to_owned(),
Arc::new([]),
Expand All @@ -173,6 +180,7 @@ impl LibSqlConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
Default::default(),
Arc::new(|_| unreachable!()),
ConnectionManager::default(),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap()
Expand Down Expand Up @@ -307,13 +315,14 @@ where
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
connection_manager: ConnectionManager,
make_wal: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> crate::Result<Self> {
let (conn, id) = tokio::task::spawn_blocking({
let connection_manager = connection_manager.clone();
move || -> crate::Result<_> {
let manager = ManagedConnectionWalWrapper::new(connection_manager);
let id = manager.id();
let wal = Sqlite3WalManager::default().wrap(manager).wrap(wal_wrapper);
let wal = make_wal().wrap(manager).wrap(wal_wrapper);

let conn = Connection::new(
path.as_ref(),
Expand Down Expand Up @@ -674,8 +683,9 @@ where
#[cfg(test)]
mod test {
use itertools::Itertools;
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::PassthroughWalWrapper;
use libsql_sys::wal::Sqlite3Wal;
use libsql_sys::wal::{Sqlite3Wal, Sqlite3WalManager};
use rand::Rng;
use tempfile::tempdir;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -737,6 +747,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -781,6 +792,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -830,6 +842,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -911,6 +924,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -996,6 +1010,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::replication::FrameNo;
use crate::stats::Stats;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::connection_manager::InnerWalManager;
use super::libsql::{LibSqlConnection, MakeLibSqlConn};
use super::program::DescribeResponse;
use super::{Connection, RequestContext};
Expand Down Expand Up @@ -60,6 +61,7 @@ impl MakeWriteProxyConn {
primary_replication_index: Option<FrameNo>,

Check failure on line 61 in libsql-server/src/connection/write_proxy.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/write_proxy.rs
encryption_config: Option<EncryptionConfig>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Send + Sync + 'static>,
) -> crate::Result<Self> {
let client = ProxyClient::with_origin(channel, uri);
let make_read_only_conn = MakeLibSqlConn::new(
Expand All @@ -75,6 +77,7 @@ impl MakeWriteProxyConn {
encryption_config.clone(),
Arc::new(AtomicBool::new(false)), // this is always false for write proxy
resolve_attach_path,
make_wal_manager,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use config::{
use http::user::UserApi;

Check failure on line 28 in libsql-server/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/lib.rs
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_sys::wal::either::Either;
use namespace::{NamespaceConfig, NamespaceName};
use net::Connector;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -411,6 +413,7 @@ where
let (scheduler_sender, scheduler_receiver) = mpsc::channel(128);

let (stats_sender, stats_receiver) = mpsc::channel(8);
let make_wal_manager = Arc::new(|| Either::Left(Sqlite3WalManager::default()));
let ns_config = NamespaceConfig {
db_kind,
base_path: self.path.clone(),
Expand All @@ -429,6 +432,7 @@ where
channel: channel.clone(),
uri: uri.clone(),
migration_scheduler: scheduler_sender.into(),
make_wal_manager,
};

let (metastore_conn_maker, meta_store_wal_manager) =
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use uuid::Uuid;

use crate::auth::parse_jwt_key;
use crate::connection::config::DatabaseConfig;
use crate::connection::connection_manager::InnerWalManager;
use crate::connection::libsql::{open_conn, MakeLibSqlConn};
use crate::connection::write_proxy::MakeWriteProxyConn;
use crate::connection::Connection;
Expand Down Expand Up @@ -361,6 +362,7 @@ impl Namespace {
ns_config.encryption_config.clone(),
block_writes,
resolve_attach_path,
ns_config.make_wal_manager.clone(),
)
.await?
.throttled(
Expand Down Expand Up @@ -590,6 +592,7 @@ impl Namespace {
primary_current_replicatio_index,
config.encryption_config.clone(),
resolve_attach_path,
config.make_wal_manager.clone(),
)
.await?
.throttled(
Expand Down Expand Up @@ -736,6 +739,7 @@ pub struct NamespaceConfig {
pub(crate) bottomless_replication: Option<bottomless::replicator::Options>,
pub(crate) scripted_backup: Option<ScriptBackupManager>,
pub(crate) migration_scheduler: SchedulerHandle,
pub(crate) make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

pub type DumpStream =
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/src/schema/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ async fn step_job_run_success(
#[cfg(test)]

Check failure on line 760 in libsql-server/src/schema/scheduler.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/schema/scheduler.rs
mod test {
use insta::assert_debug_snapshot;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_sys::wal::either::Either;
use std::path::Path;
use tempfile::tempdir;

Expand Down Expand Up @@ -876,6 +878,7 @@ mod test {
bottomless_replication: None,
scripted_backup: None,
migration_scheduler,
make_wal_manager: Arc::new(|| Either::Left(Sqlite3WalManager::default())),
}
}

Expand Down
Empty file.

0 comments on commit 42784f1

Please sign in to comment.