Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed May 31, 2024
1 parent 9d7cf5e commit 14727a4
Showing 1 changed file with 133 additions and 66 deletions.
199 changes: 133 additions & 66 deletions rpc/src/client/transport/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,107 @@ mod test {
}
}

async fn test_router_basic_pub_sub(mut ev: Event) {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
let query2: Query = "tm.event = 'NewBlock'".parse().unwrap();

// Two subscriptions with the same query
router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query1.clone(), subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, query2.clone(), subs3_event_tx);

ev.query = query1.to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = query2.to_string();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
}

async fn test_router_pub_sub_diff_event_type_format(mut ev: Event) {
let mut router = SubscriptionRouter::default();

let subs1_id = uuid_str();
let (subs1_event_tx, mut subs1_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
router.add(subs1_id, query1.clone(), subs1_event_tx);

// Query is equivalent but formatted slightly differently
ev.query = "tm.event='Tx'".to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev, subs1_ev);
}

async fn test_router_pub_sub_two_eq_queries_diff_format(mut ev1: Event, mut ev2: Event) {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query =
"tm.event = 'Tx' AND message.module = 'ibc_client' AND message.foo = 'bar'"
.parse()
.unwrap();
let query2: Query =
"message.module = 'ibc_client' AND message.foo = 'bar' AND tm.event = 'Tx'"
.parse()
.unwrap();

assert_eq!(query1, query2);

let query3: Query = "tm.event = 'NewBlock'".parse().unwrap();

router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query2.clone(), subs2_event_tx);
router.add(subs3_id, query3.clone(), subs3_event_tx);

std::dbg!(&router);

// Queries are equivalent but formatted slightly differently
ev1.query =
"tm.event='Tx' AND message.module='ibc_client' AND message.foo='bar'".to_string();
router.publish_event(ev1.clone());

ev2.query =
"message.module='ibc_client' AND message.foo='bar' AND tm.event='Tx'".to_string();
router.publish_event(ev2.clone());

let subs1_ev1 = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev1, subs1_ev1);
let subs2_ev1 = must_recv(&mut subs2_event_rx, 500).await.unwrap();
assert_eq!(ev1, subs2_ev1);

let subs1_ev2 = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev2, subs1_ev2);
let subs2_ev2 = must_recv(&mut subs2_event_rx, 500).await.unwrap();
assert_eq!(ev2, subs2_ev2);

must_not_recv(&mut subs3_event_rx, 50).await;
}

mod v0_34 {
use super::*;

Expand All @@ -202,39 +303,22 @@ mod test {

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
let query2: Query = "tm.event = 'NewBlock'".parse().unwrap();

// Two subscriptions with the same query
router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query1.clone(), subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, query2.clone(), subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = query1.to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = query2.to_string();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await
}

#[tokio::test]
async fn router_pub_sub_diff_event_type_format() {
test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await)
.await
}

#[tokio::test]
async fn router_pub_sub_two_eq_queries_diff_format() {
test_router_pub_sub_two_eq_queries_diff_format(
read_event("subscribe_newblock_0").await,
read_event("subscribe_newblock_1").await,
)
.await
}
}

Expand All @@ -253,39 +337,22 @@ mod test {

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
let query2: Query = "tm.event = 'NewBlock'".parse().unwrap();

// Two subscriptions with the same query
router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query1.clone(), subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, query2.clone(), subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = query1.to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = query2.to_string();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await
}

#[tokio::test]
async fn router_pub_sub_diff_event_type_format() {
test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await)
.await
}

#[tokio::test]
async fn router_pub_sub_two_eq_queries_diff_format() {
test_router_pub_sub_two_eq_queries_diff_format(
read_event("subscribe_newblock_0").await,
read_event("subscribe_newblock_1").await,
)
.await
}
}
}

0 comments on commit 14727a4

Please sign in to comment.