Skip to content

Commit 8b779fb

Browse files
committed
Move config parameters for ExecutionTimeEstimate mode into protocol and node configs.
1 parent 64af425 commit 8b779fb

File tree

10 files changed

+242
-145
lines changed

10 files changed

+242
-145
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sui-benchmark/tests/simtest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ mod test {
485485
target_utilization: rng.gen_range(1..=100),
486486
allowed_txn_cost_overage_burst_limit_us: rng.gen_range(0..500_000),
487487
max_txn_cost_overage_per_object_in_commit_us: 10_000_000_000,
488+
max_estimate_ms: 1_500,
488489
},
489490
),
490491
]

crates/sui-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ bcs.workspace = true
1616
csv.workspace = true
1717
dirs.workspace = true
1818
fastcrypto.workspace = true
19+
nonzero_ext.workspace = true
1920
once_cell.workspace = true
2021
rand.workspace = true
2122
serde = { workspace = true, features = ["derive", "rc"] }

crates/sui-config/src/node.rs

Lines changed: 86 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use crate::Config;
1010
use anyhow::Result;
1111
use consensus_config::Parameters as ConsensusParameters;
1212
use mysten_common::fatal;
13+
use nonzero_ext::nonzero;
1314
use once_cell::sync::OnceCell;
1415
use rand::rngs::OsRng;
1516
use serde::{Deserialize, Serialize};
1617
use serde_with::serde_as;
1718
use std::collections::{BTreeMap, BTreeSet};
1819
use std::net::SocketAddr;
19-
use std::num::NonZeroUsize;
20+
use std::num::{NonZeroU32, NonZeroUsize};
2021
use std::path::{Path, PathBuf};
2122
use std::sync::Arc;
2223
use std::time::Duration;
@@ -33,7 +34,7 @@ use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
3334

3435
use sui_types::crypto::{get_key_pair_from_rng, AccountKeyPair, AuthorityKeyPair};
3536
use sui_types::multiaddr::Multiaddr;
36-
use tracing::{error, info};
37+
use tracing::info;
3738

3839
// Default max number of concurrent requests served
3940
pub const DEFAULT_GRPC_CONCURRENCY_LIMIT: usize = 20000000000;
@@ -202,17 +203,95 @@ pub struct NodeConfig {
202203
#[serde(skip_serializing_if = "Option::is_none")]
203204
pub enable_db_write_stall: Option<bool>,
204205

206+
#[serde(skip_serializing_if = "Option::is_none")]
207+
pub execution_time_observer_config: Option<ExecutionTimeObserverConfig>,
208+
}
209+
210+
/// Optional node-level settings for the execution time observer.
///
/// Every field is an `Option`. When a field is left unset, the accessor method of the same
/// name on this type returns the default documented on that field. Field names are
/// (de)serialized in kebab-case (see the `rename_all` attribute below).
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
211+
#[serde(rename_all = "kebab-case")]
212+
pub struct ExecutionTimeObserverConfig {
205213
/// Size of the channel used for buffering local execution time observations.
206214
///
207215
/// If unspecified, this will default to `128`.
208-
#[serde(default = "default_local_execution_time_channel_capacity")]
209-
pub local_execution_time_channel_capacity: usize,
216+
pub observation_channel_capacity: Option<NonZeroUsize>,
210217

211218
/// Size of the LRU cache used for storing local execution time observations.
212219
///
213-
/// If unspecified, this will default to `10000`.
214-
#[serde(default = "default_local_execution_time_cache_size")]
215-
pub local_execution_time_cache_size: usize,
220+
/// If unspecified, this will default to `10_000`.
221+
pub observation_cache_size: Option<NonZeroUsize>,
222+
223+
/// Size of the LRU cache used for tracking object utilization.
224+
///
225+
/// If unspecified, this will default to `50_000`.
226+
pub object_utilization_cache_size: Option<NonZeroUsize>,
227+
228+
/// Unless target object utilization is exceeded by at least this amount, no observation
229+
/// will be shared with consensus.
230+
///
231+
/// If unspecified, this will default to `100` milliseconds.
232+
pub observation_sharing_object_utilization_threshold: Option<Duration>,
233+
234+
/// Unless the current local observation differs from the last one we shared by at least this
235+
/// percentage, no observation will be shared with consensus.
236+
///
237+
/// If unspecified, this will default to `0.05`.
// NOTE(review): the text above says "percentage" but the default is `0.05`; presumably the
// value is a fraction (0.05 == 5%) rather than a literal percent — confirm against the code
// that consumes this threshold.
238+
pub observation_sharing_diff_threshold: Option<f64>,
239+
240+
/// Minimum interval between sharing multiple observations of the same key.
241+
///
242+
/// If unspecified, this will default to `5` seconds.
243+
pub observation_sharing_min_interval: Option<Duration>,
244+
245+
/// Global per-second rate limit for sharing observations. This is a safety valve and
246+
/// should not trigger during normal operation.
247+
///
248+
/// If unspecified, this will default to `10` observations per second.
249+
pub observation_sharing_rate_limit: Option<NonZeroU32>,
250+
251+
/// Global burst limit for sharing observations.
252+
///
253+
/// If unspecified, this will default to `100` observations.
254+
pub observation_sharing_burst_limit: Option<NonZeroU32>,
255+
}
256+
257+
/// Accessors that resolve each optional setting to either the configured value or a
/// built-in default, so callers never have to handle `None` themselves.
impl ExecutionTimeObserverConfig {
258+
/// Returns the configured observation channel capacity, or `128` when unset.
pub fn observation_channel_capacity(&self) -> NonZeroUsize {
259+
self.observation_channel_capacity
260+
.unwrap_or(nonzero!(128usize))
261+
}
262+
263+
/// Returns the configured observation cache size, or `10_000` when unset.
pub fn observation_cache_size(&self) -> NonZeroUsize {
264+
self.observation_cache_size.unwrap_or(nonzero!(10_000usize))
265+
}
266+
267+
/// Returns the configured object utilization cache size, or `50_000` when unset.
pub fn object_utilization_cache_size(&self) -> NonZeroUsize {
268+
self.object_utilization_cache_size
269+
.unwrap_or(nonzero!(50_000usize))
270+
}
271+
272+
/// Returns the configured object utilization threshold for sharing observations, or
/// 100 milliseconds when unset.
pub fn observation_sharing_object_utilization_threshold(&self) -> Duration {
273+
self.observation_sharing_object_utilization_threshold
274+
.unwrap_or(Duration::from_millis(100))
275+
}
276+
277+
/// Returns the configured diff threshold for sharing observations, or `0.05` when unset.
pub fn observation_sharing_diff_threshold(&self) -> f64 {
278+
self.observation_sharing_diff_threshold.unwrap_or(0.05)
279+
}
280+
281+
/// Returns the configured minimum interval between shared observations, or 5 seconds
/// when unset.
pub fn observation_sharing_min_interval(&self) -> Duration {
282+
self.observation_sharing_min_interval
283+
.unwrap_or(Duration::from_secs(5))
284+
}
285+
286+
/// Returns the configured rate limit for sharing observations, or `10` when unset.
pub fn observation_sharing_rate_limit(&self) -> NonZeroU32 {
287+
self.observation_sharing_rate_limit
288+
.unwrap_or(nonzero!(10u32))
289+
}
290+
291+
/// Returns the configured burst limit for sharing observations, or `100` when unset.
pub fn observation_sharing_burst_limit(&self) -> NonZeroU32 {
292+
self.observation_sharing_burst_limit
293+
.unwrap_or(nonzero!(100u32))
294+
}
216295
}
217296

