Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wasm): experimental support for WASI #733

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 19 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ keywords = ["sql", "database", "embedded", "cli"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["jemalloc"]
default = ["jemalloc", "storage", "rustyline", "console-subscriber"]
simd = []
jemalloc = ["tikv-jemallocator"]
storage = ["moka"]

[dependencies]
anyhow = "1"
Expand All @@ -28,7 +29,7 @@ bytes = "1"
chrono = "0.4"
clap = { version = "4", features = ["derive"] }
comfy-table = { version = "6", default-features = false }
console-subscriber = "0.1"
console-subscriber = { version = "0.1", optional = true }
crc32fast = "1"
csv = "1"
dirs = "4"
Expand All @@ -44,7 +45,7 @@ indoc = "1"
iter-chunks = "0.1"
itertools = "0.10"
minitrace = "0.4.0"
moka = { version = "0.9", features = ["future"] }
moka = { version = "0.9", features = ["future"], optional = true }
num-traits = "0.2"
ordered-float = { version = "3", features = ["serde"] }
parking_lot = "0.12"
Expand All @@ -54,15 +55,15 @@ prost = "0.11.0"
ref-cast = "1.0"
risinglight_proto = "0.1"
rust_decimal = "1"
rustyline = "10"
rustyline = { version = "10", optional = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
smallvec = { version = "1", features = ["serde"] }
sqllogictest = "0.6"
sqlparser = { version = "0.26", features = ["serde"] }
thiserror = "1"
tikv-jemallocator = { version = "0.5", optional = true }
tokio = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "fs", "sync", "signal", "tracing", "macros"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot"] }

Expand Down
24 changes: 13 additions & 11 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::TryStreamExt;
use risinglight_proto::rowset::block_statistics::BlockStatisticsType;
use tracing::debug;

use crate::array::{
ArrayBuilder, ArrayBuilderImpl, Chunk, DataChunk, I32ArrayBuilder, Utf8ArrayBuilder,
};
use crate::catalog::RootCatalogRef;
use crate::parser::{parse, ParserError};
use crate::storage::{
InMemoryStorage, SecondaryStorage, SecondaryStorageOptions, Storage, StorageColumnRef,
StorageImpl, Table,
};
use crate::storage::{InMemoryStorage, Storage, StorageColumnRef, StorageImpl, Table};
#[cfg(feature = "storage")]
use crate::storage::{SecondaryStorage, SecondaryStorageOptions};
use crate::v1::binder::Binder;
use crate::v1::executor::ExecutorBuilder;
use crate::v1::logical_planner::LogicalPlaner;
Expand All @@ -42,6 +40,7 @@ impl Database {
}

/// Create a new database instance with merge-tree engine.
#[cfg(feature = "storage")]
pub async fn new_on_disk(options: SecondaryStorageOptions) -> Self {
let storage = Arc::new(SecondaryStorage::open(options).await.unwrap());
storage.spawn_compactor().await;
Expand All @@ -53,6 +52,7 @@ impl Database {
}

pub async fn shutdown(&self) -> Result<(), Error> {
#[cfg(feature = "storage")]
if let StorageImpl::SecondaryStorage(storage) = &self.storage {
storage.shutdown().await?;
}
Expand Down Expand Up @@ -94,7 +94,9 @@ impl Database {
pub async fn run_internal(&self, cmd: &str) -> Result<Vec<Chunk>, Error> {
if let Some((cmd, arg)) = cmd.split_once(' ') {
if cmd == "stat" {
#[cfg(feature = "storage")]
if let StorageImpl::SecondaryStorage(ref storage) = self.storage {
use risinglight_proto::rowset::block_statistics::BlockStatisticsType;
let (table, col) = arg.split_once(' ').expect("failed to parse command");
let table_id = self
.catalog
Expand Down Expand Up @@ -141,15 +143,14 @@ impl Database {
.to_string()
.as_str(),
));
Ok(vec![Chunk::new(vec![DataChunk::from_iter([
return Ok(vec![Chunk::new(vec![DataChunk::from_iter([
ArrayBuilderImpl::from(stat_name),
ArrayBuilderImpl::from(stat_value),
])])])
} else {
Err(Error::InternalError(
"this storage engine doesn't support statistics".to_string(),
))
])])]);
}
Err(Error::InternalError(
"this storage engine doesn't support statistics".to_string(),
))
} else {
Err(Error::InternalError("unsupported command".to_string()))
}
Expand Down Expand Up @@ -183,6 +184,7 @@ impl Database {
StorageImpl::InMemoryStorage(s) => {
crate::executor_v2::build(self.catalog.clone(), s, &optimized)
}
#[cfg(feature = "storage")]
StorageImpl::SecondaryStorage(s) => {
crate::executor_v2::build(self.catalog.clone(), s, &optimized)
}
Expand Down
7 changes: 7 additions & 0 deletions src/executor_v2/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct CopyFromFileExecutor {
const IMPORT_PROGRESS_BAR_LIMIT: u64 = 1024 * 1024;

impl CopyFromFileExecutor {
#[cfg(not(target_os = "wasi"))]
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
Expand All @@ -34,6 +35,12 @@ impl CopyFromFileExecutor {
handle.await.unwrap()?;
}

#[cfg(target_os = "wasi")]
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
panic!("not supported in WASI");
}

/// Read records from file using blocking IO.
///
/// The read data chunks will be sent through `tx`.
Expand Down
7 changes: 7 additions & 0 deletions src/executor_v2/copy_to_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct CopyToFileExecutor {
}

impl CopyToFileExecutor {
#[cfg(not(target_os = "wasi"))]
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self, child: BoxedExecutor) {
let (sender, recver) = mpsc::channel(1);
Expand All @@ -36,6 +37,12 @@ impl CopyToFileExecutor {
yield DataChunk::single(rows as _);
}

#[cfg(target_os = "wasi")]
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self, child: BoxedExecutor) {
panic!("not supported in WASI");
}

fn write_file_blocking(
path: PathBuf,
format: FileFormat,
Expand Down
57 changes: 37 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use clap::Parser;
use humantime::format_duration;
use minitrace::prelude::*;
use risinglight::array::{datachunk_to_sqllogictest_string, Chunk};
use risinglight::storage::SecondaryStorageOptions;
#[cfg(not(feature = "rustyline"))]
use risinglight::utils::rustyline_mock as rustyline;
use risinglight::utils::time::RoundingDuration;
use risinglight::Database;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::{select, signal};
use tokio::select;
use tracing::{info, warn, Level};
use tracing_subscriber::prelude::*;

Expand Down Expand Up @@ -116,22 +117,26 @@ async fn run_query_in_background(
}
};

select! {
_ = signal::ctrl_c() => {
#[cfg(not(target_os = "wasi"))]
let ret = select! {
_ = tokio::signal::ctrl_c() => {
// we simply drop the future `task` to cancel the query.
println!("Interrupted");
return;
}
ret = task => {
match ret {
Ok(chunks) => {
for chunk in chunks {
print_chunk(&chunk, &output_format);
}
print_execution_time(start_time);
}
Err(err) => println!("{}", err),
ret = task => ret,
};
#[cfg(target_os = "wasi")]
let ret = task.await;

match ret {
Ok(chunks) => {
for chunk in chunks {
print_chunk(&chunk, &output_format);
}
print_execution_time(start_time);
}
Err(err) => println!("{}", err),
}
}

Expand Down Expand Up @@ -302,11 +307,13 @@ async fn run_sqllogictest(
Ok(())
}

#[tokio::main]
#[cfg_attr(not(target_os = "wasi"), tokio::main)]
#[cfg_attr(target_os = "wasi", tokio::main(flavor = "current_thread"))]
async fn main() -> Result<()> {
let args = Args::parse();

if args.tokio_console {
#[cfg(feature = "console-subscriber")]
console_subscriber::init();
} else {
let fmt_layer = tracing_subscriber::fmt::layer().compact();
Expand All @@ -321,12 +328,22 @@ async fn main() -> Result<()> {

info!("using query engine v2. type '\\v1' to use the legacy engine");

let db = if args.memory {
info!("using memory engine");
Database::new_in_memory()
} else {
info!("using Secondary engine");
Database::new_on_disk(SecondaryStorageOptions::default_for_cli()).await
let db = match args.memory {
true => {
info!("using memory engine");
Database::new_in_memory()
}
#[cfg(not(feature = "storage"))]
_ => {
info!("using memory engine");
Database::new_in_memory()
}
#[cfg(feature = "storage")]
false => {
info!("using Secondary engine");
use risinglight::storage::SecondaryStorageOptions;
Database::new_on_disk(SecondaryStorageOptions::default_for_cli()).await
}
};

if let Some(file) = args.file {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
mod memory;
pub use memory::InMemoryStorage;

#[cfg(feature = "storage")]
mod secondary;
#[cfg(feature = "storage")]
pub use secondary::{SecondaryStorage, StorageOptions as SecondaryStorageOptions};

mod error;
Expand All @@ -27,6 +29,7 @@ use crate::v1::binder::BoundExpr;
#[derive(Clone)]
pub enum StorageImpl {
InMemoryStorage(Arc<InMemoryStorage>),
#[cfg(feature = "storage")]
SecondaryStorage(Arc<SecondaryStorage>),
}

Expand All @@ -46,6 +49,7 @@ impl StorageImpl {
impl StorageImpl {
pub fn enable_filter_scan(&self) -> bool {
match self {
#[cfg(feature = "storage")]
Self::SecondaryStorage(_) => true,
Self::InMemoryStorage(_) => false,
}
Expand Down
2 changes: 2 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0.

#[cfg(not(feature = "rustyline"))]
pub mod rustyline_mock;
pub mod time;