Skip to content

Commit 3b89022

Browse files
authored
Merge pull request #15 from karyontech/codec_api
Update karyon net codec API
2 parents 47f3770 + 9f1da0f commit 3b89022

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1031
-665
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/async_runtime/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl Executor {
2828

2929
#[cfg(feature = "tokio")]
3030
pub fn handle(&self) -> &tokio::runtime::Handle {
31-
return self.inner.handle();
31+
self.inner.handle()
3232
}
3333
}
3434

jsonrpc/README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ A fast and lightweight async implementation of [JSON-RPC
55

66
features:
77
- Supports TCP, TLS, WebSocket, and Unix protocols.
8-
- Uses `smol`(async-std) as the async runtime, but also supports `tokio` via the
8+
- Uses `smol`(async-std) as the async runtime, with support for `tokio` via the
99
`tokio` feature.
10-
- Allows registration of multiple services (structs) of different types on a
11-
single server.
12-
- Supports pub/sub
13-
- Allows passing an `async_executors::Executor` or tokio's `Runtime` when building
10+
- Enables the registration of multiple services (structs) on a single server.
11+
- Offers support for custom JSON codec.
12+
- Includes support for pub/sub.
13+
- Allows the use of an `async_executors::Executor` or `tokio::Runtime` when building
1414
the server.
1515

1616

@@ -31,7 +31,8 @@ use serde_json::Value;
3131
use smol::stream::StreamExt;
3232

3333
use karyon_jsonrpc::{
34-
RPCError, Server, Client, rpc_impl, rpc_pubsub_impl, SubscriptionID, Channel
34+
error::RPCError, server::{Server, ServerBuilder, Channel}, client::ClientBuilder,
35+
rpc_impl, rpc_pubsub_impl, message::SubscriptionID,
3536
};
3637

3738
struct HelloWorld {}
@@ -84,7 +85,7 @@ async {
8485
let service = Arc::new(HelloWorld {});
8586
// Creates a new server
8687

87-
let server = Server::builder("tcp://127.0.0.1:60000")
88+
let server = ServerBuilder::new("tcp://127.0.0.1:60000")
8889
.expect("create new server builder")
8990
.service(service.clone())
9091
.pubsub_service(service)
@@ -101,7 +102,7 @@ async {
101102
// Client
102103
async {
103104
// Creates a new client
104-
let client = Client::builder("tcp://127.0.0.1:60000")
105+
let client = ClientBuilder::new("tcp://127.0.0.1:60000")
105106
.expect("create new client builder")
106107
.build()
107108
.await

jsonrpc/examples/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use log::info;
44
use serde::{Deserialize, Serialize};
55
use smol::Timer;
66

7-
use karyon_jsonrpc::Client;
7+
use karyon_jsonrpc::client::ClientBuilder;
88

99
#[derive(Deserialize, Serialize)]
1010
struct Req {
@@ -18,7 +18,7 @@ struct Pong {}
1818
fn main() {
1919
env_logger::init();
2020
smol::future::block_on(async {
21-
let client = Client::builder("tcp://127.0.0.1:6000")
21+
let client = ClientBuilder::new("tcp://127.0.0.1:6000")
2222
.expect("Create client builder")
2323
.build()
2424
.await
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::time::Duration;
2+
3+
use log::info;
4+
use serde::{Deserialize, Serialize};
5+
use smol::Timer;
6+
7+
use karyon_jsonrpc::{
8+
client::ClientBuilder,
9+
codec::{Codec, Decoder, Encoder},
10+
error::Error,
11+
};
12+
13+
#[derive(Deserialize, Serialize)]
14+
struct Req {
15+
x: u32,
16+
y: u32,
17+
}
18+
19+
#[derive(Deserialize, Serialize, Debug)]
20+
struct Pong {}
21+
22+
#[derive(Clone)]
23+
pub struct CustomJsonCodec {}
24+
25+
impl Codec for CustomJsonCodec {
26+
type Message = serde_json::Value;
27+
type Error = Error;
28+
}
29+
30+
impl Encoder for CustomJsonCodec {
31+
type EnMessage = serde_json::Value;
32+
type EnError = Error;
33+
fn encode(
34+
&self,
35+
src: &Self::EnMessage,
36+
dst: &mut [u8],
37+
) -> std::result::Result<usize, Self::EnError> {
38+
let msg = match serde_json::to_string(src) {
39+
Ok(m) => m,
40+
Err(err) => return Err(Error::Encode(err.to_string())),
41+
};
42+
let buf = msg.as_bytes();
43+
dst[..buf.len()].copy_from_slice(buf);
44+
Ok(buf.len())
45+
}
46+
}
47+
48+
impl Decoder for CustomJsonCodec {
49+
type DeMessage = serde_json::Value;
50+
type DeError = Error;
51+
fn decode(
52+
&self,
53+
src: &mut [u8],
54+
) -> std::result::Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
55+
let de = serde_json::Deserializer::from_slice(src);
56+
let mut iter = de.into_iter::<serde_json::Value>();
57+
58+
let item = match iter.next() {
59+
Some(Ok(item)) => item,
60+
Some(Err(ref e)) if e.is_eof() => return Ok(None),
61+
Some(Err(e)) => return Err(Error::Decode(e.to_string())),
62+
None => return Ok(None),
63+
};
64+
65+
Ok(Some((iter.byte_offset(), item)))
66+
}
67+
}
68+
69+
fn main() {
70+
env_logger::init();
71+
smol::future::block_on(async {
72+
let client = ClientBuilder::new_with_codec("tcp://127.0.0.1:6000", CustomJsonCodec {})
73+
.expect("Create client builder")
74+
.build()
75+
.await
76+
.expect("Create rpc client");
77+
78+
loop {
79+
Timer::after(Duration::from_millis(100)).await;
80+
let result: Pong = client
81+
.call("Calc.ping", ())
82+
.await
83+
.expect("Call Calc.ping method");
84+
info!("Ping result: {:?}", result);
85+
}
86+
});
87+
}

jsonrpc/examples/pubsub_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use log::info;
44
use serde::{Deserialize, Serialize};
55
use smol::Timer;
66

7-
use karyon_jsonrpc::Client;
7+
use karyon_jsonrpc::client::ClientBuilder;
88

99
#[derive(Deserialize, Serialize, Debug)]
1010
struct Pong {}
1111

1212
async fn run_client() {
13-
let client = Client::builder("tcp://127.0.0.1:6000")
13+
let client = ClientBuilder::new("tcp://127.0.0.1:6000")
1414
.expect("Create client builder")
1515
.build()
1616
.await

jsonrpc/examples/pubsub_server.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use serde_json::Value;
66

77
use karyon_core::async_util::sleep;
88
use karyon_jsonrpc::{
9-
message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server,
9+
error::RPCError,
10+
message::SubscriptionID,
11+
rpc_impl, rpc_pubsub_impl,
12+
server::{channel::Channel, ServerBuilder},
1013
};
1114

1215
struct Calc {}
@@ -69,7 +72,7 @@ fn main() {
6972
let calc = Arc::new(Calc {});
7073

7174
// Creates a new server
72-
let server = Server::builder("tcp://127.0.0.1:6000")
75+
let server = ServerBuilder::new("tcp://127.0.0.1:6000")
7376
.expect("Create a new server builder")
7477
.service(calc.clone())
7578
.pubsub_service(calc)

jsonrpc/examples/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
44
use serde_json::Value;
55

66
use karyon_core::async_util::sleep;
7-
use karyon_jsonrpc::{rpc_impl, RPCError, Server};
7+
use karyon_jsonrpc::{error::RPCError, rpc_impl, server::ServerBuilder};
88

99
struct Calc {
1010
version: String,
@@ -49,7 +49,7 @@ fn main() {
4949
};
5050

5151
// Creates a new server
52-
let server = Server::builder("tcp://127.0.0.1:6000")
52+
let server = ServerBuilder::new("tcp://127.0.0.1:6000")
5353
.expect("Create a new server builder")
5454
.service(Arc::new(calc))
5555
.build()
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use serde::{Deserialize, Serialize};
4+
use serde_json::Value;
5+
6+
use karyon_core::async_util::sleep;
7+
8+
use karyon_jsonrpc::{
9+
codec::{Codec, Decoder, Encoder},
10+
error::{Error, RPCError},
11+
rpc_impl,
12+
server::ServerBuilder,
13+
};
14+
15+
struct Calc {}
16+
17+
#[derive(Deserialize, Serialize)]
18+
struct Req {
19+
x: u32,
20+
y: u32,
21+
}
22+
23+
#[derive(Deserialize, Serialize)]
24+
struct Pong {}
25+
26+
#[rpc_impl]
27+
impl Calc {
28+
async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
29+
Ok(serde_json::json!(Pong {}))
30+
}
31+
}
32+
33+
#[derive(Clone)]
34+
pub struct CustomJsonCodec {}
35+
36+
impl Codec for CustomJsonCodec {
37+
type Message = serde_json::Value;
38+
type Error = Error;
39+
}
40+
41+
impl Encoder for CustomJsonCodec {
42+
type EnMessage = serde_json::Value;
43+
type EnError = Error;
44+
fn encode(
45+
&self,
46+
src: &Self::EnMessage,
47+
dst: &mut [u8],
48+
) -> std::result::Result<usize, Self::EnError> {
49+
let msg = match serde_json::to_string(src) {
50+
Ok(m) => m,
51+
Err(err) => return Err(Error::Encode(err.to_string())),
52+
};
53+
let buf = msg.as_bytes();
54+
dst[..buf.len()].copy_from_slice(buf);
55+
Ok(buf.len())
56+
}
57+
}
58+
59+
impl Decoder for CustomJsonCodec {
60+
type DeMessage = serde_json::Value;
61+
type DeError = Error;
62+
fn decode(
63+
&self,
64+
src: &mut [u8],
65+
) -> std::result::Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
66+
let de = serde_json::Deserializer::from_slice(src);
67+
let mut iter = de.into_iter::<serde_json::Value>();
68+
69+
let item = match iter.next() {
70+
Some(Ok(item)) => item,
71+
Some(Err(ref e)) if e.is_eof() => return Ok(None),
72+
Some(Err(e)) => return Err(Error::Decode(e.to_string())),
73+
None => return Ok(None),
74+
};
75+
76+
Ok(Some((iter.byte_offset(), item)))
77+
}
78+
}
79+
80+
fn main() {
81+
env_logger::init();
82+
smol::block_on(async {
83+
// Register the Calc service
84+
let calc = Calc {};
85+
86+
// Creates a new server
87+
let server = ServerBuilder::new_with_codec("tcp://127.0.0.1:6000", CustomJsonCodec {})
88+
.expect("Create a new server builder")
89+
.service(Arc::new(calc))
90+
.build()
91+
.await
92+
.expect("start a new server");
93+
94+
// Start the server
95+
server.start();
96+
97+
sleep(Duration::MAX).await;
98+
});
99+
}

jsonrpc/examples/tokio_server/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use serde::{Deserialize, Serialize};
44
use serde_json::Value;
55

66
use karyon_jsonrpc::{
7-
message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server,
7+
error::RPCError,
8+
message::SubscriptionID,
9+
rpc_impl, rpc_pubsub_impl,
10+
server::{Channel, ServerBuilder},
811
};
912

1013
struct Calc {
@@ -84,7 +87,7 @@ async fn main() {
8487
});
8588

8689
// Creates a new server
87-
let server = Server::builder("ws://127.0.0.1:6000")
90+
let server = ServerBuilder::new("ws://127.0.0.1:6000")
8891
.expect("Create a new server builder")
8992
.service(calc.clone())
9093
.pubsub_service(calc)

0 commit comments

Comments
 (0)