diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index d545c5ed7..f2007dab3 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -77,6 +77,7 @@ walkdir = { version = "2.3", default-features = false } flex-error = { version = "0.4.4", default-features = false } subtle = { version = "2", default-features = false } semver = { version = "1.0", default-features = false } +ordered-float = { version = "4.0", default-features = false } # Optional dependencies async-tungstenite = { version = "0.24", default-features = false, features = ["tokio-runtime", "tokio-rustls-native-certs"], optional = true } diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index bc05e1892..55734d625 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -164,7 +164,7 @@ impl MockClientDriver { self.subscribe(id, query, subscription_tx, result_tx); } DriverCommand::Unsubscribe { query, result_tx } => { - self.unsubscribe(query, result_tx); + self.unsubscribe(&query, result_tx); } DriverCommand::Publish(event) => self.publish(*event), DriverCommand::Terminate => return Ok(()), @@ -184,7 +184,7 @@ impl MockClientDriver { result_tx.send(Ok(())).unwrap(); } - fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx>) { + fn unsubscribe(&mut self, query: &Query, result_tx: ChannelTx>) { self.router.remove_by_query(query); result_tx.send(Ok(())).unwrap(); } diff --git a/rpc/src/client/transport/router.rs b/rpc/src/client/transport/router.rs index 4a27a606a..f7871833e 100644 --- a/rpc/src/client/transport/router.rs +++ b/rpc/src/client/transport/router.rs @@ -1,12 +1,18 @@ //! Event routing for subscriptions. +use core::str::FromStr; + use alloc::collections::{BTreeMap as HashMap, BTreeSet as HashSet}; use tracing::debug; -use crate::{client::subscription::SubscriptionTx, error::Error, event::Event, prelude::*}; +use crate::client::subscription::SubscriptionTx; +use crate::error::Error; +use crate::event::Event; +use crate::prelude::*; +use crate::query::Query; -pub type SubscriptionQuery = String; +pub type SubscriptionQuery = Query; pub type SubscriptionId = String; #[cfg_attr(not(feature = "websocket"), allow(dead_code))] @@ -53,7 +59,17 @@ impl SubscriptionRouter { /// event is relevant, based on the associated query. #[cfg_attr(not(feature = "websocket"), allow(dead_code))] pub fn publish_event(&mut self, ev: Event) -> PublishResult { - self.publish(ev.query.clone(), Ok(ev)) + let query = match Query::from_str(&ev.query) { + Ok(query) => query, + Err(e) => { + return PublishResult::Error(format!( + "Failed to parse query from event: {:?}, reason: {e}", + ev.query + )); + }, + }; + + self.publish(query, Ok(ev)) } /// Publishes the given event/error to all of the subscriptions to which the @@ -91,23 +107,15 @@ impl SubscriptionRouter { /// Immediately add a new subscription to the router without waiting for /// confirmation. - pub fn add(&mut self, id: impl ToString, query: impl ToString, tx: SubscriptionTx) { - let query = query.to_string(); - let subs_for_query = match self.subscriptions.get_mut(&query) { - Some(s) => s, - None => { - self.subscriptions.insert(query.clone(), HashMap::new()); - self.subscriptions.get_mut(&query).unwrap() - }, - }; - + pub fn add(&mut self, id: impl ToString, query: SubscriptionQuery, tx: SubscriptionTx) { + let subs_for_query = self.subscriptions.entry(query).or_default(); subs_for_query.insert(id.to_string(), tx); } /// Removes all the subscriptions relating to the given query. - pub fn remove_by_query(&mut self, query: impl ToString) -> usize { + pub fn remove_by_query(&mut self, query: &SubscriptionQuery) -> usize { self.subscriptions - .remove(&query.to_string()) + .remove(query) .map(|subs_for_query| subs_for_query.len()) .unwrap_or(0) } @@ -116,9 +124,9 @@ impl SubscriptionRouter { #[cfg(feature = "websocket-client")] impl SubscriptionRouter { /// Returns the number of active subscriptions for the given query. - pub fn num_subscriptions_for_query(&self, query: impl ToString) -> usize { + pub fn num_subscriptions_for_query(&self, query: &SubscriptionQuery) -> usize { self.subscriptions - .get(&query.to_string()) + .get(query) .map(|subs_for_query| subs_for_query.len()) .unwrap_or(0) } @@ -129,7 +137,8 @@ pub enum PublishResult { Success, NoSubscribers, // All subscriptions for the given query have disconnected. - AllDisconnected(String), + AllDisconnected(SubscriptionQuery), + Error(String), } #[cfg(test)] @@ -178,6 +187,107 @@ mod test { } } + async fn test_router_basic_pub_sub(mut ev: Event) { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + let query1: Query = "tm.event = 'Tx'".parse().unwrap(); + let query2: Query = "tm.event = 'NewBlock'".parse().unwrap(); + + // Two subscriptions with the same query + router.add(subs1_id, query1.clone(), subs1_event_tx); + router.add(subs2_id, query1.clone(), subs2_event_tx); + // Another subscription with a different query + router.add(subs3_id, query2.clone(), subs3_event_tx); + + ev.query = query1.to_string(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + must_not_recv(&mut subs3_event_rx, 50).await; + assert_eq!(ev, subs1_ev); + assert_eq!(ev, subs2_ev); + + ev.query = query2.to_string(); + router.publish_event(ev.clone()); + + must_not_recv(&mut subs1_event_rx, 50).await; + must_not_recv(&mut subs2_event_rx, 50).await; + let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs3_ev); + } + + async fn test_router_pub_sub_diff_event_type_format(mut ev: Event) { + let mut router = SubscriptionRouter::default(); + + let subs1_id = uuid_str(); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + + let query1: Query = "tm.event = 'Tx'".parse().unwrap(); + router.add(subs1_id, query1.clone(), subs1_event_tx); + + // Query is equivalent but formatted slightly differently + ev.query = "tm.event='Tx'".to_string(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs1_ev); + } + + async fn test_router_pub_sub_two_eq_queries_diff_format(mut ev1: Event, mut ev2: Event) { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + let query1: Query = + "tm.event = 'Tx' AND message.module = 'ibc_client' AND message.foo = 'bar'" + .parse() + .unwrap(); + let query2: Query = + "message.module = 'ibc_client' AND message.foo = 'bar' AND tm.event = 'Tx'" + .parse() + .unwrap(); + + assert_eq!(query1, query2); + + let query3: Query = "tm.event = 'NewBlock'".parse().unwrap(); + + router.add(subs1_id, query1.clone(), subs1_event_tx); + router.add(subs2_id, query2.clone(), subs2_event_tx); + router.add(subs3_id, query3.clone(), subs3_event_tx); + + std::dbg!(&router); + + // Queries are equivalent but formatted slightly differently + ev1.query = + "tm.event='Tx' AND message.module='ibc_client' AND message.foo='bar'".to_string(); + router.publish_event(ev1.clone()); + + ev2.query = + "message.module='ibc_client' AND message.foo='bar' AND tm.event='Tx'".to_string(); + router.publish_event(ev2.clone()); + + let subs1_ev1 = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev1, subs1_ev1); + let subs2_ev1 = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + assert_eq!(ev1, subs2_ev1); + + let subs1_ev2 = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev2, subs1_ev2); + let subs2_ev2 = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + assert_eq!(ev2, subs2_ev2); + + must_not_recv(&mut subs3_event_rx, 50).await; + } + mod v0_34 { use super::*; @@ -193,36 +303,22 @@ mod test { #[tokio::test] async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(subs1_id, "query1", subs1_event_tx); - router.add(subs2_id, "query1", subs2_event_tx); - // Another subscription with a different query - router.add(subs3_id, "query2", subs3_event_tx); - - let mut ev = read_event("subscribe_newblock_0").await; - ev.query = "query1".into(); - router.publish_event(ev.clone()); - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish_event(ev.clone()); - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); + test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await + } + + #[tokio::test] + async fn router_pub_sub_diff_event_type_format() { + test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await) + .await + } + + #[tokio::test] + async fn router_pub_sub_two_eq_queries_diff_format() { + test_router_pub_sub_two_eq_queries_diff_format( + read_event("subscribe_newblock_0").await, + read_event("subscribe_newblock_1").await, + ) + .await } } @@ -241,36 +337,22 @@ mod test { #[tokio::test] async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(subs1_id, "query1", subs1_event_tx); - router.add(subs2_id, "query1", subs2_event_tx); - // Another subscription with a different query - router.add(subs3_id, "query2", subs3_event_tx); - - let mut ev = read_event("subscribe_newblock_0").await; - ev.query = "query1".into(); - router.publish_event(ev.clone()); - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish_event(ev.clone()); - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); + test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await + } + + #[tokio::test] + async fn router_pub_sub_diff_event_type_format() { + test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await) + .await + } + + #[tokio::test] + async fn router_pub_sub_two_eq_queries_diff_format() { + test_router_pub_sub_two_eq_queries_diff_format( + read_event("subscribe_newblock_0").await, + read_event("subscribe_newblock_1").await, + ) + .await } } } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 36cc5fd77..015d3ae2e 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -552,30 +552,36 @@ mod sealed { pub async fn subscribe(&self, query: Query) -> Result { let (subscription_tx, subscription_rx) = unbounded(); let (response_tx, mut response_rx) = unbounded(); + // By default we use UUIDs to differentiate subscriptions let id = uuid_str(); self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { id: id.to_string(), - query: query.to_string(), + query: query.clone(), subscription_tx, response_tx, }))?; + // Make sure our subscription request went through successfully. response_rx.recv().await.ok_or_else(|| { Error::client_internal("failed to hear back from WebSocket driver".to_string()) })??; + Ok(Subscription::new(id, query, subscription_rx)) } pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> { let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { - query: query.to_string(), + query, response_tx, }))?; + response_rx.recv().await.ok_or_else(|| { Error::client_internal("failed to hear back from WebSocket driver".to_string()) })??; + Ok(()) } } @@ -693,7 +699,7 @@ struct SubscribeCommand { // The desired ID for the outgoing JSON-RPC request. id: String, // The query for which we want to receive events. - query: String, + query: Query, // Where to send subscription events. subscription_tx: SubscriptionTx, // Where to send the result of the subscription request. @@ -703,7 +709,7 @@ struct SubscribeCommand { #[derive(Debug, Clone)] struct UnsubscribeCommand { // The query from which to unsubscribe. - query: String, + query: Query, // Where to send the result of the unsubscribe request. response_tx: ChannelTx>, } @@ -830,7 +836,7 @@ impl WebSocketClientDriver { // If we already have an active subscription for the given query, // there's no need to initiate another one. Just add this subscription // to the router. - if self.router.num_subscriptions_for_query(cmd.query.clone()) > 0 { + if self.router.num_subscriptions_for_query(&cmd.query) > 0 { let (id, query, subscription_tx, response_tx) = (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx); self.router.add(id, query, subscription_tx); @@ -840,14 +846,17 @@ impl WebSocketClientDriver { // Otherwise, we need to initiate a subscription request. let wrapper = Wrapper::new_with_id( Id::Str(cmd.id.clone()), - subscribe::Request::new(cmd.query.clone()), + subscribe::Request::new(cmd.query.to_string()), ); + if let Err(e) = self.send_request(wrapper).await { cmd.response_tx.send(Err(e.clone()))?; return Err(e); } + self.pending_commands .insert(cmd.id.clone(), DriverCommand::Subscribe(cmd)); + Ok(()) } @@ -855,7 +864,7 @@ impl WebSocketClientDriver { // Terminate all subscriptions for this query immediately. This // prioritizes acknowledgement of the caller's wishes over networking // problems. - if self.router.remove_by_query(cmd.query.clone()) == 0 { + if self.router.remove_by_query(&cmd.query) == 0 { // If there were no subscriptions for this query, respond // immediately. cmd.response_tx.send(Ok(()))?; @@ -864,14 +873,16 @@ impl WebSocketClientDriver { // Unsubscribe requests can (and probably should) have distinct // JSON-RPC IDs as compared to their subscription IDs. - let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.clone())); + let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.to_string())); let req_id = wrapper.id().clone(); if let Err(e) = self.send_request(wrapper).await { cmd.response_tx.send(Err(e.clone()))?; return Err(e); } + self.pending_commands .insert(req_id.to_string(), DriverCommand::Unsubscribe(cmd)); + Ok(()) } @@ -937,7 +948,7 @@ impl WebSocketClientDriver { // unsubscribe from it. We issue a fire-and-forget unsubscribe // message. if let Err(e) = self - .send_request(Wrapper::new(unsubscribe::Request::new(query))) + .send_request(Wrapper::new(unsubscribe::Request::new(query.to_string()))) .await { error!("Failed to send unsubscribe request: {}", e); @@ -956,7 +967,7 @@ impl WebSocketClientDriver { // unsubscribe from it. We issue a fire-and-forget unsubscribe // message. if let Err(e) = self - .send_request(Wrapper::new(unsubscribe::Request::new(query))) + .send_request(Wrapper::new(unsubscribe::Request::new(query.to_string()))) .await { error!("Failed to send unsubscribe request: {}", e); diff --git a/rpc/src/event.rs b/rpc/src/event.rs index dfbfa5ba4..0a7ee453f 100644 --- a/rpc/src/event.rs +++ b/rpc/src/event.rs @@ -1,10 +1,12 @@ //! RPC subscription event-related data structures. -use alloc::collections::BTreeMap as HashMap; +use alloc::collections::BTreeMap; -use tendermint::{abci, block, Block}; +use tendermint::abci; +use tendermint::{block, Block}; -use crate::{prelude::*, query::EventType}; +use crate::prelude::*; +use crate::query::EventType; /// An incoming event produced by a [`Subscription`]. /// @@ -16,7 +18,7 @@ pub struct Event { /// The data associated with the event. pub data: EventData, /// Event type and attributes map. - pub events: Option>>, + pub events: Option>>, } impl Event { @@ -80,7 +82,7 @@ pub mod v0_34 { use crate::dialect::v0_34::Event as RpcEvent; use crate::prelude::*; use crate::{dialect, serializers, Response}; - use alloc::collections::BTreeMap as HashMap; + use alloc::collections::BTreeMap; use serde::{Deserialize, Serialize}; use tendermint::Block; @@ -91,7 +93,7 @@ pub mod v0_34 { /// The data associated with the event. pub data: DialectEventData, /// Event type and attributes map. - pub events: Option>>, + pub events: Option>>, } pub type DeEvent = DialectEvent; @@ -256,7 +258,7 @@ mod latest { use super::{Event, EventData, TxInfo, TxResult}; use crate::prelude::*; use crate::{serializers, Response}; - use alloc::collections::BTreeMap as HashMap; + use alloc::collections::BTreeMap; use serde::{Deserialize, Serialize}; use tendermint::abci::Event as RpcEvent; use tendermint::{abci, block, Block}; @@ -268,7 +270,7 @@ mod latest { /// The data associated with the event. pub data: DeEventData, /// Event type and attributes map. - pub events: Option>>, + pub events: Option>>, } impl Response for DeEvent {} @@ -408,7 +410,7 @@ mod latest { pub mod v0_37 { use super::{Event, EventData}; use crate::prelude::*; - use alloc::collections::BTreeMap as HashMap; + use alloc::collections::BTreeMap; use serde::Serialize; use tendermint::{abci, Block}; @@ -421,7 +423,7 @@ pub mod v0_37 { /// The data associated with the event. pub data: SerEventData, /// Event type and attributes map. - pub events: Option>>, + pub events: Option>>, } impl From for SerEvent { @@ -487,7 +489,7 @@ pub mod v0_37 { pub mod v0_38 { use super::{Event, EventData}; use crate::prelude::*; - use alloc::collections::BTreeMap as HashMap; + use alloc::collections::BTreeMap; use serde::Serialize; use tendermint::{abci, block, Block}; @@ -500,7 +502,7 @@ pub mod v0_38 { /// The data associated with the event. pub data: SerEventData, /// Event type and attributes map. - pub events: Option>>, + pub events: Option>>, } impl From for SerEvent { diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 0e7d61e7d..e4baf4cbc 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -6,6 +6,7 @@ use core::{fmt, str::FromStr}; +use ordered_float::OrderedFloat; use time::{ format_description::well_known::Rfc3339, macros::{format_description, offset}, @@ -54,7 +55,7 @@ use crate::{prelude::*, serializers::timestamp, Error}; /// ``` /// /// [subscribe endpoint documentation]: https://docs.tendermint.com/v0.34/rpc/#/Websocket/subscribe -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Query { // We can only have at most one event type at present in a query. pub event_type: Option, @@ -288,7 +289,7 @@ peg::parser! { } rule float_op() -> Operand - = f:float() { Operand::Float(f) } + = f:float() { Operand::from(f) } rule tag() -> &'input str = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) @@ -393,7 +394,7 @@ where } /// The types of Tendermint events for which we can query at present. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum EventType { NewBlock, Tx, @@ -421,7 +422,7 @@ impl FromStr for EventType { } /// A condition which is part of a [`Query`]. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Condition { /// The key this condition applies to. pub key: String, @@ -490,7 +491,7 @@ impl fmt::Display for Condition { /// /// Those operations apply to a given `key`, which is part of /// the enclosing [`Condition`]. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Operation { /// Check if the value for the key is equal to this operand Eq(Operand), @@ -516,12 +517,12 @@ pub enum Operation { /// /// [`Condition`]: enum.Condition.html /// [tm-subscribe]: https://docs.tendermint.com/v0.34/rpc/#/Websocket/subscribe -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Operand { String(String), Signed(i64), Unsigned(u64), - Float(f64), + Float(OrderedFloat), Date(Date), DateTime(OffsetDateTime), } @@ -625,13 +626,13 @@ impl From for Operand { impl From for Operand { fn from(source: f64) -> Self { - Operand::Float(source) + Operand::Float(OrderedFloat(source)) } } impl From for Operand { fn from(source: f32) -> Self { - Operand::Float(source as f64) + Operand::Float(OrderedFloat(source as f64)) } } @@ -884,7 +885,7 @@ mod test { assert_eq!(tag, "short.pi"); match op { Operand::Float(f) => { - assert!(floats_eq(*f, core::f64::consts::PI, 5)); + assert!(floats_eq(f.into_inner(), core::f64::consts::PI, 5)); }, _ => panic!("unexpected operand: {:?}", op), } @@ -903,7 +904,7 @@ mod test { assert_eq!(tag, "short.pi"); match op { Operand::Float(f) => { - assert!(floats_eq(*f, -core::f64::consts::PI, 5)); + assert!(floats_eq(f.into_inner(), -core::f64::consts::PI, 5)); }, _ => panic!("unexpected operand: {:?}", op), }