Skip to content

Commit

Permalink
lp-gateway-queue: Ensure processed messages are skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
cdamian committed Oct 6, 2024
1 parent a619b37 commit 8652784
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 11 deletions.
32 changes: 22 additions & 10 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ pub mod pallet {
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesWasReached,
MaxNumberOfMessagesWasReached {
last_processed_nonce: T::MessageNonce,
},
}

#[pallet::error]
Expand Down Expand Up @@ -214,8 +216,10 @@ pub mod pallet {
let mut weight_used = T::DbWeight::get().reads(1);

loop {
if let Err(_) = last_processed_nonce.ensure_add_assign(One::one()) {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesWasReached);
if last_processed_nonce.ensure_add_assign(One::one()).is_err() {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesWasReached {
last_processed_nonce,
});

break;
}
Expand All @@ -232,8 +236,15 @@ pub mod pallet {

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found, we can stop.
None => break,
// No message found at this nonce, we can skip it.
None => {
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

continue;
}
};

let remaining_weight = max_weight.saturating_sub(weight_used);
Expand All @@ -244,7 +255,7 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(
let processing_weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
Expand All @@ -257,14 +268,15 @@ pub mod pallet {
}
};

weight_used.saturating_accrue(weight);
weight_used.saturating_accrue(processing_weight);

MessageQueue::<T>::remove(last_processed_nonce);

// 1 write for removing the message
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for removing the message
// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(2));
}

weight_used
Expand Down
63 changes: 62 additions & 1 deletion pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError};

use crate::{
mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin},
Error, Event, FailedMessageQueue, MessageQueue,
Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue,
};

mod utils {
Expand Down Expand Up @@ -181,7 +181,10 @@ mod process_failed_message {
}

mod message_queue_impl {
use sp_arithmetic::ArithmeticError::Overflow;

use super::*;
use crate::MessageNonceStore;

#[test]
fn success() {
Expand All @@ -197,6 +200,17 @@ mod message_queue_impl {
event_exists(Event::<Runtime>::MessageSubmitted { nonce, message })
});
}

#[test]
fn error_on_max_nonce() {
new_test_ext().execute_with(|| {
let message = 1;

MessageNonceStore::<Runtime>::set(u64::MAX);

assert_noop!(Queue::queue(message), Overflow);
});
}
}

mod on_idle {
Expand All @@ -220,6 +234,7 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

Expand All @@ -245,6 +260,7 @@ mod on_idle {
assert_eq!(weight, PROCESS_WEIGHT);
assert_eq!(handle.times(), 5);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 5)
});
}

Expand All @@ -265,6 +281,51 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 1);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn with_no_messages() {
new_test_ext().execute_with(|| {
let _ = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(LastProcessedNonce::<Runtime>::get(), 0)
});
}

#[test]
fn with_skipped_message_nonce() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));

// Manually process the 2nd nonce, the on_idle hook should skip it and process
// the remaining nonces.
assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2));

let weight = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(weight, PROCESS_WEIGHT * 2);
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn max_messages() {
new_test_ext().execute_with(|| {
LastProcessedNonce::<Runtime>::set(u64::MAX);

let _ = Queue::on_idle(0, TOTAL_WEIGHT);

event_exists(Event::<Runtime>::MaxNumberOfMessagesWasReached {
last_processed_nonce: u64::MAX,
})
});
}
}

0 comments on commit 8652784

Please sign in to comment.