Skip to content

Commit

Permalink
lp-gateway: Attempt to execute message during recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
cdamian committed Aug 14, 2024
1 parent 80e7e48 commit 221001f
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 73 deletions.
31 changes: 24 additions & 7 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ mod tests;

#[frame_support::pallet]
pub mod pallet {
use frame_support::dispatch::PostDispatchInfo;

use super::*;

const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
Expand Down Expand Up @@ -291,12 +293,15 @@ pub mod pallet {

/// Not enough routers are stored for a domain.
NotEnoughRoutersForDomain,

/// First router for a domain was not found.
FirstRouterNotFound,
}

#[pallet::call]
impl<T: Config> Pallet<T> {
/// Sets the router IDs used for a specific domain,
#[pallet::weight(T::WeightInfo::set_domain_routers())]
#[pallet::weight(T::WeightInfo::set_routers())]
#[pallet::call_index(0)]
pub fn set_routers(
origin: OriginFor<T>,
Expand Down Expand Up @@ -451,22 +456,29 @@ pub mod pallet {
#[pallet::call_index(11)]
pub fn execute_message_recovery(
origin: OriginFor<T>,
domain_address: DomainAddress,
proof: Proof,
router_id: T::RouterId,
) -> DispatchResult {
) -> DispatchResultWithPostInfo {
T::AdminOrigin::ensure_origin(origin)?;

let session_id = SessionIdStore::<T>::get().ok_or(Error::<T>::SessionIdNotFound)?;
let mut weight = Weight::default();

let routers = Routers::<T>::get().ok_or(Error::<T>::RoutersNotFound)?;
let inbound_processing_info =
Self::get_inbound_processing_info(domain_address, &mut weight)?;

ensure!(
routers.iter().any(|x| x == &router_id),
inbound_processing_info
.router_ids
.iter()
.any(|x| x == &router_id),
Error::<T>::UnknownRouter
);

weight.saturating_accrue(T::DbWeight::get().writes(1));

PendingInboundEntries::<T>::try_mutate(
session_id,
inbound_processing_info.current_session_id,
(proof, router_id.clone()),
|storage_entry| match storage_entry {
Some(entry) => match entry {
Expand All @@ -485,9 +497,14 @@ pub mod pallet {
},
)?;

Self::execute_if_requirements_are_met(&inbound_processing_info, proof, &mut weight)?;

Self::deposit_event(Event::<T>::MessageRecoveryExecuted { proof, router_id });

Ok(())
Ok(PostDispatchInfo {
actual_weight: Some(weight),
pays_fee: Pays::Yes,
})
}
}

Expand Down
74 changes: 39 additions & 35 deletions pallets/liquidity-pools-gateway/src/message_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ pub enum InboundEntry<T: Config> {
/// Type used when processing inbound messages.
#[derive(Clone)]
pub struct InboundProcessingInfo<T: Config> {
domain_address: DomainAddress,
router_ids: Vec<T::RouterId>,
current_session_id: T::SessionId,
expected_proof_count_per_message: u32,
pub domain_address: DomainAddress,
pub router_ids: Vec<T::RouterId>,
pub current_session_id: T::SessionId,
pub expected_proof_count_per_message: u32,
}

impl<T: Config> Pallet<T> {
/// Retrieves all available routers for a domain and then filters them based
/// on the routers that we have in storage.
fn get_router_ids_for_domain(domain: Domain) -> Result<Vec<T::RouterId>, DispatchError> {
pub fn get_router_ids_for_domain(domain: Domain) -> Result<Vec<T::RouterId>, DispatchError> {
let all_routers_for_domain = T::RouterProvider::routers_for_domain(domain);

let stored_routers = Routers::<T>::get().ok_or(Error::<T>::RoutersNotFound)?;
Expand All @@ -68,10 +68,9 @@ impl<T: Config> Pallet<T> {

/// Calculates and returns the proof count required for processing one
/// inbound message.
fn get_expected_proof_count(domain: Domain) -> Result<u32, DispatchError> {
let routers_count = Self::get_router_ids_for_domain(domain)?.len();

let expected_proof_count = routers_count
fn get_expected_proof_count(router_ids: &Vec<T::RouterId>) -> Result<u32, DispatchError> {
let expected_proof_count = router_ids
.len()
.ensure_sub(1)
.map_err(|_| Error::<T>::NotEnoughRoutersForDomain)?;

Expand Down Expand Up @@ -216,12 +215,13 @@ impl<T: Config> Pallet<T> {
}

/// Checks if the number of proofs required for executing one message
/// were received, and returns the message if so.
fn get_executable_message(
/// were received, and if so, decreases the counts accordingly and executes
/// the message.
pub(crate) fn execute_if_requirements_are_met(
inbound_processing_info: &InboundProcessingInfo<T>,
message_proof: Proof,
weight: &mut Weight,
) -> Option<T::Message> {
) -> DispatchResult {
let mut message = None;
let mut votes = 0;

Expand All @@ -234,7 +234,7 @@ impl<T: Config> Pallet<T> {
) {
// We expected one InboundEntry for each router, if that's not the case,
// we can return.
None => return None,
None => return Ok(()),
Some(inbound_entry) => match inbound_entry {
InboundEntry::Message {
message: stored_message,
Expand All @@ -249,11 +249,25 @@ impl<T: Config> Pallet<T> {
};
}

if votes == inbound_processing_info.expected_proof_count_per_message {
return message;
if votes < inbound_processing_info.expected_proof_count_per_message {
return Ok(());
}

None
match message {
Some(msg) => {
Self::decrease_pending_entries_counts(
&inbound_processing_info,
message_proof,
weight,
)?;

T::InboundMessageHandler::handle(
inbound_processing_info.domain_address.clone(),
msg,
)
}
None => Ok(()),
}
}

/// Decreases the counts for inbound entries and removes them if the
Expand Down Expand Up @@ -312,7 +326,7 @@ impl<T: Config> Pallet<T> {

/// Retrieves the information required for processing an inbound
/// message.
fn get_inbound_processing_info(
pub(crate) fn get_inbound_processing_info(
domain_address: DomainAddress,
weight: &mut Weight,
) -> Result<InboundProcessingInfo<T>, DispatchError> {
Expand All @@ -324,7 +338,7 @@ impl<T: Config> Pallet<T> {

weight.saturating_accrue(T::DbWeight::get().reads(1));

let expected_proof_count = Self::get_expected_proof_count(domain_address.domain())?;
let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;

weight.saturating_accrue(T::DbWeight::get().reads(1));

Expand Down Expand Up @@ -373,23 +387,13 @@ impl<T: Config> Pallet<T> {
return (Err(e), weight);
}

match Self::get_executable_message(&inbound_processing_info, message_proof, &mut weight)
{
Some(m) => {
if let Err(e) = Self::decrease_pending_entries_counts(
&inbound_processing_info,
message_proof,
&mut weight,
) {
return (Err(e), weight.saturating_mul(count));
}

if let Err(e) = T::InboundMessageHandler::handle(domain_address.clone(), m) {
// We only consume the processed weight if error during the batch
return (Err(e), weight.saturating_mul(count));
}
}
None => continue,
match Self::execute_if_requirements_are_met(
&inbound_processing_info,
message_proof,
&mut weight,
) {
Err(e) => return (Err(e), weight.saturating_mul(count)),
Ok(_) => continue,
}
}

Expand Down
Loading

0 comments on commit 221001f

Please sign in to comment.