Skip to content

Commit

Permalink
Use extendAndPrune and delete old checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
HeinrichApfelmus committed Jun 7, 2023
1 parent 6de3c53 commit bce22c4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 479 deletions.
32 changes: 13 additions & 19 deletions lib/wallet/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ import Cardano.Wallet.Address.Keys.WalletKey
import Cardano.Wallet.Address.States.IsOwned
( isOwned )
import Cardano.Wallet.Checkpoints
( DeltaCheckpoints (..), extendCheckpoints, pruneCheckpoints )
( DeltaCheckpoints (..), extendAndPrune )
import Cardano.Wallet.Checkpoints.Policy
( sparseArithmetic )
import Cardano.Wallet.DB
( DBFresh (..)
, DBLayer (..)
Expand All @@ -349,6 +351,7 @@ import Cardano.Wallet.DB.WalletState
, DeltaWalletState1 (..)
, WalletState (..)
, fromWallet
, getBlockHeight
, getLatest
, getSlot
)
Expand Down Expand Up @@ -1187,22 +1190,16 @@ restoreBlocks ctx tr blocks nodeTip = db & \DBLayer{..} -> atomically $ do
let finalitySlot = nodeTip ^. #slotNo
- stabilityWindowShelley slottingParams

-- Checkpoint deltas
let wcps = snd . fromWallet <$> cps
deltaPutCheckpoints =
extendCheckpoints
epochStability' = fromIntegral $ getQuantity epochStability
deltaCheckpoints wallet =
extendAndPrune
getSlot
(view $ #currentTip . #blockHeight)
epochStability
(nodeTip ^. #blockHeight)
(fromIntegral . getBlockHeight)
(sparseArithmetic epochStability')
(fromIntegral $ getQuantity $ nodeTip ^. #blockHeight)
wcps

deltaPruneCheckpoints wallet =
pruneCheckpoints
(view $ #currentTip . #blockHeight)
epochStability
(localTip ^. #blockHeight)
(wallet ^. #checkpoints)
(checkpoints wallet)

let
-- NOTE: We have to update the 'Prologue' as well,
Expand Down Expand Up @@ -1231,14 +1228,11 @@ restoreBlocks ctx tr blocks nodeTip = db & \DBLayer{..} -> atomically $ do
liftIO $ logDelegation delegation
putDelegationCertificate cert slotNo

Delta.onDBVar walletState $ Delta.update $ \_wallet ->
Delta.onDBVar walletState $ Delta.update $ \wallet ->
deltaPrologue
<> [ UpdateCheckpoints deltaPutCheckpoints ]
<> [ UpdateCheckpoints $ deltaCheckpoints wallet ]
<> deltaPruneSubmissions

Delta.onDBVar walletState $ Delta.update $ \wallet ->
[ UpdateCheckpoints $ deltaPruneCheckpoints wallet ]

liftIO $ do
traceWith tr $ MsgDiscoveredTxs txs
traceWith tr $ MsgDiscoveredTxsContent txs
Expand Down
216 changes: 5 additions & 211 deletions lib/wallet/src/Cardano/Wallet/Checkpoints.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE TypeFamilies #-}

Expand All @@ -25,23 +24,15 @@ module Cardano.Wallet.Checkpoints
, DeltasCheckpoints

-- * Checkpoint hygiene
, BlockHeight
, CheckpointPolicy
, extendAndPrune
, BlockHeight
, extendCheckpoints
, pruneCheckpoints

-- * Checkpoint creation
, SparseCheckpointsConfig (..)
, defaultSparseCheckpointsConfig
, sparseCheckpoints
, gapSize
) where

import Prelude

import Cardano.Wallet.Checkpoints.Policy
( CheckpointPolicy, keepWhereTip )
( BlockHeight, CheckpointPolicy, keepWhereTip )
import Data.Delta
( Delta (..) )
import Data.Generics.Internal.VL.Lens
Expand All @@ -50,18 +41,12 @@ import Data.Map.Strict
( Map )
import Data.Maybe
( fromMaybe )
import Data.Quantity
( Quantity (..) )
import Data.Word
( Word32, Word8 )
import Fmt
( Buildable (..), listF )
import GHC.Generics
( Generic )

import qualified Cardano.Wallet.Checkpoints.Policy as CP
import qualified Cardano.Wallet.Primitive.Types as W
import qualified Data.List as L
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
Expand Down Expand Up @@ -164,7 +149,6 @@ instance Buildable (DeltaCheckpoints a) where
{-------------------------------------------------------------------------------
Checkpoint hygiene
-------------------------------------------------------------------------------}
type BlockHeight = Quantity "block" Word32

{- Note [extendAndPrune]
Expand All @@ -184,15 +168,16 @@ It's ok to supply a list of new checkpoints that is denser than required.
extendAndPrune
:: (a -> W.Slot)
-- ^ Convert checkpoint to slot.
-> (a -> CP.BlockHeight)
-> (a -> BlockHeight)
-- ^ Convert checkpoint to block height.
-> CheckpointPolicy
-- ^ Policy to use for pruning checkpoints.
-> CP.BlockHeight
-> BlockHeight
-- ^ Current tip of the blockchain,
-- which is *different* from block height of the latest checkpoint.
-> NE.NonEmpty a
-- ^ New checkpoints, ordered by increasing @Slot@.
-- See Note [extendAndPrune].
-> Checkpoints a
-- ^ Current checkpoints.
-> DeltasCheckpoints a
Expand All @@ -211,194 +196,3 @@ extendAndPrune getSlot getHeight policy nodeTip xs (Checkpoints cps) =

willKeep x = isLatest x || keepWhereTip policy (getHeight x) nodeTip
-- We must keep the most recent checkpoint or nothing will be extended

{- Note [Checkpoints-SummaryVsList]
The 'extendCheckpoints' is designed for the case where the blocks are
given as a 'List', not as a 'Summary'.
In this 'Summary' case, it could happen that the current
scheme fails to create sufficiently many checkpoint as
it was never able to touch the corresponding block.
For now, we avoid this situation by being always supplied a 'List'
in the unstable region close to the tip.
Another solution is to use 'nextCheckpoint' from the
'CheckpointPolicy' in order to drive the checkpoint collection in 'Summary'.
-}

-- | Extend the known checkpoints.
extendCheckpoints
:: (a -> W.Slot)
-- ^ Convert checkpoint to slot.
-> (a -> BlockHeight)
-- ^ Convert checkpoint to block height.
-> BlockHeight
-- ^ Epoch stability window = length of the deepest rollback.
-> BlockHeight
-- ^ Current tip of the blockchain,
-- which is *different* from block height of the latest checkpoint.
-> NE.NonEmpty a
-- ^ New checkpoints, ordered by increasing @Slot@.
-> DeltasCheckpoints a
extendCheckpoints getSlot getBlockHeight epochStability nodeTip cps =
reverse
[ PutCheckpoint (getSlot wcp) wcp
| wcp <- cpsKeep
]
where
unstable = Set.map Quantity $ Set.fromList $ sparseCheckpoints cfg nodeTip
where
-- NOTE
-- The edge really is an optimization to avoid rolling back too
-- "far" in the past. Yet, we let the edge construct itself
-- organically once we reach the tip of the chain and start
-- processing blocks one by one.
--
-- This prevents the wallet from trying to create too many
-- checkpoints at once during restoration which causes massive
-- performance degradation on large wallets.
--
-- Rollback may still occur during this short period, but
-- rolling back from a few hundred blocks is relatively fast
-- anyway.
cfg = (defaultSparseCheckpointsConfig epochStability) { edgeSize = 0 }
willKeep cp = getBlockHeight cp `Set.member` unstable
cpsKeep = filter willKeep (NE.init cps) <> [NE.last cps]

-- | Compute a delta to prune the 'Checkpoints'
-- according to 'defaultSparseCheckpointsConfig'.
pruneCheckpoints
:: (a -> BlockHeight)
-- ^ Retrieve 'BlockHeight' from checkpoint data.
-> BlockHeight
-- ^ Epoch stability window = length of the deepest rollback.
-> BlockHeight
-- ^ Block height of the latest checkpoint.
-> Checkpoints a
-> DeltasCheckpoints a
pruneCheckpoints getHeight epochStability tip (Checkpoints cps) =
[ RestrictTo slots ]
where
willKeep cp = getQuantity (getHeight cp) `Set.member` heights
slots = Map.keys $ Map.filter willKeep cps
heights = Set.fromList $ sparseCheckpoints
(defaultSparseCheckpointsConfig epochStability)
tip

{-------------------------------------------------------------------------------
Checkpoint creation
-------------------------------------------------------------------------------}
-- | Storing EVERY checkpoints in the database is quite expensive and useless.
-- We make the following assumptions:
--
-- - We can't rollback for more than `k=epochStability` blocks in the past
-- - It is pretty fast to re-sync a few hundred blocks
-- - Small rollbacks may occur more often than deep ones
--
-- So, as we insert checkpoints, we make sure to:
--
-- - Prune any checkpoint that more than `k` blocks in the past
-- - Keep only one checkpoint every 100 blocks
-- - But still keep ~10 most recent checkpoints to cope with small rollbacks
--
-- __Example 1__: Inserting `cp153`
--
-- ℹ: `cp142` is discarded and `cp153` inserted.
--
-- @
-- Currently in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp142 │.. ..│cp152 │
-- └───┴───┴───┴─ ──┴───┘
-- Want in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp143 │.. ..│cp153 │
-- └───┴───┴───┴─ ──┴───┘
-- @
--
--
-- __Example 2__: Inserting `cp111`
--
-- ℹ: `cp100` is kept and `cp111` inserted.
--
-- @
-- Currently in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp101 │.. ..│cp110 │
-- └───┴───┴───┴─ ──┴───┘
-- Want in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp101 │.. ..│cp111 │
-- └───┴───┴───┴─ ──┴───┘
-- @
--
-- NOTE: There might be cases where the chain following "fails" (because, for
-- example, the node has switched to a different chain, different by more than k),
-- and in such cases, we have no choice but rolling back from genesis.
-- Therefore, we need to keep the very first checkpoint in the database, no
-- matter what.
sparseCheckpoints
:: SparseCheckpointsConfig
-- ^ Parameters for the function.
-> BlockHeight
-- ^ A given block height
-> [Word32]
-- ^ The list of checkpoint heights that should be kept in DB.
sparseCheckpoints cfg blkH =
let
SparseCheckpointsConfig{edgeSize,epochStability} = cfg
g = gapSize cfg
h = getQuantity blkH
e = fromIntegral edgeSize

minH =
let x = if h < epochStability + g then 0 else h - epochStability - g
in g * (x `div` g)

initial = 0
longTerm = [minH,minH+g..h]
shortTerm = if h < e
then [0..h]
else [h-e,h-e+1..h]
in
L.sort (L.nub $ initial : (longTerm ++ shortTerm))

-- | Captures the configuration for the `sparseCheckpoints` function.
--
-- NOTE: large values of 'edgeSize' aren't recommended as they would mean
-- storing many unnecessary checkpoints. In Ouroboros Praos, there's a
-- reasonable probability for small forks each a few blocks deep so it makes sense to
-- maintain a small part that is denser near the edge.
data SparseCheckpointsConfig = SparseCheckpointsConfig
{ edgeSize :: Word8
, epochStability :: Word32
} deriving Show

-- | A sensible default to use in production. See also 'SparseCheckpointsConfig'
defaultSparseCheckpointsConfig :: BlockHeight -> SparseCheckpointsConfig
defaultSparseCheckpointsConfig (Quantity epochStability) =
SparseCheckpointsConfig
{ edgeSize = 5
, epochStability
}

-- | A reasonable gap size used internally in 'sparseCheckpoints'.
--
-- 'Reasonable' means that it's not _too frequent_ and it's not too large. A
-- value that is too small in front of k would require generating much more
-- checkpoints than necessary.
--
-- A value that is larger than `k` may have dramatic consequences in case of
-- deep rollbacks.
--
-- As a middle ground, we current choose `k / 3`, which is justified by:
--
-- - The current speed of the network layer (several thousands blocks per seconds)
-- - The current value of k = 2160
--
-- So, `k / 3` = 720, which should remain around a second of time needed to catch
-- up in case of large rollbacks.
gapSize :: SparseCheckpointsConfig -> Word32
gapSize SparseCheckpointsConfig{epochStability} =
epochStability `div` 3
Loading

0 comments on commit bce22c4

Please sign in to comment.