Skip to content

Commit 79f4559

Browse files
authored
[Cherrypick] Add submit_best_effort to SubmitToConsensus (#21561) (#21563)
To be used for consensus submissions which do not require commit confirmation ## Description Fix for issue which can cause all consensus submission permits to be consumed ## Test plan - Load test in PTN - Antithesis tests (ongoing) --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [x] Nodes (Validators and Full nodes): Upgrade to new version required for validators. Fullnodes unaffected - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK:
1 parent 9186d26 commit 79f4559

File tree

4 files changed

+82
-7
lines changed

4 files changed

+82
-7
lines changed

crates/sui-config/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ impl ConsensusConfig {
783783
}
784784

785785
pub fn max_pending_transactions(&self) -> usize {
786-
self.max_pending_transactions.unwrap_or(20_000)
786+
self.max_pending_transactions.unwrap_or(40_000)
787787
}
788788

789789
pub fn submit_delay_step_override(&self) -> Option<Duration> {

crates/sui-core/src/authority/execution_time_estimator.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,12 @@ impl ExecutionTimeObserver {
349349
let transaction = ConsensusTransaction::new_execution_time_observation(
350350
ExecutionTimeObservation::new(epoch_store.name, to_share),
351351
);
352-
if let Err(e) = self
353-
.consensus_adapter
354-
.submit_to_consensus(&[transaction], &epoch_store)
355-
{
352+
353+
if let Err(e) = self.consensus_adapter.submit_best_effort(
354+
&transaction,
355+
&epoch_store,
356+
Duration::from_secs(5),
357+
) {
356358
if !matches!(e, SuiError::EpochEnded(_)) {
357359
epoch_store
358360
.metrics

crates/sui-core/src/consensus_adapter.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub struct ConsensusAdapterMetrics {
8080
pub sequencing_in_flight_submissions: IntGauge,
8181
pub sequencing_estimated_latency: IntGauge,
8282
pub sequencing_resubmission_interval_ms: IntGauge,
83+
pub sequencing_best_effort_timeout: IntCounterVec,
8384
}
8485

8586
impl ConsensusAdapterMetrics {
@@ -188,6 +189,12 @@ impl ConsensusAdapterMetrics {
188189
SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
189190
registry,
190191
).unwrap(),
192+
sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
193+
"sequencing_best_effort_timeout",
194+
"The number of times the best effort submission has timed out.",
195+
&["tx_type"],
196+
registry,
197+
).unwrap(),
191198
}
192199
}
193200

@@ -210,6 +217,13 @@ pub trait SubmitToConsensus: Sync + Send + 'static {
210217
transactions: &[ConsensusTransaction],
211218
epoch_store: &Arc<AuthorityPerEpochStore>,
212219
) -> SuiResult;
220+
221+
fn submit_best_effort(
222+
&self,
223+
transaction: &ConsensusTransaction,
224+
epoch_store: &Arc<AuthorityPerEpochStore>,
225+
timeout: Duration,
226+
) -> SuiResult;
213227
}
214228

215229
#[mockall::automock]
@@ -249,7 +263,7 @@ pub struct ConsensusAdapter {
249263
/// A structure to register metrics
250264
metrics: ConsensusAdapterMetrics,
251265
/// Semaphore limiting parallel submissions to consensus
252-
submit_semaphore: Semaphore,
266+
submit_semaphore: Arc<Semaphore>,
253267
latency_observer: LatencyObserver,
254268
protocol_config: ProtocolConfig,
255269
}
@@ -300,7 +314,7 @@ impl ConsensusAdapter {
300314
connection_monitor_status,
301315
low_scoring_authorities,
302316
metrics,
303-
submit_semaphore: Semaphore::new(max_pending_local_submissions),
317+
submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
304318
latency_observer: LatencyObserver::new(),
305319
consensus_throughput_profiler: ArcSwapOption::empty(),
306320
protocol_config,
@@ -1287,6 +1301,55 @@ impl SubmitToConsensus for Arc<ConsensusAdapter> {
12871301
self.submit_batch(transactions, None, epoch_store)
12881302
.map(|_| ())
12891303
}
1304+
1305+
fn submit_best_effort(
1306+
&self,
1307+
transaction: &ConsensusTransaction,
1308+
epoch_store: &Arc<AuthorityPerEpochStore>,
1309+
// timeout is required, or the spawned task can run forever
1310+
timeout: Duration,
1311+
) -> SuiResult {
1312+
let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1313+
Ok(permit) => permit,
1314+
Err(_) => {
1315+
return Err(SuiError::TooManyTransactionsPendingConsensus);
1316+
}
1317+
};
1318+
1319+
let _in_flight_submission_guard =
1320+
GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1321+
1322+
let key = SequencedConsensusTransactionKey::External(transaction.key());
1323+
let tx_type = classify(transaction);
1324+
1325+
let async_stage = {
1326+
let transaction = transaction.clone();
1327+
let epoch_store = epoch_store.clone();
1328+
let this = self.clone();
1329+
1330+
async move {
1331+
let _permit = permit; // Hold permit for lifetime of task
1332+
1333+
let result = tokio::time::timeout(
1334+
timeout,
1335+
this.submit_inner(&[transaction], &epoch_store, &[key], tx_type, false),
1336+
)
1337+
.await;
1338+
1339+
if let Err(e) = result {
1340+
warn!("Consensus submission timed out: {e:?}");
1341+
this.metrics
1342+
.sequencing_best_effort_timeout
1343+
.with_label_values(&[tx_type])
1344+
.inc();
1345+
}
1346+
}
1347+
};
1348+
1349+
let epoch_store = epoch_store.clone();
1350+
spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1351+
Ok(())
1352+
}
12901353
}
12911354

12921355
pub fn position_submit_certificate(

crates/sui-core/src/mock_consensus.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::consensus_handler::SequencedConsensusTransaction;
99
use consensus_core::BlockRef;
1010
use prometheus::Registry;
1111
use std::sync::{Arc, Weak};
12+
use std::time::Duration;
1213
use sui_types::error::{SuiError, SuiResult};
1314
use sui_types::executable_transaction::VerifiedExecutableTransaction;
1415
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
@@ -119,6 +120,15 @@ impl SubmitToConsensus for MockConsensusClient {
119120
) -> SuiResult {
120121
self.submit_impl(transactions).map(|_response| ())
121122
}
123+
124+
fn submit_best_effort(
125+
&self,
126+
transaction: &ConsensusTransaction,
127+
_epoch_store: &Arc<AuthorityPerEpochStore>,
128+
_timeout: Duration,
129+
) -> SuiResult {
130+
self.submit_impl(&[transaction.clone()]).map(|_response| ())
131+
}
122132
}
123133

124134
#[async_trait::async_trait]

0 commit comments

Comments
 (0)