Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

primary shutdown 500 test #1240

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl WriteProxyConnection<RpcStream> {
}
}

#[derive(Debug)]
struct RemoteConnection<R = Streaming<ExecResp>> {
response_stream: R,
request_sender: mpsc::Sender<ExecReq>,
Expand All @@ -276,9 +277,36 @@ impl RemoteConnection {
let (request_sender, receiver) = mpsc::channel(1);

let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
let mut req = Request::new(stream);
ctx.upgrade_grpc_request(&mut req);
let response_stream = client.stream_exec(req).await?.into_inner();

let retryable_stream = RetryableStream::new(stream);

let mut err: Option<tonic::Status> = None;

let response_stream = loop {
let stream = match retryable_stream.try_clone() {
Ok(s) => s,
Err(_) => {
if let Some(e) = err.take() {
return Err(e.into());
} else {
unreachable!("either weve gotten a valid stream or we have an error");
}
}
};
let mut req = Request::new(stream);
ctx.upgrade_grpc_request(&mut req);

let res = client.stream_exec(req).await;

match dbg!(res) {
Ok(res_stream) => break res_stream.into_inner(),
Err(e) => {
err = Some(e);

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
};

Ok(Self {
response_stream,
Expand All @@ -289,6 +317,54 @@ impl RemoteConnection {
}
}

struct RetryableStream<S> {
inner: Arc<parking_lot::Mutex<RetryableStreamShared<S>>>,
}

struct RetryableStreamShared<S> {
been_polled: bool,
inner: S,
}

impl<S> RetryableStream<S> {
fn new(inner: S) -> Self {
Self {
inner: Arc::new(parking_lot::Mutex::new(RetryableStreamShared {
been_polled: false,
inner,
})),
}
}

fn try_clone(&self) -> Result<Self, ()> {
let lock = self.inner.lock();

if !dbg!(lock.been_polled) {
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(())
}
}
}

impl<S> Stream for RetryableStream<S>
where
S: Stream + Unpin,
{
type Item = S::Item;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut inner = self.inner.lock();
inner.been_polled = true;
std::pin::Pin::new(&mut inner.inner).poll_next(cx)
}
}

impl<R> RemoteConnection<R>
where
R: Stream<Item = Result<ExecResp, tonic::Status>> + Unpin,
Expand Down
8 changes: 8 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ where

tokio::select! {
_ = shutdown.notified() => {
tracing::debug!("shutdown notification received");
let shutdown = async {
join_set.shutdown().await;
service_shutdown.notify_waiters();
Expand All @@ -609,6 +610,13 @@ where
},

}

// Flag that can be set to ensure shutdown never completes for
// testing purposes.
if std::env::var("LIBSQL_DELAY_SHUTDOWN").is_ok() {
// std::future::pending::<()>().await;
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
}
Some(res) = join_set.join_next() => {
res??;
Expand Down
6 changes: 6 additions & 0 deletions libsql-server/src/namespace/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ impl NamespaceStore {
where
Fun: FnOnce(&Namespace) -> R + 'static,
{
if self.inner.has_shutdown.load(Ordering::Relaxed) {
return Err(Error::NamespaceStoreShutdown);
}

if namespace != NamespaceName::default()
&& !self.inner.metadata.exists(&namespace)
&& !self.inner.allow_lazy_creation
Expand Down Expand Up @@ -446,6 +450,8 @@ impl NamespaceStore {
let mut set = JoinSet::new();
self.inner.has_shutdown.store(true, Ordering::Relaxed);

println!("set shutdown");

for (_name, entry) in self.inner.store.iter() {
let snapshow_at_shutdown = self.inner.snapshot_at_shutdown;
let mut lock = entry.write().await;
Expand Down
59 changes: 57 additions & 2 deletions libsql-server/tests/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
//! Tests for sqld in cluster mode
#![allow(deprecated)]

use std::sync::{Arc, Mutex};

use super::common;

use insta::assert_snapshot;
use libsql::{Database, Value};
use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
use serde_json::json;
use tempfile::tempdir;
use tokio::{task::JoinSet, time::Duration};
use tokio::{sync::Notify, task::JoinSet, time::Duration};
use turmoil::{Builder, Sim};

use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};
Expand All @@ -18,11 +20,19 @@ use crate::common::{http::Client, net::SimServer, snapshot_metrics};
mod replica_restart;
mod replication;

pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) {
pub fn make_cluster(
sim: &mut Sim,
num_replica: usize,
disable_namespaces: bool,
) -> Arc<Mutex<Option<Arc<Notify>>>> {
let shutdown_primary = Arc::new(Mutex::new(None));
let shutdown_primary_clone = shutdown_primary.clone();

init_tracing();
let tmp = tempdir().unwrap();
sim.host("primary", move || {
let path = tmp.path().to_path_buf();
let shutdown_primary = shutdown_primary.clone();
async move {
let server = TestServer {
path: path.into(),
Expand All @@ -43,6 +53,8 @@ pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool)
..Default::default()
};

*shutdown_primary.lock().unwrap() = Some(server.shutdown.clone());

server.start_sim(8080).await?;

Ok(())
Expand Down Expand Up @@ -80,6 +92,8 @@ pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool)
}
});
}

shutdown_primary_clone
}

#[test]
Expand Down Expand Up @@ -115,6 +129,47 @@ fn proxy_write() {
sim.run().unwrap();
}

#[test]
#[ignore]
fn proxy_write_retry() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(u64::MAX))
.build();
let shutdown_primary_handle = make_cluster(&mut sim, 1, true);

sim.client("client", async move {
let db =
Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

std::env::set_var("LIBSQL_DELAY_SHUTDOWN", "1");

conn.execute("create table test (x)", ()).await.unwrap();

{
let shutdown_handle = shutdown_primary_handle.lock().unwrap();

shutdown_handle
.as_ref()
.expect("no shutdown handle")
.notify_waiters();

tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}

std::env::remove_var("LIBSQL_DELAY_SHUTDOWN");

db.connect().unwrap();
conn.execute("insert into test values (12)", ())
.await
.unwrap();

Ok(())
});

sim.run().unwrap();
}

#[test]
#[ignore = "libsql client doesn't reuse the stream yet, so we can't do RYW"]
fn replica_read_write() {
Expand Down
Loading