From 6b1ec771ed87214077e94a4ed0b22c6c55dcc80a Mon Sep 17 00:00:00 2001 From: ChronosXYZ Date: Mon, 23 Sep 2024 15:59:13 +0300 Subject: [PATCH] Make websocket subscriptions work Port fix from https://github.com/informalsystems/tendermint-rs/pull/1433 --- rpc/Cargo.toml | 65 +++---- rpc/src/client/transport/mock.rs | 4 +- rpc/src/client/transport/router.rs | 236 +++++++++++++++++--------- rpc/src/client/transport/websocket.rs | 49 ++++-- rpc/src/query.rs | 23 +-- 5 files changed, 242 insertions(+), 135 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 5375e9de..debc5482 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -1,14 +1,14 @@ [package] -name = "cometbft-rpc" -version = "0.1.0-alpha.2" -edition = "2021" -license = "Apache-2.0" -homepage = "https://cometbft.com/" +name = "cometbft-rpc" +version = "0.1.0-alpha.2" +edition = "2021" +license = "Apache-2.0" +homepage = "https://cometbft.com/" repository = "https://github.com/cometbft/cometbft-rs" -readme = "README.md" -keywords = ["blockchain", "cosmos", "cometbft", "tendermint"] +readme = "README.md" +keywords = ["blockchain", "cosmos", "cometbft", "tendermint"] categories = ["cryptography::cryptocurrencies", "network-programming"] -authors = [ +authors = [ "Informal Systems ", "Ismail Khoffi ", "Alexander Simmerl ", @@ -26,23 +26,13 @@ all-features = true [[bin]] name = "cometbft-rpc" path = "src/client/bin/main.rs" -required-features = [ "cli" ] +required-features = ["cli"] [features] default = ["flex-error/std", "flex-error/eyre_tracer"] -cli = [ - "http-client", - "structopt", - "tracing-subscriber", - "websocket-client" -] -http-client = [ - "futures", - "reqwest", - "tokio/macros", - "tracing" -] -secp256k1 = [ "cometbft/secp256k1" ] +cli = ["http-client", "structopt", "tracing-subscriber", "websocket-client"] +http-client = ["futures", "reqwest", "tokio/macros", "tracing"] +secp256k1 = ["cometbft/secp256k1"] websocket-client = [ "async-tungstenite", "futures", @@ -51,7 +41,7 @@ websocket-client = [ "tokio/macros", "tokio/sync", "tokio/time", - "tracing" + "tracing", ] [dependencies] @@ -64,28 +54,43 @@ bytes = { version = "1.0", default-features = false } getrandom = { version = "0.2", default-features = false, features = ["js"] } peg = { version = "0.8", default-features = false } pin-project = { version = "1.0.1", default-features = false } -serde = { version = "1", default-features = false, features = [ "derive" ] } +serde = { version = "1", default-features = false, features = ["derive"] } serde_bytes = { version = "0.11", default-features = false } serde_json = { version = "1", default-features = false, features = ["std"] } thiserror = { version = "1", default-features = false } -time = { version = "0.3", default-features = false, features = ["macros", "parsing"] } +time = { version = "0.3", default-features = false, features = [ + "macros", + "parsing", +] } uuid = { version = "1.7", default-features = false } rand = { version = "0.8" } -subtle-encoding = { version = "0.5", default-features = false, features = ["bech32-preview"] } +subtle-encoding = { version = "0.5", default-features = false, features = [ + "bech32-preview", +] } url = { version = "2.4.1", default-features = false } walkdir = { version = "2.3", default-features = false } flex-error = { version = "0.4.4", default-features = false } subtle = { version = "2", default-features = false } semver = { version = "1.0", default-features = false } +ordered-float = { version = "4.0", default-features = false } # Optional dependencies -async-tungstenite = { version = "0.24", default-features = false, features = ["tokio-runtime", "tokio-rustls-native-certs"], optional = true } +async-tungstenite = { version = "0.24", default-features = false, features = [ + "tokio-runtime", + "tokio-rustls-native-certs", +], optional = true } futures = { version = "0.3", optional = true, default-features = false } -reqwest = { version = "0.11.20", optional = true, default-features = false, features = ["rustls-tls-native-roots"] } +reqwest = { version = "0.11.20", optional = true, default-features = false, features = [ + "rustls-tls-native-roots", +] } structopt = { version = "0.3", optional = true, default-features = false } -tokio = { version = "1.0", optional = true, default-features = false, features = ["rt-multi-thread"] } +tokio = { version = "1.0", optional = true, default-features = false, features = [ + "rt-multi-thread", +] } tracing = { version = "0.1", optional = true, default-features = false } -tracing-subscriber = { version = "0.3", optional = true, default-features = false, features = ["fmt"] } +tracing-subscriber = { version = "0.3", optional = true, default-features = false, features = [ + "fmt", +] } [dev-dependencies] http = { version = "1", default-features = false, features = ["std"] } diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index f15f7c3b..cc29f0a7 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -164,7 +164,7 @@ impl MockClientDriver { self.subscribe(id, query, subscription_tx, result_tx); } DriverCommand::Unsubscribe { query, result_tx } => { - self.unsubscribe(query, result_tx); + self.unsubscribe(&query, result_tx); } DriverCommand::Publish(event) => self.publish(*event), DriverCommand::Terminate => return Ok(()), @@ -184,7 +184,7 @@ impl MockClientDriver { result_tx.send(Ok(())).unwrap(); } - fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx>) { + fn unsubscribe(&mut self, query: &Query, result_tx: ChannelTx>) { self.router.remove_by_query(query); result_tx.send(Ok(())).unwrap(); } diff --git a/rpc/src/client/transport/router.rs b/rpc/src/client/transport/router.rs index 4a27a606..480d814b 100644 --- a/rpc/src/client/transport/router.rs +++ b/rpc/src/client/transport/router.rs @@ -1,12 +1,16 @@ //! Event routing for subscriptions. +use core::str::FromStr; + use alloc::collections::{BTreeMap as HashMap, BTreeSet as HashSet}; use tracing::debug; -use crate::{client::subscription::SubscriptionTx, error::Error, event::Event, prelude::*}; +use crate::{ + client::subscription::SubscriptionTx, error::Error, event::Event, prelude::*, query::Query, +}; -pub type SubscriptionQuery = String; +pub type SubscriptionQuery = Query; pub type SubscriptionId = String; #[cfg_attr(not(feature = "websocket"), allow(dead_code))] @@ -53,7 +57,17 @@ impl SubscriptionRouter { /// event is relevant, based on the associated query. #[cfg_attr(not(feature = "websocket"), allow(dead_code))] pub fn publish_event(&mut self, ev: Event) -> PublishResult { - self.publish(ev.query.clone(), Ok(ev)) + let query = match Query::from_str(&ev.query) { + Ok(query) => query, + Err(e) => { + return PublishResult::Error(format!( + "Failed to parse query from event: {:?}, reason: {e}", + ev.query + )); + }, + }; + + self.publish(query, Ok(ev)) } /// Publishes the given event/error to all of the subscriptions to which the @@ -91,23 +105,15 @@ impl SubscriptionRouter { /// Immediately add a new subscription to the router without waiting for /// confirmation. - pub fn add(&mut self, id: impl ToString, query: impl ToString, tx: SubscriptionTx) { - let query = query.to_string(); - let subs_for_query = match self.subscriptions.get_mut(&query) { - Some(s) => s, - None => { - self.subscriptions.insert(query.clone(), HashMap::new()); - self.subscriptions.get_mut(&query).unwrap() - }, - }; - + pub fn add(&mut self, id: impl ToString, query: SubscriptionQuery, tx: SubscriptionTx) { + let subs_for_query = self.subscriptions.entry(query).or_default(); subs_for_query.insert(id.to_string(), tx); } /// Removes all the subscriptions relating to the given query. - pub fn remove_by_query(&mut self, query: impl ToString) -> usize { + pub fn remove_by_query(&mut self, query: &SubscriptionQuery) -> usize { self.subscriptions - .remove(&query.to_string()) + .remove(query) .map(|subs_for_query| subs_for_query.len()) .unwrap_or(0) } @@ -116,9 +122,9 @@ impl SubscriptionRouter { #[cfg(feature = "websocket-client")] impl SubscriptionRouter { /// Returns the number of active subscriptions for the given query. - pub fn num_subscriptions_for_query(&self, query: impl ToString) -> usize { + pub fn num_subscriptions_for_query(&self, query: &Query) -> usize { self.subscriptions - .get(&query.to_string()) + .get(query) .map(|subs_for_query| subs_for_query.len()) .unwrap_or(0) } @@ -129,7 +135,8 @@ pub enum PublishResult { Success, NoSubscribers, // All subscriptions for the given query have disconnected. - AllDisconnected(String), + AllDisconnected(SubscriptionQuery), + Error(String), } #[cfg(test)] @@ -178,6 +185,107 @@ mod test { } } + async fn test_router_basic_pub_sub(mut ev: Event) { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + let query1: Query = "tm.event = 'Tx'".parse().unwrap(); + let query2: Query = "tm.event = 'NewBlock'".parse().unwrap(); + + // Two subscriptions with the same query + router.add(subs1_id, query1.clone(), subs1_event_tx); + router.add(subs2_id, query1.clone(), subs2_event_tx); + // Another subscription with a different query + router.add(subs3_id, query2.clone(), subs3_event_tx); + + ev.query = query1.to_string(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + must_not_recv(&mut subs3_event_rx, 50).await; + assert_eq!(ev, subs1_ev); + assert_eq!(ev, subs2_ev); + + ev.query = query2.to_string(); + router.publish_event(ev.clone()); + + must_not_recv(&mut subs1_event_rx, 50).await; + must_not_recv(&mut subs2_event_rx, 50).await; + let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs3_ev); + } + + async fn test_router_pub_sub_diff_event_type_format(mut ev: Event) { + let mut router = SubscriptionRouter::default(); + + let subs1_id = uuid_str(); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + + let query1: Query = "tm.event = 'Tx'".parse().unwrap(); + router.add(subs1_id, query1.clone(), subs1_event_tx); + + // Query is equivalent but formatted slightly differently + ev.query = "tm.event='Tx'".to_string(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs1_ev); + } + + async fn test_router_pub_sub_two_eq_queries_diff_format(mut ev1: Event, mut ev2: Event) { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + let query1: Query = + "tm.event = 'Tx' AND message.module = 'ibc_client' AND message.foo = 'bar'" + .parse() + .unwrap(); + let query2: Query = + "message.module = 'ibc_client' AND message.foo = 'bar' AND tm.event = 'Tx'" + .parse() + .unwrap(); + + assert_eq!(query1, query2); + + let query3: Query = "tm.event = 'NewBlock'".parse().unwrap(); + + router.add(subs1_id, query1.clone(), subs1_event_tx); + router.add(subs2_id, query2.clone(), subs2_event_tx); + router.add(subs3_id, query3.clone(), subs3_event_tx); + + std::dbg!(&router); + + // Queries are equivalent but formatted slightly differently + ev1.query = + "tm.event='Tx' AND message.module='ibc_client' AND message.foo='bar'".to_string(); + router.publish_event(ev1.clone()); + + ev2.query = + "message.module='ibc_client' AND message.foo='bar' AND tm.event='Tx'".to_string(); + router.publish_event(ev2.clone()); + + let subs1_ev1 = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev1, subs1_ev1); + let subs2_ev1 = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + assert_eq!(ev1, subs2_ev1); + + let subs1_ev2 = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + assert_eq!(ev2, subs1_ev2); + let subs2_ev2 = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + assert_eq!(ev2, subs2_ev2); + + must_not_recv(&mut subs3_event_rx, 50).await; + } + mod v0_34 { use super::*; @@ -193,36 +301,22 @@ mod test { #[tokio::test] async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(subs1_id, "query1", subs1_event_tx); - router.add(subs2_id, "query1", subs2_event_tx); - // Another subscription with a different query - router.add(subs3_id, "query2", subs3_event_tx); - - let mut ev = read_event("subscribe_newblock_0").await; - ev.query = "query1".into(); - router.publish_event(ev.clone()); - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish_event(ev.clone()); - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); + test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await + } + + #[tokio::test] + async fn router_pub_sub_diff_event_type_format() { + test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await) + .await + } + + #[tokio::test] + async fn router_pub_sub_two_eq_queries_diff_format() { + test_router_pub_sub_two_eq_queries_diff_format( + read_event("subscribe_newblock_0").await, + read_event("subscribe_newblock_1").await, + ) + .await } } @@ -241,36 +335,22 @@ mod test { #[tokio::test] async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(subs1_id, "query1", subs1_event_tx); - router.add(subs2_id, "query1", subs2_event_tx); - // Another subscription with a different query - router.add(subs3_id, "query2", subs3_event_tx); - - let mut ev = read_event("subscribe_newblock_0").await; - ev.query = "query1".into(); - router.publish_event(ev.clone()); - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish_event(ev.clone()); - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); + test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await + } + + #[tokio::test] + async fn router_pub_sub_diff_event_type_format() { + test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await) + .await + } + + #[tokio::test] + async fn router_pub_sub_two_eq_queries_diff_format() { + test_router_pub_sub_two_eq_queries_diff_format( + read_event("subscribe_newblock_0").await, + read_event("subscribe_newblock_1").await, + ) + .await } } } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 153c9e34..479b0908 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -556,30 +556,36 @@ mod sealed { pub async fn subscribe(&self, query: Query) -> Result { let (subscription_tx, subscription_rx) = unbounded(); let (response_tx, mut response_rx) = unbounded(); + // By default we use UUIDs to differentiate subscriptions let id = uuid_str(); self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { id: id.to_string(), - query: query.to_string(), + query: query.clone(), subscription_tx, response_tx, }))?; + // Make sure our subscription request went through successfully. response_rx.recv().await.ok_or_else(|| { Error::client_internal("failed to hear back from WebSocket driver".to_string()) })??; + Ok(Subscription::new(id, query, subscription_rx)) } pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> { let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { - query: query.to_string(), + query, response_tx, }))?; + response_rx.recv().await.ok_or_else(|| { Error::client_internal("failed to hear back from WebSocket driver".to_string()) })??; + Ok(()) } } @@ -697,7 +703,7 @@ struct SubscribeCommand { // The desired ID for the outgoing JSON-RPC request. id: String, // The query for which we want to receive events. - query: String, + query: Query, // Where to send subscription events. subscription_tx: SubscriptionTx, // Where to send the result of the subscription request. @@ -707,7 +713,7 @@ struct SubscribeCommand { #[derive(Debug, Clone)] struct UnsubscribeCommand { // The query from which to unsubscribe. - query: String, + query: Query, // Where to send the result of the unsubscribe request. response_tx: ChannelTx>, } @@ -834,7 +840,7 @@ impl WebSocketClientDriver { // If we already have an active subscription for the given query, // there's no need to initiate another one. Just add this subscription // to the router. - if self.router.num_subscriptions_for_query(cmd.query.clone()) > 0 { + if self.router.num_subscriptions_for_query(&cmd.query) > 0 { let (id, query, subscription_tx, response_tx) = (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx); self.router.add(id, query, subscription_tx); @@ -844,14 +850,17 @@ impl WebSocketClientDriver { // Otherwise, we need to initiate a subscription request. let wrapper = Wrapper::new_with_id( Id::Str(cmd.id.clone()), - subscribe::Request::new(cmd.query.clone()), + subscribe::Request::new(cmd.query.to_string()), ); + if let Err(e) = self.send_request(wrapper).await { cmd.response_tx.send(Err(e.clone()))?; return Err(e); } + self.pending_commands .insert(cmd.id.clone(), DriverCommand::Subscribe(cmd)); + Ok(()) } @@ -859,7 +868,7 @@ impl WebSocketClientDriver { // Terminate all subscriptions for this query immediately. This // prioritizes acknowledgement of the caller's wishes over networking // problems. - if self.router.remove_by_query(cmd.query.clone()) == 0 { + if self.router.remove_by_query(&cmd.query) == 0 { // If there were no subscriptions for this query, respond // immediately. cmd.response_tx.send(Ok(()))?; @@ -868,14 +877,16 @@ impl WebSocketClientDriver { // Unsubscribe requests can (and probably should) have distinct // JSON-RPC IDs as compared to their subscription IDs. - let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.clone())); + let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.to_string())); let req_id = wrapper.id().clone(); if let Err(e) = self.send_request(wrapper).await { cmd.response_tx.send(Err(e.clone()))?; return Err(e); } + self.pending_commands .insert(req_id.to_string(), DriverCommand::Unsubscribe(cmd)); + Ok(()) } @@ -892,10 +903,20 @@ impl WebSocketClientDriver { CompatMode::V0_37 => event::v1::DeEvent::from_string(&msg).map(Into::into), CompatMode::V0_34 => event::v0_34::DeEvent::from_string(&msg).map(Into::into), }; - if let Ok(ev) = parse_res { - debug!("JSON-RPC event: {}", msg); - self.publish_event(ev).await; - return Ok(()); + // if let Ok(ev) = parse_res { + // debug!("JSON-RPC event: {}", msg); + // self.publish_event(ev).await; + // return Ok(()); + // } + match parse_res { + Ok(ev) => { + debug!("JSON-RPC event: {}", msg); + self.publish_event(ev).await; + return Ok(()); + }, + Err(e) => { + debug!("Incoming message is not a JSON-RPC event: {}", e); + }, } let wrapper: response::Wrapper = match serde_json::from_str(&msg) { @@ -941,7 +962,7 @@ impl WebSocketClientDriver { // unsubscribe from it. We issue a fire-and-forget unsubscribe // message. if let Err(e) = self - .send_request(Wrapper::new(unsubscribe::Request::new(query))) + .send_request(Wrapper::new(unsubscribe::Request::new(query.to_string()))) .await { error!("Failed to send unsubscribe request: {}", e); @@ -960,7 +981,7 @@ impl WebSocketClientDriver { // unsubscribe from it. We issue a fire-and-forget unsubscribe // message. if let Err(e) = self - .send_request(Wrapper::new(unsubscribe::Request::new(query))) + .send_request(Wrapper::new(unsubscribe::Request::new(query.to_string()))) .await { error!("Failed to send unsubscribe request: {}", e); diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 7afed731..d23c0306 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -6,6 +6,7 @@ use core::{fmt, str::FromStr}; +use ordered_float::OrderedFloat; use time::{ format_description::well_known::Rfc3339, macros::{format_description, offset}, @@ -54,7 +55,7 @@ use crate::{prelude::*, serializers::timestamp, Error}; /// ``` /// /// [subscribe endpoint documentation]: https://docs.cometbft.com/v1/rpc/#/Websocket/subscribe -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Query { // We can only have at most one event type at present in a query. pub event_type: Option, @@ -288,7 +289,7 @@ peg::parser! { } rule float_op() -> Operand - = f:float() { Operand::Float(f) } + = f:float() { Operand::from(f) } rule tag() -> &'input str = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) @@ -393,7 +394,7 @@ where } /// The types of CometBFT events for which we can query at present. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum EventType { NewBlock, Tx, @@ -421,7 +422,7 @@ impl FromStr for EventType { } /// A condition which is part of a [`Query`]. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Condition { /// The key this condition applies to. pub key: String, @@ -490,7 +491,7 @@ impl fmt::Display for Condition { /// /// Those operations apply to a given `key`, which is part of /// the enclosing [`Condition`]. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Operation { /// Check if the value for the key is equal to this operand Eq(Operand), @@ -516,12 +517,12 @@ pub enum Operation { /// /// [`Condition`]: enum.Condition.html /// [tm-subscribe]: https://docs.cometbft.com/v1/rpc/#/Websocket/subscribe -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Operand { String(String), Signed(i64), Unsigned(u64), - Float(f64), + Float(OrderedFloat), Date(Date), DateTime(OffsetDateTime), } @@ -625,13 +626,13 @@ impl From for Operand { impl From for Operand { fn from(source: f64) -> Self { - Operand::Float(source) + Operand::Float(OrderedFloat(source)) } } impl From for Operand { fn from(source: f32) -> Self { - Operand::Float(source as f64) + Operand::Float(OrderedFloat(source as f64)) } } @@ -884,7 +885,7 @@ mod test { assert_eq!(tag, "short.pi"); match op { Operand::Float(f) => { - assert!(floats_eq(*f, core::f64::consts::PI, 5)); + assert!(floats_eq(f.into_inner(), core::f64::consts::PI, 5)); }, _ => panic!("unexpected operand: {:?}", op), } @@ -903,7 +904,7 @@ mod test { assert_eq!(tag, "short.pi"); match op { Operand::Float(f) => { - assert!(floats_eq(*f, -core::f64::consts::PI, 5)); + assert!(floats_eq(f.into_inner(), -core::f64::consts::PI, 5)); }, _ => panic!("unexpected operand: {:?}", op), }