218297
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -552,14 +631,6 @@ pub fn default_end_of_epoch_broadcast_channel_capacity() -> usize {
552631
128
553632
}
554633

555-
pub fn default_local_execution_time_channel_capacity() -> usize {
556-
128
557-
}
558-
559-
pub fn default_local_execution_time_cache_size() -> usize {
560-
10000
561-
}
562-
563634
/// Returns `true`.
// NOTE(review): presumably used as a serde `default = "..."` helper for boolean fields —
// confirm against its users, which are not visible in this diff.
pub fn bool_true() -> bool {
564635
true
565636
}
@@ -655,13 +726,6 @@ impl NodeConfig {
655726
/// Returns a reference to the RPC API configuration, or `None` if it is not set.
pub fn rpc(&self) -> Option<&sui_rpc_api::Config> {
656727
self.rpc.as_ref()
657728
}
658-
659-
pub fn local_execution_time_cache_size(&self) -> NonZeroUsize {
660-
NonZeroUsize::new(self.local_execution_time_cache_size).unwrap_or_else(|| {
661-
error!("local_execution_time_cache_size must be non-zero - defaulting to 10000");
662-
NonZeroUsize::new(10000).unwrap()
663-
})
664-
}
665729
}
666730

667731
#[derive(Debug, Clone, Deserialize, Serialize)]

