Skip to content

Commit

Permalink
Merge pull request #39 from firstbatchxyz/erhant/handle-tx-underpriced
Browse files Browse the repository at this point in the history
feat: handle tx underpriced
  • Loading branch information
erhant authored Jan 14, 2025
2 parents 188bfd4 + 3b7dc49 commit d5cd12e
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 432 deletions.
689 changes: 351 additions & 338 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ default-members = ["core"]

[workspace.package]
edition = "2021"
version = "0.2.32"
version = "0.2.33"
license = "Apache-2.0"
readme = "README.md"
authors = ["erhant"]
Expand Down
44 changes: 24 additions & 20 deletions contracts/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,24 @@ pub fn contract_error_report(error: Error) -> ErrReport {
// here we try to parse the error w.r.t provided contract interfaces
// or return a default one in the end if it was not parsed successfully
if let Some(payload) = error.as_error_resp() {
payload
.as_decoded_error(false)
.map(ERC20Errors::into)
.or_else(|| {
payload
.as_decoded_error(false)
.map(OracleRegistryErrors::into)
})
.or_else(|| {
payload
.as_decoded_error(false)
.map(OracleCoordinatorErrors::into)
})
.unwrap_or(eyre!("Unhandled contract error: {:#?}", error))
// an ERC20 error
if let Some(erc_20_error) = payload.as_decoded_error::<ERC20Errors>(false) {
return erc_20_error.into();
} else
// an OracleRegistry error
if let Some(registry_error) =
payload.as_decoded_error::<OracleRegistryErrors>(false)
{
return registry_error.into();
} else
// an OracleCoordinator error
if let Some(coordinator_error) =
payload.as_decoded_error::<OracleCoordinatorErrors>(false)
{
return coordinator_error.into();
} else {
return eyre!("Unhandled error response: {:#?}", error);
}
} else {
eyre!("Unknown transport error: {:#?}", error)
}
Expand Down Expand Up @@ -103,13 +107,13 @@ impl From<OracleRegistryErrors> for ErrReport {
}
// generic
OracleRegistryErrors::FailedCall(_) => {
eyre!("Failed call",)
eyre!("Failed call")
}
OracleRegistryErrors::ERC1967InvalidImplementation(e) => {
eyre!("Invalid implementation: {}", e.implementation)
}
OracleRegistryErrors::UUPSUnauthorizedCallContext(_) => {
eyre!("Unauthorized UUPS call context",)
eyre!("Unauthorized UUPS call context")
}
OracleRegistryErrors::UUPSUnsupportedProxiableUUID(e) => {
eyre!("Unsupported UUPS proxiable UUID: {}", e.slot)
Expand All @@ -124,7 +128,7 @@ impl From<OracleRegistryErrors> for ErrReport {
eyre!("Address {} is empty", e.target)
}
OracleRegistryErrors::NotInitializing(_) => {
eyre!("Not initializing",)
eyre!("Not initializing")
}
}
}
Expand Down Expand Up @@ -170,13 +174,13 @@ impl From<OracleCoordinatorErrors> for ErrReport {
}
// generic
OracleCoordinatorErrors::FailedInnerCall(_) => {
eyre!("Failed inner call",)
eyre!("Failed inner call")
}
OracleCoordinatorErrors::ERC1967InvalidImplementation(e) => {
eyre!("Invalid implementation: {}", e.implementation)
}
OracleCoordinatorErrors::UUPSUnauthorizedCallContext(_) => {
eyre!("Unauthorized UUPS call context",)
eyre!("Unauthorized UUPS call context")
}
OracleCoordinatorErrors::UUPSUnsupportedProxiableUUID(e) => {
eyre!("Unsupported UUPS proxiable UUID: {}", e.slot)
Expand All @@ -191,7 +195,7 @@ impl From<OracleCoordinatorErrors> for ErrReport {
eyre!("Address {} is empty", e.target)
}
OracleCoordinatorErrors::NotInitializing(_) => {
eyre!("Not initializing",)
eyre!("Not initializing")
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions core/src/cli/commands/coordinator/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@ impl DriaOracle {

let status = TaskStatus::try_from(request.status)?;
match handle_request(self, status, task_id, request.protocol).await {
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
task_id,
receipt.transaction_hash
)
}
Ok(Some(_receipt)) => {}
Ok(None) => {
log::info!("Task {} ignored.", task_id)
}
Expand Down
7 changes: 4 additions & 3 deletions core/src/cli/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum Commands {
Serve {
#[arg(help = "The oracle kinds to handle tasks as, if omitted will default to all registered kinds.", value_parser = parse_oracle_kind)]
kinds: Vec<OracleKind>,
#[arg(short, long = "model", help = "The models to serve.", required = true, value_parser = parse_model)]
#[arg(short, long = "model", help = "The model(s) to serve.", required = true, value_parser = parse_model)]
models: Vec<Model>,
#[arg(
long,
Expand All @@ -49,6 +49,7 @@ pub enum Commands {
)]
to: Option<BlockNumberOrTag>,
#[arg(
short,
long,
help = "Optional task id to serve specifically.",
required = false
Expand All @@ -61,14 +62,14 @@ pub enum Commands {
from: Option<BlockNumberOrTag>,
#[arg(long, help = "Ending block number, defaults to 'latest'.", value_parser = parse_block_number_or_tag)]
to: Option<BlockNumberOrTag>,
#[arg(long, help = "Task id to view.")]
#[arg(short, long, help = "Task id to view.")]
task_id: Option<U256>,
},
/// Request a task.
Request {
#[arg(help = "The input to request a task with.", required = true)]
input: String,
#[arg(help = "The models to accept.", required = true, value_parser=parse_model)]
#[arg(help = "The model(s) to accept.", required = true, value_parser=parse_model)]
models: Vec<Model>,
#[arg(long, help = "The difficulty of the task.", default_value_t = 2)]
difficulty: u8,
Expand Down
2 changes: 1 addition & 1 deletion core/src/cli/commands/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl DriaOracle {
}

// calculate the required approval for registration
let stake = self.registry_stake_amount(kind).await?;
let stake = self.get_registry_stake_amount(kind).await?;
let allowance = self
.allowance(self.address(), self.addresses.registry)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Cli {
}

pub fn read_tx_timeout() -> Result<u64> {
let timeout = env::var("TX_TIMEOUT_SECS").unwrap_or("30".to_string());
let timeout = env::var("TX_TIMEOUT_SECS").unwrap_or("100".to_string());
timeout.parse().map_err(Into::into)
}
}
Expand Down
1 change: 1 addition & 0 deletions core/src/node/anvil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl DriaOracle {
.map_err(contract_error_report)
.wrap_err("could not add to whitelist")?;

// TODO: use common command wait_for_tx
log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx
.with_timeout(self.config.tx_timeout)
Expand Down
30 changes: 12 additions & 18 deletions core/src/node/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use alloy::eips::BlockNumberOrTag;
use alloy::primitives::aliases::U40;
use alloy::primitives::{Bytes, U256};
use alloy::rpc::types::{Log, TransactionReceipt};
use dria_oracle_contracts::string_to_bytes32;
use dria_oracle_contracts::LLMOracleTask::{TaskResponse, TaskValidation};
use dria_oracle_contracts::{contract_error_report, string_to_bytes32};
use eyre::{eyre, Context, Result};
use eyre::{eyre, Result};

use dria_oracle_contracts::OracleCoordinator::{
self, getFeeReturn, getResponsesReturn, getValidationsReturn, requestsReturn,
Expand All @@ -32,13 +32,9 @@ impl DriaOracle {
numGenerations: U40::from(num_gens),
numValidations: U40::from(num_vals),
};
let req = coordinator.request(string_to_bytes32(protocol)?, input, models, parameters);
let tx = req
.send()
.await
.map_err(contract_error_report)
.wrap_err("could not request task")?;

let req = coordinator.request(string_to_bytes32(protocol)?, input, models, parameters);
let tx = self.send_with_gas_hikes(req).await?;
self.wait_for_tx(tx).await
}

Expand Down Expand Up @@ -78,6 +74,7 @@ impl DriaOracle {
Ok(responses._0)
}

/// Responds to a generation request with the response, metadata, and a valid nonce.
pub async fn respond_generation(
&self,
task_id: U256,
Expand All @@ -88,16 +85,12 @@ impl DriaOracle {
let coordinator = OracleCoordinator::new(self.addresses.coordinator, &self.provider);

let req = coordinator.respond(task_id, nonce, response, metadata);
let tx = req.send().await.map_err(contract_error_report)?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
let tx = self.send_with_gas_hikes(req).await?;
self.wait_for_tx(tx).await
}

/// Responds to a validation request with the score, metadata, and a valid nonce.
#[inline]
pub async fn respond_validation(
&self,
task_id: U256,
Expand All @@ -108,12 +101,12 @@ impl DriaOracle {
let coordinator = OracleCoordinator::new(self.addresses.coordinator, &self.provider);

let req = coordinator.validate(task_id, nonce, scores, metadata);
let tx = req.send().await.map_err(contract_error_report)?;

let tx = self.send_with_gas_hikes(req).await?;
self.wait_for_tx(tx).await
}

/// Subscribes to events & processes tasks.
#[inline]
pub async fn subscribe_to_tasks(
&self,
) -> Result<EventPoller<DriaOracleProviderTransport, StatusUpdate>> {
Expand Down Expand Up @@ -163,6 +156,7 @@ impl DriaOracle {
}

/// Returns the next task id.
#[inline]
pub async fn get_next_task_id(&self) -> Result<U256> {
let coordinator = OracleCoordinator::new(self.addresses.coordinator, &self.provider);

Expand Down
83 changes: 77 additions & 6 deletions core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ mod token;
mod anvil;

use super::DriaOracleConfig;
use alloy::contract::CallBuilder;
use alloy::hex::FromHex;
use alloy::providers::fillers::{
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller,
};
use alloy::providers::{PendingTransactionBuilder, WalletProvider};
use alloy::rpc::types::TransactionReceipt;
use alloy::transports::RpcError;
use alloy::{
network::{Ethereum, EthereumWallet},
primitives::Address,
Expand All @@ -21,8 +22,8 @@ use alloy::{
use alloy_chains::Chain;
use dkn_workflows::{DriaWorkflowsConfig, Model, ModelProvider};
use dria_oracle_contracts::{
get_coordinator_address, ContractAddresses, OracleCoordinator, OracleKind, OracleRegistry,
TokenBalance,
contract_error_report, get_coordinator_address, ContractAddresses, OracleCoordinator,
OracleKind, OracleRegistry, TokenBalance,
};
use eyre::{eyre, Context, Result};
use std::env;
Expand Down Expand Up @@ -259,17 +260,87 @@ impl DriaOracle {
}

/// Waits for a transaction to be mined, returning the receipt.
async fn wait_for_tx(
#[inline]
async fn wait_for_tx<T, N>(
&self,
tx: PendingTransactionBuilder<Http<Client>, Ethereum>,
) -> Result<TransactionReceipt> {
tx: PendingTransactionBuilder<T, N>,
) -> Result<N::ReceiptResponse>
where
T: alloy::transports::Transport + Clone,
N: alloy::network::Network,
{
log::info!("Waiting for tx: {:?}", tx.tx_hash());
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

/// Given a request, retries sending it with increasing gas prices to avoid
/// the "tx underpriced" errors.
#[inline]
async fn send_with_gas_hikes<T, P, D, N>(
&self,
req: CallBuilder<T, P, D, N>,
) -> Result<PendingTransactionBuilder<T, N>>
where
T: alloy::transports::Transport + Clone,
P: alloy::providers::Provider<T, N> + Clone,
D: alloy::contract::CallDecoder + Clone,
N: alloy::network::Network,
{
// gas price hikes to try in increasing order, first is 0 to simply use the
// initial gas fee for the first attempt
const GAS_PRICE_HIKES: [u128; 4] = [0, 12, 24, 36];

// try and send tx, with increasing gas prices for few attempts
let initial_gas_price = self.provider.get_gas_price().await?;
for (attempt_no, increase_percentage) in GAS_PRICE_HIKES.iter().enumerate() {
// set gas price
let gas_price = initial_gas_price + (initial_gas_price / 100) * increase_percentage;

// try to send tx with gas price
match req
.clone()
.gas_price(gas_price) // TODO: very low gas price to get an error deliberately
.send()
.await
{
// if all is well, we can return the tx
Ok(tx) => {
return Ok(tx);
}
// if we get an RPC error; specifically, if the tx is underpriced, we try again with higher gas
Err(alloy::contract::Error::TransportError(RpcError::ErrorResp(err))) => {
// TODO: kind of a code-smell, can we do better check here?
if err.message.contains("underpriced") {
log::warn!(
"{} with gas {} in attempt {}",
err.message,
gas_price,
attempt_no + 1,
);

// wait just a little bit
tokio::time::sleep(std::time::Duration::from_millis(300)).await;

continue;
} else {
// otherwise let it be handled by the error report
return Err(contract_error_report(
alloy::contract::Error::TransportError(RpcError::ErrorResp(err)),
));
}
}
// if we get any other error, we report it
Err(err) => return Err(contract_error_report(err)),
};
}

// all attempts failed
Err(eyre!("Failed to send transaction."))
}
}

impl core::fmt::Display for DriaOracle {
Expand Down
Loading

0 comments on commit d5cd12e

Please sign in to comment.