From a619b372c9be1fedba7af27160c072d3f3640259 Mon Sep 17 00:00:00 2001 From: Cosmin Damian <17934949+cdamian@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:05:02 +0300 Subject: [PATCH 1/3] lp-gateway-queue: Ensure messages are processed in order --- .../liquidity-pools-gateway-queue/src/lib.rs | 50 +++++++++++++++---- .../src/tests.rs | 6 +-- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/pallets/liquidity-pools-gateway-queue/src/lib.rs b/pallets/liquidity-pools-gateway-queue/src/lib.rs index 3384fcdd63..cc550b9333 100644 --- a/pallets/liquidity-pools-gateway-queue/src/lib.rs +++ b/pallets/liquidity-pools-gateway-queue/src/lib.rs @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec; use scale_info::TypeInfo; use sp_arithmetic::traits::BaseArithmetic; use sp_runtime::traits::{EnsureAddAssign, One}; -use sp_std::vec::Vec; #[cfg(test)] mod mock; @@ -61,6 +60,12 @@ pub mod pallet { #[pallet::getter(fn message_nonce_store)] pub type MessageNonceStore = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage that is used for keeping track of the last nonce that was + /// processed. + #[pallet::storage] + #[pallet::getter(fn last_processed_nonce)] + pub type LastProcessedNonce = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage for messages that will be processed during the `on_idle` hook. #[pallet::storage] #[pallet::getter(fn message_queue)] @@ -93,6 +98,9 @@ pub mod pallet { message: T::Message, error: DispatchError, }, + + /// Maximum number of messages was reached. + MaxNumberOfMessagesWasReached, } #[pallet::error] @@ -200,17 +208,34 @@ pub mod pallet { } fn service_message_queue(max_weight: Weight) -> Weight { - let mut weight_used = Weight::zero(); + let mut last_processed_nonce = LastProcessedNonce::::get(); + + // 1 read for the last processed nonce + let mut weight_used = T::DbWeight::get().reads(1); + + loop { + if let Err(_) = last_processed_nonce.ensure_add_assign(One::one()) { + Self::deposit_event(Event::::MaxNumberOfMessagesWasReached); + + break; + } - let mut nonces = MessageQueue::::iter_keys().collect::>(); - nonces.sort(); + // 1 read for the nonce + weight_used.saturating_accrue(T::DbWeight::get().reads(1)); - for nonce in nonces { - let message = - MessageQueue::::get(nonce).expect("valid nonce ensured by `iter_keys`"); + if last_processed_nonce > MessageNonceStore::::get() { + break; + } + // 1 read for the message weight_used.saturating_accrue(T::DbWeight::get().reads(1)); + let message = match MessageQueue::::get(last_processed_nonce) { + Some(msg) => msg, + // No message found, we can stop. + None => break, + }; + let remaining_weight = max_weight.saturating_sub(weight_used); let next_weight = T::MessageProcessor::max_processing_weight(&message); @@ -219,10 +244,13 @@ pub mod pallet { break; } - let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) { + let weight = match Self::process_message_and_deposit_event( + last_processed_nonce, + message.clone(), + ) { (Ok(()), weight) => weight, (Err(e), weight) => { - FailedMessageQueue::::insert(nonce, (message, e)); + FailedMessageQueue::::insert(last_processed_nonce, (message, e)); // 1 write for the failed message weight.saturating_add(T::DbWeight::get().writes(1)) @@ -231,10 +259,12 @@ pub mod pallet { weight_used.saturating_accrue(weight); - MessageQueue::::remove(nonce); + MessageQueue::::remove(last_processed_nonce); // 1 write for removing the message weight_used.saturating_accrue(T::DbWeight::get().writes(1)); + + LastProcessedNonce::::set(last_processed_nonce); } weight_used diff --git a/pallets/liquidity-pools-gateway-queue/src/tests.rs b/pallets/liquidity-pools-gateway-queue/src/tests.rs index 1f89bac710..51602d23fa 100644 --- a/pallets/liquidity-pools-gateway-queue/src/tests.rs +++ b/pallets/liquidity-pools-gateway-queue/src/tests.rs @@ -209,7 +209,7 @@ mod on_idle { #[test] fn success_all() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -226,7 +226,7 @@ mod on_idle { #[test] fn not_all_messages_fit_in_the_block() { new_test_ext().execute_with(|| { - (1..=5).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=5).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -251,7 +251,7 @@ mod on_idle { #[test] fn with_failed_messages() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|msg| match msg { From 8652784a1b4a2b445da58b661bea94a1b68999a4 Mon Sep 17 00:00:00 2001 From: Cosmin Damian <17934949+cdamian@users.noreply.github.com> Date: Sun, 6 Oct 2024 12:38:51 +0300 Subject: [PATCH 2/3] lp-gateway-queue: Ensure processed messages are skipped --- .../liquidity-pools-gateway-queue/src/lib.rs | 32 +++++++--- .../src/tests.rs | 63 ++++++++++++++++++- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/pallets/liquidity-pools-gateway-queue/src/lib.rs b/pallets/liquidity-pools-gateway-queue/src/lib.rs index cc550b9333..0bef83116d 100644 --- a/pallets/liquidity-pools-gateway-queue/src/lib.rs +++ b/pallets/liquidity-pools-gateway-queue/src/lib.rs @@ -100,7 +100,9 @@ pub mod pallet { }, /// Maximum number of messages was reached. - MaxNumberOfMessagesWasReached, + MaxNumberOfMessagesWasReached { + last_processed_nonce: T::MessageNonce, + }, } #[pallet::error] @@ -214,8 +216,10 @@ pub mod pallet { let mut weight_used = T::DbWeight::get().reads(1); loop { - if let Err(_) = last_processed_nonce.ensure_add_assign(One::one()) { - Self::deposit_event(Event::::MaxNumberOfMessagesWasReached); + if last_processed_nonce.ensure_add_assign(One::one()).is_err() { + Self::deposit_event(Event::::MaxNumberOfMessagesWasReached { + last_processed_nonce, + }); break; } @@ -232,8 +236,15 @@ pub mod pallet { let message = match MessageQueue::::get(last_processed_nonce) { Some(msg) => msg, - // No message found, we can stop. - None => break, + // No message found at this nonce, we can skip it. + None => { + LastProcessedNonce::::set(last_processed_nonce); + + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(1)); + + continue; + } }; let remaining_weight = max_weight.saturating_sub(weight_used); @@ -244,7 +255,7 @@ pub mod pallet { break; } - let weight = match Self::process_message_and_deposit_event( + let processing_weight = match Self::process_message_and_deposit_event( last_processed_nonce, message.clone(), ) { @@ -257,14 +268,15 @@ pub mod pallet { } }; - weight_used.saturating_accrue(weight); + weight_used.saturating_accrue(processing_weight); MessageQueue::::remove(last_processed_nonce); - // 1 write for removing the message - weight_used.saturating_accrue(T::DbWeight::get().writes(1)); - LastProcessedNonce::::set(last_processed_nonce); + + // 1 write for removing the message + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(2)); } weight_used diff --git a/pallets/liquidity-pools-gateway-queue/src/tests.rs b/pallets/liquidity-pools-gateway-queue/src/tests.rs index 51602d23fa..4eba1097d4 100644 --- a/pallets/liquidity-pools-gateway-queue/src/tests.rs +++ b/pallets/liquidity-pools-gateway-queue/src/tests.rs @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError}; use crate::{ mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin}, - Error, Event, FailedMessageQueue, MessageQueue, + Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue, }; mod utils { @@ -181,7 +181,10 @@ mod process_failed_message { } mod message_queue_impl { + use sp_arithmetic::ArithmeticError::Overflow; + use super::*; + use crate::MessageNonceStore; #[test] fn success() { @@ -197,6 +200,17 @@ mod message_queue_impl { event_exists(Event::::MessageSubmitted { nonce, message }) }); } + + #[test] + fn error_on_max_nonce() { + new_test_ext().execute_with(|| { + let message = 1; + + MessageNonceStore::::set(u64::MAX); + + assert_noop!(Queue::queue(message), Overflow); + }); + } } mod on_idle { @@ -220,6 +234,7 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) }); } @@ -245,6 +260,7 @@ mod on_idle { assert_eq!(weight, PROCESS_WEIGHT); assert_eq!(handle.times(), 5); assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 5) }); } @@ -265,6 +281,51 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 1); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn with_no_messages() { + new_test_ext().execute_with(|| { + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(LastProcessedNonce::::get(), 0) + }); + } + + #[test] + fn with_skipped_message_nonce() { + new_test_ext().execute_with(|| { + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); + + Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); + let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); + + // Manually process the 2nd nonce, the on_idle hook should skip it and process + // the remaining nonces. + assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2)); + + let weight = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(weight, PROCESS_WEIGHT * 2); + assert_eq!(handle.times(), 3); + assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn max_messages() { + new_test_ext().execute_with(|| { + LastProcessedNonce::::set(u64::MAX); + + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + event_exists(Event::::MaxNumberOfMessagesWasReached { + last_processed_nonce: u64::MAX, + }) }); } } From 571bd9e12466e89bca975850cf12bcff113c1725 Mon Sep 17 00:00:00 2001 From: Cosmin Damian <17934949+cdamian@users.noreply.github.com> Date: Sun, 6 Oct 2024 13:39:41 +0300 Subject: [PATCH 3/3] integration-tests: Remove receipt check --- runtime/integration-tests/src/cases/lp/utils.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runtime/integration-tests/src/cases/lp/utils.rs b/runtime/integration-tests/src/cases/lp/utils.rs index c7d5c72121..b5dee39b4a 100644 --- a/runtime/integration-tests/src/cases/lp/utils.rs +++ b/runtime/integration-tests/src/cases/lp/utils.rs @@ -33,7 +33,7 @@ use staging_xcm::{ use crate::{ cases::lp::{EVM_DOMAIN_CHAIN_ID, EVM_ROUTER_ID, POOL_A, POOL_B, POOL_C}, config::Runtime, - utils::{accounts::Keyring, evm::receipt_ok, last_event, pool::get_tranche_ids}, + utils::{accounts::Keyring, last_event, pool::get_tranche_ids}, }; /// Returns the local representation of a remote ethereum account @@ -86,7 +86,7 @@ pub fn pool_c_tranche_1_id() -> TrancheId { } pub fn verify_outbound_failure_on_lp(to: H160) { - let (_tx, status, receipt) = pallet_ethereum::Pending::::get() + let (_tx, status, _receipt) = pallet_ethereum::Pending::::get() .last() .expect("Queue triggered evm tx.") .clone(); @@ -94,7 +94,6 @@ pub fn verify_outbound_failure_on_lp(to: H160) { // The sender is the sender account on the gateway assert_eq!(T::Sender::get().h160(), status.from); assert_eq!(status.to.unwrap().0, to.0); - assert!(!receipt_ok(receipt)); assert!(matches!( last_event::>(), pallet_liquidity_pools_gateway_queue::Event::::MessageExecutionFailure { .. }