crates/sui-core/src/authority/authority_per_epoch_store.rs

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ pub struct AuthorityPerEpochStore {
404404
randomness_reporter: OnceCell<RandomnessReporter>,
405405

406406
/// Manages recording execution time observations and generating estimates.
407-
execution_time_estimator: tokio::sync::Mutex<ExecutionTimeEstimator>,
407+
execution_time_estimator: tokio::sync::Mutex<Option<ExecutionTimeEstimator>>,
408408
tx_local_execution_time:
409409
OnceCell<mpsc::Sender<(ProgrammableTransaction, Vec<ExecutionTiming>, Duration)>>,
410410
// Saved at end of epoch for propagating observations to the next.
@@ -892,23 +892,31 @@ impl AuthorityPerEpochStore {
892892
.execution_time_observations
893893
.safe_iter()
894894
.collect::<Result<Vec<_>, _>>()?;
895-
let execution_time_estimator = ExecutionTimeEstimator::new(
896-
committee.clone(),
897-
// Load observations stored at end of previous epoch.
898-
Self::get_stored_execution_time_observations(
899-
&protocol_config,
900-
committee.clone(),
901-
&*object_store,
902-
)
903-
// Load observations stored during the current epoch.
904-
.chain(execution_time_observations.into_iter().flat_map(
905-
|((generation, source), observations)| {
906-
observations
907-
.into_iter()
908-
.map(move |(key, duration)| (source, generation, key, duration))
909-
},
910-
)),
911-
);
895+
let execution_time_estimator =
896+
if let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) =
897+
protocol_config.per_object_congestion_control_mode()
898+
{
899+
Some(ExecutionTimeEstimator::new(
900+
committee.clone(),
901+
protocol_params,
902+
// Load observations stored at end of previous epoch.
903+
Self::get_stored_execution_time_observations(
904+
&protocol_config,
905+
committee.clone(),
906+
&*object_store,
907+
)
908+
// Load observations stored during the current epoch.
909+
.chain(execution_time_observations.into_iter().flat_map(
910+
|((generation, source), observations)| {
911+
observations
912+
.into_iter()
913+
.map(move |(key, duration)| (source, generation, key, duration))
914+
},
915+
)),
916+
))
917+
} else {
918+
None
919+
};
912920

913921
let s = Arc::new(Self {
914922
name,
@@ -1993,7 +2001,7 @@ impl AuthorityPerEpochStore {
19932001

19942002
fn should_defer(
19952003
&self,
1996-
execution_time_estimator: &ExecutionTimeEstimator,
2004+
execution_time_estimator: Option<&ExecutionTimeEstimator>,
19972005
cert: &VerifiedExecutableTransaction,
19982006
commit_info: &ConsensusCommitInfo,
19992007
dkg_failed: bool,
@@ -3001,12 +3009,12 @@ impl AuthorityPerEpochStore {
30013009
estimates,
30023010
} in execution_time_observations
30033011
{
3012+
let Some(estimator) = execution_time_estimator.as_mut() else {
3013+
error!("dropping ExecutionTimeObservation from possibly-Byzantine authority {authority:?} sent when ExecutionTimeEstimate mode is not enabled");
3014+
continue;
3015+
};
30043016
let authority_index = self.committee.authority_index(&authority).unwrap();
3005-
execution_time_estimator.process_observations_from_consensus(
3006-
authority_index,
3007-
generation,
3008-
&estimates,
3009-
);
3017+
estimator.process_observations_from_consensus(authority_index, generation, &estimates);
30103018
output.insert_execution_time_observation(authority_index, generation, estimates);
30113019
}
30123020

@@ -3063,7 +3071,7 @@ impl AuthorityPerEpochStore {
30633071
randomness_manager.as_deref_mut(),
30643072
dkg_failed,
30653073
randomness_round,
3066-
&execution_time_estimator,
3074+
execution_time_estimator.as_ref(),
30673075
authority_metrics,
30683076
)
30693077
.await?;
@@ -3073,11 +3081,13 @@ impl AuthorityPerEpochStore {
30733081
// If this is the final round, record execution time observations for storage in the
30743082
// end-of-epoch tx.
30753083
if final_round {
3076-
self.end_of_epoch_execution_time_observations
3077-
.set(execution_time_estimator.take_observations())
3084+
if let Some(estimator) = execution_time_estimator.as_mut() {
3085+
self.end_of_epoch_execution_time_observations
3086+
.set(estimator.take_observations())
30783087
.expect(
30793088
"`stored_execution_time_observations` should only be set once at end of epoch",
30803089
);
3090+
}
30813091
drop(execution_time_estimator); // make sure this is not used after `take_observations`
30823092
}
30833093

@@ -3404,7 +3414,7 @@ impl AuthorityPerEpochStore {
34043414
mut randomness_manager: Option<&mut RandomnessManager>,
34053415
dkg_failed: bool,
34063416
randomness_round: Option<RandomnessRound>,
3407-
execution_time_estimator: &ExecutionTimeEstimator,
3417+
execution_time_estimator: Option<&ExecutionTimeEstimator>,
34083418
authority_metrics: &Arc<AuthorityMetrics>,
34093419
) -> SuiResult<(
34103420
Vec<VerifiedExecutableTransaction>, // transactions to schedule
@@ -3707,7 +3717,7 @@ impl AuthorityPerEpochStore {
37073717
dkg_failed: bool,
37083718
generating_randomness: bool,
37093719
shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
3710-
execution_time_estimator: &ExecutionTimeEstimator,
3720+
execution_time_estimator: Option<&ExecutionTimeEstimator>,
37113721
authority_metrics: &Arc<AuthorityMetrics>,
37123722
) -> SuiResult<ConsensusCertificateResult> {
37133723
let _scope = monitored_scope("ConsensusCommitHandler::process_consensus_transaction");
@@ -3979,7 +3989,7 @@ impl AuthorityPerEpochStore {
39793989
dkg_failed: bool,
39803990
generating_randomness: bool,
39813991
shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
3982-
execution_time_estimator: &ExecutionTimeEstimator,
3992+
execution_time_estimator: Option<&ExecutionTimeEstimator>,
39833993
authority_metrics: &Arc<AuthorityMetrics>,
39843994
) -> SuiResult<ConsensusCertificateResult> {
39853995
let _scope = monitored_scope("ConsensusCommitHandler::process_consensus_user_transaction");

0 commit comments

Comments
 (0)