Skip to content

Commit f8de064

Browse files
committed
Add debt-based trigger for observation sharing
Without this, some entrypoints could get stuck at high estimates with no way of adjusting back down.
1 parent 6f0e9c0 commit f8de064

File tree

3 files changed

+72
-19
lines changed

3 files changed

+72
-19
lines changed

crates/sui-config/src/node.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ pub struct ExecutionTimeObserverConfig {
220220
/// If unspecified, this will default to `10_000`.
221221
pub observation_cache_size: Option<NonZeroUsize>,
222222

223+
/// Size of the channel used for buffering object debt updates from consensus handler.
224+
///
225+
/// If unspecified, this will default to `128`.
226+
pub object_debt_channel_capacity: Option<NonZeroUsize>,
227+
223228
/// Size of the LRU cache used for tracking object utilization.
224229
///
225230
/// If unspecified, this will default to `50_000`.
@@ -264,6 +269,11 @@ impl ExecutionTimeObserverConfig {
264269
self.observation_cache_size.unwrap_or(nonzero!(10_000usize))
265270
}
266271

272+
pub fn object_debt_channel_capacity(&self) -> NonZeroUsize {
273+
self.object_debt_channel_capacity
274+
.unwrap_or(nonzero!(128usize))
275+
}
276+
267277
pub fn object_utilization_cache_size(&self) -> NonZeroUsize {
268278
self.object_utilization_cache_size
269279
.unwrap_or(nonzero!(50_000usize))

crates/sui-core/src/authority/authority_per_epoch_store.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ pub struct AuthorityPerEpochStore {
407407
execution_time_estimator: tokio::sync::Mutex<Option<ExecutionTimeEstimator>>,
408408
tx_local_execution_time:
409409
OnceCell<mpsc::Sender<(ProgrammableTransaction, Vec<ExecutionTiming>, Duration)>>,
410+
tx_object_debts: OnceCell<mpsc::Sender<Vec<ObjectID>>>,
410411
// Saved at end of epoch for propagating observations to the next.
411412
end_of_epoch_execution_time_observations: OnceCell<StoredExecutionTimeObservations>,
412413
}
@@ -948,6 +949,7 @@ impl AuthorityPerEpochStore {
948949
randomness_reporter: OnceCell::new(),
949950
execution_time_estimator: tokio::sync::Mutex::new(execution_time_estimator),
950951
tx_local_execution_time: OnceCell::new(),
952+
tx_object_debts: OnceCell::new(),
951953
end_of_epoch_execution_time_observations: OnceCell::new(),
952954
});
953955

@@ -1199,19 +1201,23 @@ impl AuthorityPerEpochStore {
11991201
&self.execution_component.executor
12001202
}
12011203

1202-
pub fn set_local_execution_time_channel(
1204+
pub fn set_local_execution_time_channels(
12031205
&self,
12041206
tx_local_execution_time: mpsc::Sender<(
12051207
ProgrammableTransaction,
12061208
Vec<ExecutionTiming>,
12071209
Duration,
12081210
)>,
1211+
tx_object_debts: mpsc::Sender<Vec<ObjectID>>,
12091212
) {
12101213
if let Err(e) = self.tx_local_execution_time.set(tx_local_execution_time) {
12111214
debug_fatal!(
12121215
"failed to set tx_local_execution_time channel on AuthorityPerEpochStore: {e:?}"
12131216
);
12141217
}
1218+
if let Err(e) = self.tx_object_debts.set(tx_object_debts) {
1219+
debug_fatal!("failed to set tx_object_debts channel on AuthorityPerEpochStore: {e:?}");
1220+
}
12151221
}
12161222

12171223
pub fn record_local_execution_time(
@@ -3566,13 +3572,23 @@ impl AuthorityPerEpochStore {
35663572
.with_label_values(&["randomness_commit"])
35673573
.set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
35683574

3569-
output.set_congestion_control_object_debts(
3570-
shared_object_congestion_tracker.accumulated_debts(consensus_commit_info),
3571-
);
3572-
output.set_congestion_control_randomness_object_debts(
3573-
shared_object_using_randomness_congestion_tracker
3574-
.accumulated_debts(consensus_commit_info),
3575-
);
3575+
let object_debts =
3576+
shared_object_congestion_tracker.accumulated_debts(consensus_commit_info);
3577+
let randomness_object_debts = shared_object_using_randomness_congestion_tracker
3578+
.accumulated_debts(consensus_commit_info);
3579+
if let Some(tx_object_debts) = self.tx_object_debts.get() {
3580+
if let Err(e) = tx_object_debts.try_send(
3581+
object_debts
3582+
.iter()
3583+
.map(|(id, _)| *id)
3584+
.chain(randomness_object_debts.iter().map(|(id, _)| *id))
3585+
.collect(),
3586+
) {
3587+
info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
3588+
}
3589+
}
3590+
output.set_congestion_control_object_debts(object_debts);
3591+
output.set_congestion_control_randomness_object_debts(randomness_object_debts);
35763592

35773593
if randomness_state_updated {
35783594
if let Some(randomness_manager) = randomness_manager.as_mut() {

crates/sui-core/src/authority/execution_time_estimator.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub struct ExecutionTimeObserver {
5151
// via consensus.
5252
object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
5353

54+
// Sorted list of recently indebted objects, updated by consensus handler.
55+
indebted_objects: Vec<ObjectID>,
56+
5457
sharing_rate_limiter: RateLimiter<
5558
governor::state::NotKeyed,
5659
governor::state::InMemoryState,
@@ -87,14 +90,17 @@ impl ExecutionTimeObserver {
8790

8891
let (tx_local_execution_time, mut rx_local_execution_time) =
8992
mpsc::channel(config.observation_channel_capacity().into());
90-
epoch_store.set_local_execution_time_channel(tx_local_execution_time);
93+
let (tx_object_debts, mut rx_object_debts) =
94+
mpsc::channel(config.object_debt_channel_capacity().into());
95+
epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
9196

9297
// TODO: pre-populate local observations with stored data from prior epoch.
9398
let mut observer = Self {
9499
epoch_store: Arc::downgrade(&epoch_store),
95100
consensus_adapter,
96101
local_observations: LruCache::new(config.observation_cache_size()),
97102
object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
103+
indebted_objects: Vec::new(),
98104
sharing_rate_limiter: RateLimiter::direct(
99105
Quota::per_second(config.observation_sharing_rate_limit())
100106
.allow_burst(config.observation_sharing_burst_limit()),
@@ -103,10 +109,19 @@ impl ExecutionTimeObserver {
103109
config,
104110
};
105111
spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
106-
while let Some((tx, timings, total_duration)) = rx_local_execution_time.recv().await {
107-
observer
108-
.record_local_observations(&tx, &timings, total_duration)
109-
.await;
112+
loop {
113+
tokio::select! {
114+
// TODO: add metrics for messages received.
115+
Some(object_debts) = rx_object_debts.recv() => {
116+
observer.update_indebted_objects(object_debts);
117+
}
118+
Some((tx, timings, total_duration)) = rx_local_execution_time.recv() => {
119+
observer
120+
.record_local_observations(&tx, &timings, total_duration)
121+
.await;
122+
}
123+
else => { break }
124+
}
110125
}
111126
info!("shutting down ExecutionTimeObserver");
112127
}));
@@ -136,6 +151,7 @@ impl ExecutionTimeObserver {
136151
},
137152
local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
138153
object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
154+
indebted_objects: Vec::new(),
139155
sharing_rate_limiter: RateLimiter::direct(Quota::per_hour(std::num::NonZeroU32::MAX)),
140156
}
141157
}
@@ -154,12 +170,19 @@ impl ExecutionTimeObserver {
154170

155171
assert!(tx.commands.len() >= timings.len());
156172

173+
let mut uses_indebted_object = false;
174+
157175
// Update the accumulated excess execution time for each mutable shared object
158176
// used in this transaction, and determine the max overage.
159177
let max_excess_per_object_execution_time = tx
160178
.shared_input_objects()
161179
.filter_map(|obj| obj.mutable.then_some(obj.id))
162180
.map(|id| {
181+
// Mark if any object used in the tx is indebted.
182+
if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
183+
uses_indebted_object = true;
184+
}
185+
163186
// For each object:
164187
// - add the execution time of the current transaction to the tracker
165188
// - subtract the maximum amount of time available for execution according
@@ -181,11 +204,9 @@ impl ExecutionTimeObserver {
181204
utilization.excess_execution_time.saturating_sub(
182205
utilization
183206
.last_measured
184-
.map(|last_measured| {
185-
now.duration_since(last_measured)
186-
.mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
187-
})
188-
.unwrap_or(Duration::MAX),
207+
.map(|last_measured| now.duration_since(last_measured))
208+
.unwrap_or(Duration::MAX)
209+
.mul_f64(self.protocol_params.target_utilization as f64 / 100.0),
189210
);
190211
utilization.last_measured = Some(now);
191212
utilization.excess_execution_time
@@ -252,7 +273,7 @@ impl ExecutionTimeObserver {
252273
>= self
253274
.config
254275
.observation_sharing_object_utilization_threshold();
255-
if diff_exceeds_threshold && utilization_exceeds_threshold {
276+
if diff_exceeds_threshold && (utilization_exceeds_threshold || uses_indebted_object) {
256277
debug!("sharing new execution time observation for {key:?}: {new_average:?}");
257278
to_share.push((key, new_average));
258279
local_observation.last_shared = Some((new_average, Instant::now()));
@@ -297,6 +318,12 @@ impl ExecutionTimeObserver {
297318
}
298319
}
299320
}
321+
322+
fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
323+
object_debts.sort_unstable();
324+
object_debts.dedup();
325+
self.indebted_objects = object_debts;
326+
}
300327
}
301328

302329
// Key used to save StoredExecutionTimeObservations in the Sui system state object's

0 commit comments

Comments
 (0)