Skip to content

Commit a279b8c

Browse files
committed
[Cherrypick] Add submit_best_effort to SubmitToConsensus (#21561)
To be used for consensus submissions which do not require commit confirmation
1 parent 9186d26 commit a279b8c

File tree

3 files changed

+81
-6
lines changed

3 files changed

+81
-6
lines changed

crates/sui-core/src/authority/execution_time_estimator.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,12 @@ impl ExecutionTimeObserver {
349349
let transaction = ConsensusTransaction::new_execution_time_observation(
350350
ExecutionTimeObservation::new(epoch_store.name, to_share),
351351
);
352-
if let Err(e) = self
353-
.consensus_adapter
354-
.submit_to_consensus(&[transaction], &epoch_store)
355-
{
352+
353+
if let Err(e) = self.consensus_adapter.submit_best_effort(
354+
&transaction,
355+
&epoch_store,
356+
Duration::from_secs(5),
357+
) {
356358
if !matches!(e, SuiError::EpochEnded(_)) {
357359
epoch_store
358360
.metrics

crates/sui-core/src/consensus_adapter.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub struct ConsensusAdapterMetrics {
8080
pub sequencing_in_flight_submissions: IntGauge,
8181
pub sequencing_estimated_latency: IntGauge,
8282
pub sequencing_resubmission_interval_ms: IntGauge,
83+
pub sequencing_best_effort_timeout: IntCounterVec,
8384
}
8485

8586
impl ConsensusAdapterMetrics {
@@ -188,6 +189,12 @@ impl ConsensusAdapterMetrics {
188189
SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
189190
registry,
190191
).unwrap(),
192+
sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
193+
"sequencing_best_effort_timeout",
194+
"The number of times the best effort submission has timed out.",
195+
&["tx_type"],
196+
registry,
197+
).unwrap(),
191198
}
192199
}
193200

@@ -210,6 +217,13 @@ pub trait SubmitToConsensus: Sync + Send + 'static {
210217
transactions: &[ConsensusTransaction],
211218
epoch_store: &Arc<AuthorityPerEpochStore>,
212219
) -> SuiResult;
220+
221+
fn submit_best_effort(
222+
&self,
223+
transaction: &ConsensusTransaction,
224+
epoch_store: &Arc<AuthorityPerEpochStore>,
225+
timeout: Duration,
226+
) -> SuiResult;
213227
}
214228

215229
#[mockall::automock]
@@ -249,7 +263,7 @@ pub struct ConsensusAdapter {
249263
/// A structure to register metrics
250264
metrics: ConsensusAdapterMetrics,
251265
/// Semaphore limiting parallel submissions to consensus
252-
submit_semaphore: Semaphore,
266+
submit_semaphore: Arc<Semaphore>,
253267
latency_observer: LatencyObserver,
254268
protocol_config: ProtocolConfig,
255269
}
@@ -300,7 +314,7 @@ impl ConsensusAdapter {
300314
connection_monitor_status,
301315
low_scoring_authorities,
302316
metrics,
303-
submit_semaphore: Semaphore::new(max_pending_local_submissions),
317+
submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
304318
latency_observer: LatencyObserver::new(),
305319
consensus_throughput_profiler: ArcSwapOption::empty(),
306320
protocol_config,
@@ -1287,6 +1301,55 @@ impl SubmitToConsensus for Arc<ConsensusAdapter> {
12871301
self.submit_batch(transactions, None, epoch_store)
12881302
.map(|_| ())
12891303
}
1304+
1305+
fn submit_best_effort(
1306+
&self,
1307+
transaction: &ConsensusTransaction,
1308+
epoch_store: &Arc<AuthorityPerEpochStore>,
1309+
// timeout is required, or the spawned task can run forever
1310+
timeout: Duration,
1311+
) -> SuiResult {
1312+
let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1313+
Ok(permit) => permit,
1314+
Err(_) => {
1315+
return Err(SuiError::TooManyTransactionsPendingConsensus);
1316+
}
1317+
};
1318+
1319+
let _in_flight_submission_guard =
1320+
GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1321+
1322+
let key = SequencedConsensusTransactionKey::External(transaction.key());
1323+
let tx_type = classify(transaction);
1324+
1325+
let async_stage = {
1326+
let transaction = transaction.clone();
1327+
let epoch_store = epoch_store.clone();
1328+
let this = self.clone();
1329+
1330+
async move {
1331+
let _permit = permit; // Hold permit for lifetime of task
1332+
1333+
let result = tokio::time::timeout(
1334+
timeout,
1335+
this.submit_inner(&[transaction], &epoch_store, &[key], tx_type, false),
1336+
)
1337+
.await;
1338+
1339+
if let Err(e) = result {
1340+
warn!("Consensus submission timed out: {e:?}");
1341+
this.metrics
1342+
.sequencing_best_effort_timeout
1343+
.with_label_values(&[tx_type])
1344+
.inc();
1345+
}
1346+
}
1347+
};
1348+
1349+
let epoch_store = epoch_store.clone();
1350+
spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1351+
Ok(())
1352+
}
12901353
}
12911354

12921355
pub fn position_submit_certificate(

crates/sui-core/src/mock_consensus.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::consensus_handler::SequencedConsensusTransaction;
99
use consensus_core::BlockRef;
1010
use prometheus::Registry;
1111
use std::sync::{Arc, Weak};
12+
use std::time::Duration;
1213
use sui_types::error::{SuiError, SuiResult};
1314
use sui_types::executable_transaction::VerifiedExecutableTransaction;
1415
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
@@ -119,6 +120,15 @@ impl SubmitToConsensus for MockConsensusClient {
119120
) -> SuiResult {
120121
self.submit_impl(transactions).map(|_response| ())
121122
}
123+
124+
fn submit_best_effort(
125+
&self,
126+
transaction: &ConsensusTransaction,
127+
_epoch_store: &Arc<AuthorityPerEpochStore>,
128+
_timeout: Duration,
129+
) -> SuiResult {
130+
self.submit_impl(&[transaction.clone()]).map(|_response| ())
131+
}
122132
}
123133

124134
#[async_trait::async_trait]

0 commit comments

Comments
 (0)