Skip to content

Commit

Permalink
feature: support redis tcp server
Browse files Browse the repository at this point in the history
  • Loading branch information
luffy2025 committed Dec 1, 2024
1 parent 1b68a7c commit 22f70bc
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 30 deletions.
441 changes: 440 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,12 @@ anyhow = "1.0.93"
bytes = "1.8.0"
dashmap = "6.1.0"
enum_dispatch = "0.3.13"
features = { version = "0.10.0", default-features = false }
futures = { version = "0.3.31", default-features = false }
lazy_static = "1.5.0"
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros", "net"] }
tokio-stream = { version = "0.1.16", default-features = false }
tokio-util = { version = "0.7.12", features = ["codec"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
2 changes: 2 additions & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use dashmap::DashMap;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct Backend(Arc<BackendInner>);

#[derive(Debug, Clone)]
pub struct BackendInner {
map: DashMap<String, RespFrame>,
hmap: DashMap<String, DashMap<String, RespFrame>>,
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::backend::Backend;
use crate::cmd::{extract_args, validate_command, CommandError, Executor, RESP_EMPTY};
use crate::cmd::{extract_args, validate_command, CommandError, CommandExecutor, RESP_EMPTY};
use crate::{RespArray, RespFrame};

#[allow(dead_code)]
#[derive(Debug)]
pub struct Get {
key: String,
}

impl Executor for Get {
impl CommandExecutor for Get {
fn execute(self, backend: &Backend) -> Result<RespFrame, CommandError> {
match backend.get(&self.key) {
Some(value) => Ok(value),
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/hget.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::backend::Backend;
use crate::cmd::{extract_args, validate_command, CommandError, Executor, RESP_EMPTY};
use crate::cmd::{extract_args, validate_command, CommandError, CommandExecutor, RESP_EMPTY};
use crate::{RespArray, RespFrame};

#[allow(dead_code)]
#[derive(Debug)]
pub struct HGet {
key: String,
field: String,
}

impl Executor for HGet {
impl CommandExecutor for HGet {
fn execute(self, backend: &Backend) -> Result<RespFrame, CommandError> {
match backend.hget(&self.key, &self.field) {
Some(value) => Ok(value),
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/hset.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::backend::Backend;
use crate::cmd::{extract_args, validate_command, CommandError, Executor, RESP_OK};
use crate::cmd::{extract_args, validate_command, CommandError, CommandExecutor, RESP_OK};
use crate::{RespArray, RespFrame};

#[allow(dead_code)]
#[derive(Debug)]
pub struct HSet {
key: String,
field: String,
value: RespFrame,
}

impl Executor for HSet {
impl CommandExecutor for HSet {
fn execute(self, backend: &Backend) -> Result<RespFrame, CommandError> {
backend.hset(self.key, self.field, self.value);
Ok(RESP_OK.clone())
Expand Down
18 changes: 16 additions & 2 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::cmd::hget::HGet;
use crate::cmd::hset::HSet;
use crate::cmd::set::Set;
use crate::{RespArray, RespFrame, RespNull};
use enum_dispatch::enum_dispatch;
use lazy_static::lazy_static;
use thiserror::Error;

Expand All @@ -18,12 +19,18 @@ lazy_static! {
static ref BACKEND: Backend = Backend::new();
}

#[derive(Debug)]
pub struct Unrecognized;

#[derive(Debug)]
#[enum_dispatch(CommandExecutor)]
pub enum Command {
Get(Get),
Set(Set),
HGet(HGet),
HSet(HSet),
// HSetAll(hset_all::HSetAll),
Unrecognized(Unrecognized),
}

#[derive(Error, Debug, PartialEq, Eq)]
Expand All @@ -38,10 +45,17 @@ pub enum CommandError {
FromUtf8Error(#[from] std::string::FromUtf8Error),
}

pub trait Executor {
#[enum_dispatch]
pub trait CommandExecutor {
fn execute(self, backend: &Backend) -> Result<RespFrame, CommandError>;
}

impl CommandExecutor for Unrecognized {
fn execute(self, _: &Backend) -> Result<RespFrame, CommandError> {
Ok(RESP_OK.clone())
}
}

impl TryFrom<RespFrame> for Command {
type Error = CommandError;

Expand All @@ -58,7 +72,7 @@ impl TryFrom<RespFrame> for Command {
b"hget" => Ok(Command::HGet(HGet::try_from(frame)?)),
b"hset" => Ok(Command::HSet(HSet::try_from(frame)?)),
// b"hset_all" => Ok(Command::HSetAll(hset_all::HSetAll::new(cmd))),
_ => Err(CommandError::InvalidCmd("Invalid command".to_string())),
_ => Ok(Command::Unrecognized(Unrecognized)),
},
_ => Err(CommandError::InvalidArgs("Invalid arguments".to_string())),
}
Expand Down
8 changes: 3 additions & 5 deletions src/cmd/set.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::backend::Backend;
use crate::cmd::{extract_args, validate_command, CommandError, Executor, RESP_OK};
use crate::cmd::{extract_args, validate_command, CommandError, CommandExecutor, RESP_OK};
use crate::{RespArray, RespFrame};

#[allow(dead_code)]
#[derive(Debug)]
pub struct Set {
key: String,
value: RespFrame,
}

impl Executor for Set {
impl CommandExecutor for Set {
fn execute(self, backend: &Backend) -> Result<RespFrame, CommandError> {
backend.set(self.key, self.value);
Ok(RESP_OK.clone())
Expand All @@ -27,8 +27,6 @@ impl TryFrom<RespArray> for Set {
match (args.next(), args.next()) {
(Some(RespFrame::BulkString(key)), Some(value)) => {
let key = String::from_utf8(key.to_vec())?;
println!("{:?}", key);
println!("{:?}", value);
Ok(Set { key, value })
}
_ => Err(CommandError::InvalidArgs("Invalid arguments".to_string())),
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod backend;
pub mod cmd;
pub mod network;
mod resp;

pub use backend::*;
pub use resp::*;
25 changes: 22 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
use anyhow::Result;
use r_redis::{network, Backend};
use tokio::net::TcpListener;
use tracing::{info, warn};

fn main() -> Result<()> {
println!("Hello, world!");
Ok(())
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let addr = "0.0.0.0:6379";
let listener = TcpListener::bind(addr).await?;
info!("Listening on: {}", addr);

let backend = Backend::new();
loop {
let (stream, raddr) = listener.accept().await?;
let cloned_backend = backend.clone();
tokio::spawn(async move {
match network::stream_handler(stream, cloned_backend).await {
Ok(_) => info!("Connection from {} closed", raddr),
Err(e) => warn!("Connection from {} error: {:?}", raddr, e),
}
});
}
}
82 changes: 82 additions & 0 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::cmd::{Command, CommandExecutor};
use crate::{Backend, RespDecode, RespEncode, RespError, RespFrame};
use anyhow::Result;
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};
use tracing::info;

pub async fn stream_handler(stream: TcpStream, backend: Backend) -> Result<()> {
let mut framed = Framed::new(stream, RespFrameCodec);
loop {
match framed.next().await {
Some(Ok(req)) => {
info!(
"Received request: {:?}",
String::from_utf8_lossy(req.encode().as_slice())
);
let resp = request_handler(RedisRequest {
frame: req,
backend: backend.clone(),
})
.await?;
info!(
"Send response: {:?}",
String::from_utf8_lossy(resp.frame.encode().as_slice())
);
framed.send(resp.frame).await?;
}
Some(Err(e)) => return Err(e),
None => {
info!("Connection closed");
return Ok(());
}
}
}
}

async fn request_handler(req: RedisRequest) -> Result<RedisResponse> {
let cmd = Command::try_from(req.frame)?;
info!("Execute command: {:?}", cmd);
let resp = cmd.execute(&req.backend)?;
Ok(RedisResponse { frame: resp })
}

#[derive(Debug)]
struct RedisRequest {
frame: RespFrame,
backend: Backend,
}

#[derive(Debug)]
struct RedisResponse {
frame: RespFrame,
}

#[derive(Debug)]
struct RespFrameCodec;

impl Encoder<RespFrame> for RespFrameCodec {
type Error = anyhow::Error;

fn encode(&mut self, item: RespFrame, dst: &mut bytes::BytesMut) -> Result<()> {
let data = item.encode();
dst.extend_from_slice(&data);
Ok(())
}
}

impl Decoder for RespFrameCodec {
type Item = RespFrame;
type Error = anyhow::Error;

fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match RespFrame::decode(src) {
Ok(frame) => Ok(Some(frame)),
Err(RespError::NotComplete) => Ok(None),
Err(RespError::Empty) => Ok(None),
Err(e) => Err(e.into()),
}
}
}
2 changes: 1 addition & 1 deletion src/resp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ mod tests {
fn test_resp_array_decode_not_complete() -> Result<()> {
let mut buf = bytes::BytesMut::from(&b"*3\r\n$3\r\nget\r\n$5\r\nhello\r\n+China\r"[..]);
let ret = RespArray::decode(&mut buf);
assert_eq!(ret.unwrap_err(), RespError::NotCompete);
assert_eq!(ret.unwrap_err(), RespError::NotComplete);

buf.extend_from_slice(b"\n");
let ret = RespArray::decode(&mut buf)?;
Expand Down
20 changes: 12 additions & 8 deletions src/resp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ pub enum RespError {
#[error("Invalid frame length: {0}")]
InvalidFrameLength(isize),
#[error("Frame is not compete")]
NotCompete,
NotComplete,
#[error("Frame is empty")]
Empty,
#[error("ParseIntError: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
#[error("ParseFloatError: {0}")]
Expand All @@ -80,7 +82,7 @@ impl RespDecode for RespFrame {
Some(e) => Err(RespError::InvalidFrame(
format!("Invalid prefix: {}", e.to_ascii_lowercase()).to_string(),
)),
_ => Err(RespError::NotCompete),
None => Err(RespError::Empty),
}
}
}
Expand All @@ -97,7 +99,7 @@ pub fn extract_end_and_length(

pub fn extract_simple_frame_data(buf: &mut BytesMut, prefix: &[u8]) -> Result<usize, RespError> {
if buf.len() < 3 {
return Err(RespError::NotCompete);
return Err(RespError::NotComplete);
}
if !buf.starts_with(prefix) {
return Err(RespError::InvalidFrameType("SimpleError".into()));
Expand All @@ -111,7 +113,7 @@ pub fn extract_simple_frame_data(buf: &mut BytesMut, prefix: &[u8]) -> Result<us
}
}
if end == 0 {
return Err(RespError::NotCompete);
return Err(RespError::NotComplete);
}
Ok(end)
}
Expand All @@ -128,20 +130,22 @@ pub fn is_combine_complete(buf: &[u8], len: usize) -> Result<(), RespError> {
i += 1;
}
// The content in the item may contain \r\n, so here it is >=
(count >= len).then_some(()).ok_or(RespError::NotCompete)
(count >= len).then_some(()).ok_or(RespError::NotComplete)
}

pub fn is_single_complete(buf: &[u8], len: usize) -> Result<(), RespError> {
is_fixed_complete(buf)?;
(buf.len() >= len + 4 + 2)
.then_some(())
.ok_or(RespError::NotCompete)
.ok_or(RespError::NotComplete)
}

pub fn is_fixed_complete(buf: &[u8]) -> Result<(), RespError> {
(buf.len() > 2).then_some(()).ok_or(RespError::NotCompete)?;
(buf.len() > 2)
.then_some(())
.ok_or(RespError::NotComplete)?;

buf.ends_with(b"\r\n")
.then_some(())
.ok_or(RespError::NotCompete)
.ok_or(RespError::NotComplete)
}
2 changes: 1 addition & 1 deletion src/resp/simple_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mod tests {

buf.extend_from_slice(b"+Hello\r".as_ref());
let ret = SimpleString::decode(&mut buf);
assert_eq!(ret.unwrap_err(), RespError::NotCompete);
assert_eq!(ret.unwrap_err(), RespError::NotComplete);

buf.put_u8(b'\n');
let ret = SimpleString::decode(&mut buf)?;
Expand Down

0 comments on commit 22f70bc

Please sign in to comment.