Skip to content

Commit 5ee2b92

Browse files
authored
feat: more lru cache metrics (#5848)
1 parent 095bf1e commit 5ee2b92

File tree

10 files changed

+88
-69
lines changed

10 files changed

+88
-69
lines changed

src/beacon/beacon_entries.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use crate::utils::encoding::serde_byte_array;
55
use byteorder::{BigEndian, ByteOrder as _};
66
use digest::Digest as _;
7+
use get_size2::GetSize;
78
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};
89

910
/// The result from getting an entry from `Drand`.
@@ -12,7 +13,17 @@ use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};
1213
/// This beacon entry is stored on chain in the block header.
1314
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
1415
#[derive(
15-
Clone, Debug, Default, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize_tuple, Deserialize_tuple,
16+
Clone,
17+
Debug,
18+
Default,
19+
Eq,
20+
PartialEq,
21+
Hash,
22+
Ord,
23+
PartialOrd,
24+
Serialize_tuple,
25+
Deserialize_tuple,
26+
GetSize,
1627
)]
1728
pub struct BeaconEntry {
1829
round: u64,

src/beacon/drand.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ use super::{
1212
};
1313
use crate::shim::clock::ChainEpoch;
1414
use crate::shim::version::NetworkVersion;
15+
use crate::utils::cache::SizeTrackingLruCache;
1516
use crate::utils::net::global_http_client;
1617
use anyhow::Context as _;
1718
use async_trait::async_trait;
1819
use backon::{ExponentialBuilder, Retryable};
1920
use bls_signatures::Serialize as _;
2021
use itertools::Itertools as _;
21-
use lru::LruCache;
22-
use parking_lot::RwLock;
2322
use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
2423
use tracing::debug;
2524
use url::Url;
@@ -244,7 +243,7 @@ pub struct DrandBeacon {
244243
fil_round_time: u64,
245244

246245
/// Keeps track of verified beacon entries.
247-
verified_beacons: RwLock<LruCache<u64, BeaconEntry>>,
246+
verified_beacons: SizeTrackingLruCache<u64, BeaconEntry>,
248247
}
249248

250249
impl DrandBeacon {
@@ -262,14 +261,15 @@ impl DrandBeacon {
262261
drand_gen_time: config.chain_info.genesis_time as u64,
263262
fil_round_time: interval,
264263
fil_gen_time: genesis_ts,
265-
verified_beacons: RwLock::new(LruCache::new(
264+
verified_beacons: SizeTrackingLruCache::new_with_default_metrics_registry(
265+
"verified_beacons_cache".into(),
266266
NonZeroUsize::new(CACHE_SIZE).expect("Infallible"),
267-
)),
267+
),
268268
}
269269
}
270270

271271
fn is_verified(&self, entry: &BeaconEntry) -> bool {
272-
let cache = self.verified_beacons.read();
272+
let cache = self.verified_beacons.cache().read();
273273
cache.peek(&entry.round()) == Some(entry)
274274
}
275275
}
@@ -333,20 +333,20 @@ impl Beacon for DrandBeacon {
333333
};
334334

335335
if is_valid && !validated.is_empty() {
336-
let mut cache = self.verified_beacons.write();
337-
if cache.cap().get() < validated.len() {
338-
tracing::warn!(cap=%cache.cap().get(), validated_len=%validated.len(), "verified_beacons.cap() is too small");
336+
let cap = self.verified_beacons.cap();
337+
if cap < validated.len() {
338+
tracing::warn!(%cap, validated_len=%validated.len(), "verified_beacons.cap() is too small");
339339
}
340340
for entry in validated {
341-
cache.put(entry.round(), entry.clone());
341+
self.verified_beacons.push(entry.round(), entry.clone());
342342
}
343343
}
344344

345345
Ok(is_valid)
346346
}
347347

