Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lp-gateway-queue: Ensure messages are processed in order #1992

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;

#[cfg(test)]
mod mock;
Expand Down Expand Up @@ -61,6 +60,12 @@ pub mod pallet {
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage that is used for keeping track of the last nonce that was
/// processed.
#[pallet::storage]
#[pallet::getter(fn last_processed_nonce)]
pub type LastProcessedNonce<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage for messages that will be processed during the `on_idle` hook.
#[pallet::storage]
#[pallet::getter(fn message_queue)]
Expand Down Expand Up @@ -93,6 +98,11 @@ pub mod pallet {
message: T::Message,
error: DispatchError,
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesWasReached {
last_processed_nonce: T::MessageNonce,
},
}

#[pallet::error]
Expand Down Expand Up @@ -200,17 +210,43 @@ pub mod pallet {
}

fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut last_processed_nonce = LastProcessedNonce::<T>::get();

// 1 read for the last processed nonce
let mut weight_used = T::DbWeight::get().reads(1);

let mut nonces = MessageQueue::<T>::iter_keys().collect::<Vec<_>>();
nonces.sort();
loop {
if last_processed_nonce.ensure_add_assign(One::one()).is_err() {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesWasReached {
last_processed_nonce,
});

for nonce in nonces {
let message =
MessageQueue::<T>::get(nonce).expect("valid nonce ensured by `iter_keys`");
break;
}

// 1 read for the nonce
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

if last_processed_nonce > MessageNonceStore::<T>::get() {
break;
}

// 1 read for the message
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found at this nonce, we can skip it.
None => {
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

continue;
Comment on lines +241 to +246
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I think this fixes the issue of having "holes" in the queue.

}
};

let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

Expand All @@ -219,22 +255,28 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
let processing_weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
(Ok(()), weight) => weight,
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
FailedMessageQueue::<T>::insert(last_processed_nonce, (message, e));

// 1 write for the failed message
weight.saturating_add(T::DbWeight::get().writes(1))
}
};

weight_used.saturating_accrue(weight);
weight_used.saturating_accrue(processing_weight);

MessageQueue::<T>::remove(last_processed_nonce);

MessageQueue::<T>::remove(nonce);
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for removing the message
weight_used.saturating_accrue(T::DbWeight::get().writes(1));
// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(2));
}

weight_used
Expand Down
69 changes: 65 additions & 4 deletions pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError};

use crate::{
mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin},
Error, Event, FailedMessageQueue, MessageQueue,
Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue,
};

mod utils {
Expand Down Expand Up @@ -181,7 +181,10 @@ mod process_failed_message {
}

mod message_queue_impl {
use sp_arithmetic::ArithmeticError::Overflow;

use super::*;
use crate::MessageNonceStore;

#[test]
fn success() {
Expand All @@ -197,6 +200,17 @@ mod message_queue_impl {
event_exists(Event::<Runtime>::MessageSubmitted { nonce, message })
});
}

#[test]
fn error_on_max_nonce() {
new_test_ext().execute_with(|| {
let message = 1;

MessageNonceStore::<Runtime>::set(u64::MAX);

assert_noop!(Queue::queue(message), Overflow);
});
}
}

mod on_idle {
Expand All @@ -209,7 +223,7 @@ mod on_idle {
#[test]
fn success_all() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -220,13 +234,14 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn not_all_messages_fit_in_the_block() {
new_test_ext().execute_with(|| {
(1..=5).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=5).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -245,13 +260,14 @@ mod on_idle {
assert_eq!(weight, PROCESS_WEIGHT);
assert_eq!(handle.times(), 5);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 5)
});
}

#[test]
fn with_failed_messages() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|msg| match msg {
Expand All @@ -265,6 +281,51 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 1);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn with_no_messages() {
new_test_ext().execute_with(|| {
let _ = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(LastProcessedNonce::<Runtime>::get(), 0)
});
}

#[test]
fn with_skipped_message_nonce() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));

// Manually process the 2nd nonce, the on_idle hook should skip it and process
// the remaining nonces.
assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2));

let weight = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(weight, PROCESS_WEIGHT * 2);
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn max_messages() {
new_test_ext().execute_with(|| {
LastProcessedNonce::<Runtime>::set(u64::MAX);

let _ = Queue::on_idle(0, TOTAL_WEIGHT);

event_exists(Event::<Runtime>::MaxNumberOfMessagesWasReached {
last_processed_nonce: u64::MAX,
})
});
}
}
5 changes: 2 additions & 3 deletions runtime/integration-tests/src/cases/lp/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use staging_xcm::{
use crate::{
cases::lp::{EVM_DOMAIN_CHAIN_ID, EVM_ROUTER_ID, POOL_A, POOL_B, POOL_C},
config::Runtime,
utils::{accounts::Keyring, evm::receipt_ok, last_event, pool::get_tranche_ids},
utils::{accounts::Keyring, last_event, pool::get_tranche_ids},
};

/// Returns the local representation of a remote ethereum account
Expand Down Expand Up @@ -86,15 +86,14 @@ pub fn pool_c_tranche_1_id<T: Runtime>() -> TrancheId {
}

pub fn verify_outbound_failure_on_lp<T: Runtime>(to: H160) {
let (_tx, status, receipt) = pallet_ethereum::Pending::<T>::get()
let (_tx, status, _receipt) = pallet_ethereum::Pending::<T>::get()
.last()
.expect("Queue triggered evm tx.")
.clone();

// The sender is the sender account on the gateway
assert_eq!(T::Sender::get().h160(), status.from);
assert_eq!(status.to.unwrap().0, to.0);
assert!(!receipt_ok(receipt));
Copy link
Contributor Author

@cdamian cdamian Oct 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note - given that the message processor implementation of the LP gateway is transactional, this check is no longer required.

assert!(matches!(
last_event::<T, pallet_liquidity_pools_gateway_queue::Event::<T>>(),
pallet_liquidity_pools_gateway_queue::Event::<T>::MessageExecutionFailure { .. }
Expand Down
Loading