348348
async fn entry(&self, round: u64) -> anyhow::Result<BeaconEntry> {
349-
let cached: Option<BeaconEntry> = self.verified_beacons.read().peek(&round).cloned();
349+
let cached: Option<BeaconEntry> = self.verified_beacons.peek_cloned(&round);
350350
match cached {
351351
Some(cached_entry) => Ok(cached_entry),
352352
None => {

src/message_pool/msgpool/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
1616
use crate::message::{Message as MessageTrait, SignedMessage};
1717
use crate::networks::ChainConfig;
1818
use crate::shim::{address::Address, crypto::Signature};
19+
use crate::utils::cache::SizeTrackingLruCache;
20+
use crate::utils::get_size::CidWrapper;
1921
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
2022
use cid::Cid;
2123
use fvm_ipld_encoding::to_vec;
22-
use lru::LruCache;
2324
use parking_lot::{Mutex, RwLock as SyncRwLock};
2425
use tracing::error;
2526
use utils::{get_base_fee_lower_bound, recover_sig};
@@ -211,7 +212,7 @@ where
211212
#[allow(clippy::too_many_arguments)]
212213
pub async fn head_change<T>(
213214
api: &T,
214-
bls_sig_cache: &Mutex<LruCache<Cid, Signature>>,
215+
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
215216
repub_trigger: Arc<flume::Sender<()>>,
216217
republished: &SyncRwLock<HashSet<Cid>>,
217218
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
@@ -233,7 +234,7 @@ where
233234
let (umsg, smsgs) = api.messages_for_block(block)?;
234235
msgs.extend(smsgs);
235236
for msg in umsg {
236-
let smsg = recover_sig(&mut bls_sig_cache.lock(), msg)?;
237+
let smsg = recover_sig(bls_sig_cache, msg)?;
237238
msgs.push(smsg)
238239
}
239240
}
@@ -420,7 +421,7 @@ pub mod tests {
420421
let sig = Signature::new_secp256k1(vec![]);
421422
let signed = SignedMessage::new_unchecked(umsg, sig);
422423
let cid = signed.cid();
423-
pool.sig_val_cache.lock().put(cid, ());
424+
pool.sig_val_cache.push(cid.into(), ());
424425
signed
425426
}
426427

src/message_pool/msgpool/msg_pool.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ use crate::shim::{
2323
gas::{Gas, price_list_by_network_version},
2424
};
2525
use crate::state_manager::is_valid_for_sending;
26+
use crate::utils::cache::SizeTrackingLruCache;
27+
use crate::utils::get_size::CidWrapper;
2628
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
2729
use anyhow::Context as _;
2830
use cid::Cid;
2931
use futures::StreamExt;
3032
use fvm_ipld_encoding::to_vec;
3133
use itertools::Itertools;
32-
use lru::LruCache;
3334
use nonzero_ext::nonzero;
3435
use parking_lot::{Mutex, RwLock as SyncRwLock};
3536
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
@@ -182,9 +183,9 @@ pub struct MessagePool<T> {
182183
/// Sender half to send messages to other components
183184
pub network_sender: flume::Sender<NetworkMessage>,
184185
/// A cache for BLS signature keyed by Cid
185-
pub bls_sig_cache: Arc<Mutex<LruCache<Cid, Signature>>>,
186+
pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
186187
/// A cache for BLS signature keyed by Cid
187-
pub sig_val_cache: Arc<Mutex<LruCache<Cid, ()>>>,
188+
pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
188189
/// A set of republished messages identified by their Cid
189190
pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
190191
/// Acts as a signal to republish messages from the republished set of
@@ -261,14 +262,14 @@ where
261262
fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
262263
let cid = msg.cid();
263264

264-
if let Some(()) = self.sig_val_cache.lock().get(&cid) {
265+
if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
265266
return Ok(());
266267
}
267268

268269
msg.verify(self.chain_config.eth_chain_id)
269270
.map_err(|e| Error::Other(e.to_string()))?;
270271

271-
self.sig_val_cache.lock().put(cid, ());
272+
self.sig_val_cache.push(cid.into(), ());
272273

273274
Ok(())
274275
}
@@ -413,7 +414,7 @@ where
413414

414415
msg_vec.append(smsgs.as_mut());
415416
for msg in umsg {
416-
let smsg = recover_sig(&mut self.bls_sig_cache.lock(), msg)?;
417+
let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
417418
msg_vec.push(smsg)
418419
}
419420
}
@@ -471,8 +472,14 @@ where
471472
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
472473
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
473474
let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset()));
474-
let bls_sig_cache = Arc::new(Mutex::new(LruCache::new(BLS_SIG_CACHE_SIZE)));
475-
let sig_val_cache = Arc::new(Mutex::new(LruCache::new(SIG_VAL_CACHE_SIZE)));
475+
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
476+
"bls_sig_cache".into(),
477+
BLS_SIG_CACHE_SIZE,
478+
));
479+
let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
480+
"sig_val_cache".into(),
481+
SIG_VAL_CACHE_SIZE,
482+
));
476483
let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
477484
let republished = Arc::new(SyncRwLock::new(HashSet::new()));
478485
let block_delay = chain_config.block_delay_secs;
@@ -583,7 +590,7 @@ where
583590
/// hash-map.
584591
pub(in crate::message_pool) fn add_helper<T>(
585592
api: &T,
586-
bls_sig_cache: &Mutex<LruCache<Cid, Signature>>,
593+
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
587594
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
588595
msg: SignedMessage,
589596
sequence: u64,
@@ -592,7 +599,7 @@ where
592599
T: Provider,
593600
{
594601
if msg.signature().signature_type() == SignatureType::Bls {
595-
bls_sig_cache.lock().put(msg.cid(), msg.signature().clone());
602+
bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
596603
}
597604

598605
if msg.message().gas_limit > 100_000_000 {

src/message_pool/msgpool/utils.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
use crate::chain::MINIMUM_BASE_FEE;
55
use crate::message::{Message as MessageTrait, SignedMessage};
66
use crate::shim::{crypto::Signature, econ::TokenAmount, message::Message};
7-
use cid::Cid;
8-
use lru::LruCache;
7+
use crate::utils::cache::SizeTrackingLruCache;
8+
use crate::utils::get_size::CidWrapper;
99
use num_rational::BigRational;
1010
use num_traits::ToPrimitive;
1111

@@ -46,12 +46,12 @@ pub(in crate::message_pool) fn get_gas_perf(gas_reward: &TokenAmount, gas_limit:
4646
/// Attempt to get a signed message that corresponds to an unsigned message in
4747
/// `bls_sig_cache`.
4848
pub(in crate::message_pool) fn recover_sig(
49-
bls_sig_cache: &mut LruCache<Cid, Signature>,
49+
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
5050
msg: Message,
5151
) -> Result<SignedMessage, Error> {
5252
let val = bls_sig_cache
53-
.get(&msg.cid())
53+
.get_cloned(&(msg.cid()).into())
5454
.ok_or_else(|| Error::Other("Could not recover sig".to_owned()))?;
55-
let smsg = SignedMessage::new_from_parts(msg, val.clone())?;
55+
let smsg = SignedMessage::new_from_parts(msg, val)?;
5656
Ok(smsg)
5757
}

src/rpc/methods/f3.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ pub use self::types::{
1515
};
1616
use self::{types::*, util::*};
1717
use super::wallet::WalletSign;
18+
use crate::shim::actors::{
19+
convert::{
20+
from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11,
21+
from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15,
22+
from_policy_v13_to_v16,
23+
},
24+
miner, power,
25+
};
1826
use crate::{
1927
blocks::Tipset,
2028
chain::index::ResolveNullTipset,
@@ -33,24 +41,12 @@ use crate::{
3341
},
3442
utils::misc::env::is_env_set_and_truthy,
3543
};
36-
use crate::{
37-
blocks::TipsetKey,
38-
shim::actors::{
39-
convert::{
40-
from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11,
41-
from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15,
42-
from_policy_v13_to_v16,
43-
},
44-
miner, power,
45-
},
46-
};
4744
use ahash::{HashMap, HashSet};
4845
use anyhow::Context as _;
4946
use enumflags2::BitFlags;
5047
use fvm_ipld_blockstore::Blockstore;
5148
use jsonrpsee::core::{client::ClientT as _, params::ArrayParams};
5249
use libp2p::PeerId;
53-
use lru::LruCache;
5450
use num::Signed as _;
5551
use parking_lot::RwLock;
5652
use std::{
@@ -473,21 +469,11 @@ impl RpcMethod<1> for GetPowerTable {
473469
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
474470
(f3_tsk,): Self::Params,
475471
) -> Result<Self::Ok, ServerError> {
476-
static CACHE: LazyLock<tokio::sync::Mutex<LruCache<TipsetKey, Vec<F3PowerEntry>>>> =
477-
LazyLock::new(|| {
478-
tokio::sync::Mutex::new(LruCache::new(32.try_into().expect("Infallible")))
479-
});
480472
let tsk = f3_tsk.try_into()?;
481-
let mut cache = CACHE.lock().await;
482-
if let Some(v) = cache.get(&tsk) {
483-
return Ok(v.clone());
484-
}
485-
486473
let start = std::time::Instant::now();
487474
let ts = ctx.chain_index().load_required_tipset(&tsk)?;
488475
let power_entries = Self::compute(&ctx, &ts).await?;
489476
tracing::debug!(epoch=%ts.epoch(), %tsk, "F3.GetPowerTable, took {}", humantime::format_duration(start.elapsed()));
490-
cache.push(tsk, power_entries.clone());
491477
Ok(power_entries)
492478
}
493479
}

src/shim/crypto.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@ use fvm_ipld_encoding::{
1515
ser, strict_bytes,
1616
};
1717
pub use fvm_shared3::TICKET_RANDOMNESS_LOOKBACK;
18+
use get_size2::GetSize;
1819
use num::FromPrimitive;
1920
use num_derive::FromPrimitive;
2021
use schemars::JsonSchema;
2122
use std::borrow::Cow;
2223

2324
/// A cryptographic signature, represented in bytes, of any key protocol.
24-
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
25+
#[derive(Clone, Debug, PartialEq, Eq, Hash, GetSize)]
2526
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
2627
pub struct Signature {
2728
pub sig_type: SignatureType,
@@ -307,6 +308,7 @@ pub fn cid_to_replica_commitment_v1(c: &Cid) -> Result<Commitment, &'static str>
307308
strum::Display,
308309
strum::EnumString,
309310
JsonSchema,
311+
GetSize,
310312
)]
311313
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
312314
#[repr(u8)]

0 commit comments

Comments
 (0)