From a878c90c30d0ba4d79fae8beafc57890668e8e05 Mon Sep 17 00:00:00 2001 From: Alexander Esgen Date: Mon, 30 Jun 2025 10:01:48 +0200 Subject: [PATCH 1/4] Remove LedgerDB.SnapshotPolicy test Superseded by the rework of the snapshot policy for predictable snapshots, with dedicated new tests --- ouroboros-consensus/ouroboros-consensus.cabal | 3 +- .../Test/Ouroboros/Storage/LedgerDB.hs | 4 +- .../Storage/LedgerDB/SnapshotPolicy.hs | 292 ------------------ 3 files changed, 2 insertions(+), 297 deletions(-) delete mode 100644 ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/SnapshotPolicy.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 9396b428bb..44a679ac11 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -774,7 +774,6 @@ test-suite storage-test Test.Ouroboros.Storage.ImmutableDB.StateMachine Test.Ouroboros.Storage.LedgerDB Test.Ouroboros.Storage.LedgerDB.Serialisation - Test.Ouroboros.Storage.LedgerDB.SnapshotPolicy Test.Ouroboros.Storage.LedgerDB.Snapshots Test.Ouroboros.Storage.LedgerDB.StateMachine Test.Ouroboros.Storage.LedgerDB.StateMachine.TestBlock @@ -800,7 +799,7 @@ test-suite storage-test bytestring, cardano-binary, cardano-ledger-binary:testlib, - cardano-ledger-core:{cardano-ledger-core, testlib}, + cardano-ledger-core:cardano-ledger-core, cardano-slotting:{cardano-slotting, testlib}, cardano-strict-containers, cborg, diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB.hs index cdc2d6418e..3d876520a5 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB.hs @@ -6,7 +6,6 @@ module Test.Ouroboros.Storage.LedgerDB (tests) where import qualified Test.Ouroboros.Storage.LedgerDB.Serialisation as Serialisation -import qualified Test.Ouroboros.Storage.LedgerDB.SnapshotPolicy as SnapshotPolicy import qualified Test.Ouroboros.Storage.LedgerDB.Snapshots as Snapshots import qualified Test.Ouroboros.Storage.LedgerDB.StateMachine as StateMachine import qualified Test.Ouroboros.Storage.LedgerDB.V1.BackingStore as BackingStore @@ -23,8 +22,7 @@ tests = , DbChangelog.tests ] , -- Independent of the LedgerDB implementation - SnapshotPolicy.tests - , Serialisation.tests + Serialisation.tests , Snapshots.tests , -- Tests both V1 and V2 StateMachine.tests diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/SnapshotPolicy.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/SnapshotPolicy.hs deleted file mode 100644 index 7c293beb28..0000000000 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/SnapshotPolicy.hs +++ /dev/null @@ -1,292 +0,0 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE NumericUnderscores #-} -{-# LANGUAGE TypeApplications #-} - -module Test.Ouroboros.Storage.LedgerDB.SnapshotPolicy (tests) where - -import Cardano.Ledger.BaseTypes (unNonZero) -import Cardano.Ledger.BaseTypes.NonZero (nonZero) -import Data.Time.Clock - ( DiffTime - , diffTimeToPicoseconds - , picosecondsToDiffTime - , secondsToDiffTime - ) -import Data.Word -import Ouroboros.Consensus.Config.SecurityParam (SecurityParam (..)) -import Ouroboros.Consensus.Storage.LedgerDB.Snapshots -import Test.Cardano.Ledger.Core.Arbitrary () -import Test.QuickCheck -import Test.Tasty -import Test.Tasty.QuickCheck - -tests :: TestTree -tests = - testGroup - "SnapshotPolicy" - [ testGroup - "defaultSnapshotPolicy" - [ testProperty "onDiskNumSnapshots" prop_onDiskNumSnapshots - , testProperty "onDiskShouldTakeSnapshot" prop_onDiskShouldTakeSnapshot - ] - ] - -{------------------------------------------------------------------------------- - Test inputs --------------------------------------------------------------------------------} - --- | This contains sufficient inputs for each property in this module. -data TestSetup = TestSetup - { tsBlocksSince :: Word64 - -- ^ argument to 'onDiskShouldTakeSnapshot' - , tsK :: SecurityParam - -- ^ argument to 'defaultSnapshotPolicy' - , tsSnapshotInterval :: SnapshotInterval - -- ^ argument to 'defaultSnapshotPolicy' - , tsTimeSince :: Maybe DiffTime - -- ^ argument to 'onDiskShouldTakeSnapshot' - } - deriving Show - --- | The represented default 'SnapshotPolicy' -toSnapshotPolicy :: TestSetup -> SnapshotPolicy -toSnapshotPolicy ts = defaultSnapshotPolicy (tsK ts) snapshotPolicyArgs - where - snapshotPolicyArgs = - SnapshotPolicyArgs (tsSnapshotInterval ts) DefaultNumOfDiskSnapshots - --- | The result of the represented call to 'onDiskShouldTakeSnapshot' -shouldTakeSnapshot :: TestSetup -> Bool -shouldTakeSnapshot ts = - onDiskShouldTakeSnapshot - (toSnapshotPolicy ts) - (tsTimeSince ts) - (tsBlocksSince ts) - -{------------------------------------------------------------------------------- - Generator and shrinker --------------------------------------------------------------------------------} - -instance Arbitrary TestSetup where - arbitrary = do - k <- - frequency - [ (9, choose (0, 3000)) - , (1, choose (0, maxBound)) - ] - `suchThatMap` nonZero - - -- values within usual expectations - let nominal = - (,) - -- 20 k is average number in a Shelley epoch - <$> choose (0, 20 * unNonZero k) - -- a week is a defensible upper bound on the user input - <*> just95 (chooseSeconds 0 oneWeekInSeconds) - - -- values near known cutoffs - let interesting = - (,) - <$> curry - choose - (minBlocksBeforeSnapshot `div` 2) - (minBlocksBeforeSnapshot * 2) - <*> ( Just - <$> chooseSeconds - (minSecondsBeforeSnapshot `div` 2) - (minSecondsBeforeSnapshot * 2) - ) - - -- all other conceivable values - let wild = - (,) - <$> choose (0, maxBound) - <*> just95 (chooseSeconds 0 oneCenturyInSeconds) - - (b, t) <- - frequency - [ (80, nominal) - , (15, interesting) - , (5, wild) - ] - - -- this argument is provided from node via flag, we must anticipate values - -- to be completely arbitrary. However we still want to keep the distribution - -- of those values in such way that more probable values will be - -- more frequently test - tsSnapshotInterval <- - frequency - [ (45, pure DefaultSnapshotInterval) - , (45, RequestedSnapshotInterval <$> chooseSeconds 0 oneWeekInSeconds) - , (4, RequestedSnapshotInterval <$> chooseSeconds 0 (2 * oneWeekInSeconds)) - , (4, RequestedSnapshotInterval <$> chooseSeconds 0 (3 * oneWeekInSeconds)) - , (1, RequestedSnapshotInterval <$> chooseSeconds 0 (4 * oneWeekInSeconds)) - , (1, RequestedSnapshotInterval <$> chooseSeconds 0 oneCenturyInSeconds) - ] - - pure - TestSetup - { tsBlocksSince = b - , tsK = SecurityParam k - , tsSnapshotInterval - , tsTimeSince = t - } - where - -- 100 years seems a reasonable upper bound for consideration - oneCenturyInSeconds = 100 * 365 * oneDayInSeconds - -- one week seems a reasonable upper bound for relevance - oneWeekInSeconds = 7 * oneDayInSeconds - oneDayInSeconds = 24 * 60 * 60 - - just95 :: Gen a -> Gen (Maybe a) - just95 m = frequency [(5, pure Nothing), (95, Just <$> m)] - - -- both bounds are inclusive and in seconds - chooseSeconds :: Integer -> Integer -> Gen DiffTime - chooseSeconds lo hi = do - -- pick a second - s <- choose (lo, hi) - -- jitter within it - let nines = 10 ^ (12 :: Int) - 1 - offset <- choose (negate nines, nines) - pure $ picosecondsToDiffTime $ max lo $ min hi $ s + offset - - shrink (TestSetup x1 x2 x3 x4) = - mconcat - [ (\y -> TestSetup y x2 x3 x4) <$> shrink @Word64 x1 - , (\y -> TestSetup x1 y x3 x4) <$> shrinkSecurityParam x2 - , (\y -> TestSetup x1 x2 y x4) <$> shrinkSnapshotInterval x3 - , (\y -> TestSetup x1 x2 x3 y) <$> shrinkTSL shrinkDiffTime x4 - ] - where - shrinkSecurityParam = - fmap SecurityParam . shrink {-@(Word64)-} . maxRollbacks - - shrinkDiffTime = - fmap picosecondsToDiffTime - . shrink @Integer - . diffTimeToPicoseconds - - shrinkTSL shnk = \case - Nothing -> [] - Just d -> Nothing : fmap Just (shnk d) - - shrinkSnapshotInterval = \case - DisableSnapshots -> [] - DefaultSnapshotInterval -> [] - RequestedSnapshotInterval d -> - DefaultSnapshotInterval - : (RequestedSnapshotInterval <$> shrinkDiffTime d) - -{------------------------------------------------------------------------------- - Properties --------------------------------------------------------------------------------} - --- | Check 'onDiskNumSnapshots' of 'defaultSnapshotPolicy' -prop_onDiskNumSnapshots :: TestSetup -> Property -prop_onDiskNumSnapshots ts = - -- 'TestSetup' has more information than we need for this property - counterexample "should always be 2" $ - onDiskNumSnapshots (toSnapshotPolicy ts) === 2 - -minBlocksBeforeSnapshot :: Word64 -minBlocksBeforeSnapshot = 50_000 - -minSecondsBeforeSnapshot :: Integer -minSecondsBeforeSnapshot = 6 * 60 - --- | Check 'onDiskShouldTakeSnapshot' of 'defaultSnapshotPolicy' -prop_onDiskShouldTakeSnapshot :: TestSetup -> Property -prop_onDiskShouldTakeSnapshot ts = - counterexample ("decided to take snapshot? " ++ show (shouldTakeSnapshot ts)) $ - case t of - Nothing -> - counterexample "haven't taken a snapshot yet" $ - counterexample "should take snapshot if it processed at least k blocks" $ - shouldTakeSnapshot ts === (blocksSinceLast >= unNonZero k) - Just timeSinceLast -> - counterexample "have previously taken a snapshot" $ - isDisjunctionOf - (shouldTakeSnapshot ts `named` "the decision") - [ systemChecksHowMuchTimeHasPassed timeSinceLast - , systemChecksHowManyBlocksWereProcessed timeSinceLast - ] - where - TestSetup - { tsBlocksSince = blocksSinceLast - , tsK = SecurityParam k - , tsSnapshotInterval = snapshotInterval - , tsTimeSince = t - } = ts - - kTimes2 :: DiffTime - kTimes2 = secondsToDiffTime $ fromIntegral $ unNonZero k * 2 - - systemChecksHowMuchTimeHasPassed :: DiffTime -> NamedValue Bool - systemChecksHowMuchTimeHasPassed timeSinceLast = - case snapshotInterval of - DefaultSnapshotInterval -> - (timeSinceLast >= kTimes2) - `named` "time since last is greater then 2 * k seconds if snapshot interval is set to default" - RequestedSnapshotInterval interval -> - (timeSinceLast >= interval) - `named` "time since last is greater then explicitly requested interval" - DisableSnapshots -> error "Will never call this test with this value" - - systemChecksHowManyBlocksWereProcessed :: DiffTime -> NamedValue Bool - systemChecksHowManyBlocksWereProcessed timeSinceLast = - disjunct `named` msg - where - msg = - unwords - [ "we have processed" - , show minBlocksBeforeSnapshot - , "blocks and it's been more than" - , show minSecondsBeforeSnapshot - , "seconds since last snapshot was taken" - ] - - disjunct = - blocksSinceLast >= minBlocksBeforeSnapshot - && timeSinceLast >= secondsToDiffTime minSecondsBeforeSnapshot - -{------------------------------------------------------------------------------- - Auxiliary -- TODO relocate this somewhere more general --------------------------------------------------------------------------------} - --- | A value with an associated user-friendly string -data NamedValue a = NamedValue String a - -forgetName :: NamedValue a -> a -forgetName (NamedValue _s a) = a - -infix 0 `named` - -named :: a -> String -> NamedValue a -named = flip NamedValue - --- | Use this instead of @x '===' 'or' ys@ to get a 'counterexample' message --- that explains which of the disjuncts were mismatched -isDisjunctionOf :: NamedValue Bool -> [NamedValue Bool] -> Property -isDisjunctionOf (NamedValue s b) ds = - counterexample msg $ b === any forgetName ds - where - msg = - unlines $ - ( show b - <> " for " - <> s - <> ", but the " - <> show (length ds) - <> " disjuncts were: " - ) - : [ " " - <> "disjunct " - <> show (i :: Int) - <> ": " - <> show b' - <> " for " - <> s' - | (i, NamedValue s' b') <- zip [0 ..] ds - ] From 66cd5d76b79556e00365c31e286fd35a898ef84a Mon Sep 17 00:00:00 2001 From: Alexander Esgen Date: Mon, 30 Jun 2025 10:01:54 +0200 Subject: [PATCH 2/4] LedgerDB: implement predictable snapshotting --- ...0_alexander.esgen_predictable_snapshots.md | 6 + .../Consensus/Storage/ChainDB/Impl.hs | 7 +- .../Storage/ChainDB/Impl/Background.hs | 90 ++----- .../Consensus/Storage/LedgerDB/API.hs | 28 +- .../Consensus/Storage/LedgerDB/Snapshots.hs | 240 +++++++++++------- .../Consensus/Storage/LedgerDB/V1.hs | 75 ++++-- .../Consensus/Storage/LedgerDB/V2.hs | 70 ++--- .../Test/Util/ChainDB.hs | 5 +- .../Storage/LedgerDB/StateMachine.hs | 2 +- 9 files changed, 272 insertions(+), 251 deletions(-) create mode 100644 ouroboros-consensus/changelog.d/20250630_100300_alexander.esgen_predictable_snapshots.md diff --git a/ouroboros-consensus/changelog.d/20250630_100300_alexander.esgen_predictable_snapshots.md b/ouroboros-consensus/changelog.d/20250630_100300_alexander.esgen_predictable_snapshots.md new file mode 100644 index 0000000000..e73d60a6ec --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250630_100300_alexander.esgen_predictable_snapshots.md @@ -0,0 +1,6 @@ +### Breaking + +- LedgerDB: implemented *predictable* snapshots, i.e. different nodes with the + same configuration will now create snapshots for the same slots. + + See 'SnapshotPolicyArgs' for more details. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index e5f7b21014..6c2ba869e7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -164,7 +164,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB (ledgerDbGetVolatileSuffix, setGetCurrentChainForLedgerDB) <- mkLedgerDbGetVolatileSuffix - (lgrDB, replayed) <- + (lgrDB, _replayed) <- LedgerDB.openDB argsLgrDb (ImmutableDB.streamAPI immutableDB) @@ -289,8 +289,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , intGarbageCollect = \slot -> getEnv h $ \e -> do Background.garbageCollectBlocks e slot LedgerDB.garbageCollect (cdbLedgerDB e) slot - , intTryTakeSnapshot = getEnv h $ \env' -> - void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound + , intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB , intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse) , intKillBgThreads = varKillBgThreads } @@ -301,7 +300,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do (castPoint $ AF.anchorPoint chain) (castPoint $ AF.headPoint chain) - when launchBgTasks $ Background.launchBgTasks env replayed + when launchBgTasks $ Background.launchBgTasks env return (chainDB, testing, env) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 37cfd65e27..4a4e4f32e9 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -1,13 +1,11 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} -- | Background tasks: -- @@ -52,7 +50,6 @@ import Data.Sequence.Strict (StrictSeq (..)) import qualified Data.Sequence.Strict as Seq import Data.Time.Clock import Data.Void (Void) -import Data.Word import GHC.Generics (Generic) import GHC.Stack (HasCallStack) import Ouroboros.Consensus.Block @@ -75,7 +72,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Condense import Ouroboros.Consensus.Util.IOLike -import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher) +import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher) import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..)) import qualified Ouroboros.Network.AnchoredFragment as AF @@ -92,15 +89,13 @@ launchBgTasks :: , HasHardForkHistory blk ) => ChainDbEnv m blk -> - -- | Number of immutable blocks replayed on ledger DB startup - Word64 -> m () -launchBgTasks cdb@CDB{..} replayed = do +launchBgTasks cdb@CDB{..} = do !addBlockThread <- launch "ChainDB.addBlockRunner" $ addBlockRunner cdbChainSelFuse cdb - ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed + ledgerDbTasksTrigger <- newLedgerDbTasksTrigger !ledgerDbMaintenaceThread <- forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $ ledgerDbTaskWatcher cdb ledgerDbTasksTrigger @@ -259,12 +254,10 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do copyAndTrigger :: m () copyAndTrigger = do -- Wait for 'cdbChain' to become longer than 'getCurrentChain'. - numToWrite <- atomically $ do + atomically $ do curChain <- icWithoutTime <$> readTVar cdbChain curChainVolSuffix <- Query.getCurrentChain cdb - let numToWrite = AF.length curChain - AF.length curChainVolSuffix - check $ numToWrite > 0 - return $ fromIntegral numToWrite + check $ AF.length curChain > AF.length curChainVolSuffix -- Copy blocks to ImmutableDB -- @@ -272,7 +265,7 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do -- copied to disk (though not flushed, necessarily). gcSlotNo <- withFuse fuse (copyToImmutableDB cdb) - triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite + triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo scheduleGC' gcSlotNo scheduleGC' :: WithOrigin SlotNo -> m () @@ -294,29 +287,10 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do -- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable -- DB tip slot advances when we finish copying blocks to it. newtype LedgerDbTasksTrigger m - = LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState) + = LedgerDbTasksTrigger (StrictTVar m (WithOrigin SlotNo)) -data LedgerDbTaskState = LedgerDbTaskState - { ldbtsImmTip :: !(WithOrigin SlotNo) - , ldbtsPrevSnapshotTime :: !(Maybe Time) - , ldbtsBlocksSinceLastSnapshot :: !Word64 - } - deriving stock Generic - deriving anyclass NoThunks - -newLedgerDbTasksTrigger :: - IOLike m => - -- | Number of blocks replayed. - Word64 -> - m (LedgerDbTasksTrigger m) -newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st - where - st = - LedgerDbTaskState - { ldbtsImmTip = Origin - , ldbtsPrevSnapshotTime = Nothing - , ldbtsBlocksSinceLastSnapshot = replayed - } +newLedgerDbTasksTrigger :: IOLike m => m (LedgerDbTasksTrigger m) +newLedgerDbTasksTrigger = LedgerDbTasksTrigger <$> newTVarIO Origin triggerLedgerDbTasks :: forall m. @@ -324,15 +298,9 @@ triggerLedgerDbTasks :: LedgerDbTasksTrigger m -> -- | New tip of the ImmutableDB. WithOrigin SlotNo -> - -- | Number of blocks written to the ImmutableDB. - Word64 -> m () -triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten = - atomically $ modifyTVar varSt $ \st -> - st - { ldbtsImmTip = immTip - , ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten - } +triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) = + atomically . writeTVar varSt -- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes. -- @@ -344,38 +312,16 @@ ledgerDbTaskWatcher :: IOLike m => ChainDbEnv m blk -> LedgerDbTasksTrigger m -> - Watcher m LedgerDbTaskState (WithOrigin SlotNo) + Watcher m SlotNo SlotNo ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) = Watcher - { wFingerprint = ldbtsImmTip + { wFingerprint = id , wInitial = Nothing - , wReader = readTVar varSt - , wNotify = - \LedgerDbTaskState - { ldbtsImmTip - , ldbtsBlocksSinceLastSnapshot = blocksSinceLast - , ldbtsPrevSnapshotTime = prevSnapTime - } -> - whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do - LedgerDB.tryFlush cdbLedgerDB - - now <- getMonotonicTime - LedgerDB.SnapCounters - { prevSnapshotTime - , ntBlocksSinceLastSnap - } <- - LedgerDB.tryTakeSnapshot - cdbLedgerDB - ((,now) <$> prevSnapTime) - blocksSinceLast - atomically $ modifyTVar varSt $ \st -> - st - { ldbtsBlocksSinceLastSnapshot = - ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap - , ldbtsPrevSnapshotTime = prevSnapshotTime - } - - LedgerDB.garbageCollect cdbLedgerDB slotNo + , wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt + , wNotify = \slotNo -> do + LedgerDB.tryFlush cdbLedgerDB + LedgerDB.tryTakeSnapshot cdbLedgerDB + LedgerDB.garbageCollect cdbLedgerDB slotNo } {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index cc26e2f187..d70ff2d1af 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -147,9 +147,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API , withPrivateTipForker , withTipForker - -- * Snapshots - , SnapCounters (..) - -- * Streaming , StreamingBackend (..) , Yield @@ -166,7 +163,6 @@ import Codec.CBOR.Decoding import Codec.CBOR.Read import Codec.Serialise import qualified Control.Monad as Monad -import Control.Monad.Class.MonadTime.SI import Control.Monad.Except import Control.ResourceRegistry import Control.Tracer @@ -271,18 +267,12 @@ data LedgerDB m l blk = LedgerDB -- * The set of previously applied points. , tryTakeSnapshot :: l ~ ExtLedgerState blk => - Maybe (Time, Time) -> - Word64 -> - m SnapCounters + m () -- ^ If the provided arguments indicate so (based on the SnapshotPolicy with -- which this LedgerDB was opened), take a snapshot and delete stale ones. -- - -- The arguments are: - -- - -- - If a snapshot has been taken already, the time at which it was taken - -- and the current time. - -- - -- - How many blocks have been processed since the last snapshot. + -- For V1, this must not be called concurrently with 'garbageCollect' and/or + -- 'tryFlush'. , tryFlush :: m () -- ^ Flush V1 in-memory LedgerDB state to disk, if possible. This is a no-op -- for implementations that do not need an explicit flush function. @@ -429,18 +419,6 @@ getReadOnlyForker :: m (Either GetForkerError (ReadOnlyForker m l blk)) getReadOnlyForker ldb rr pt = fmap readOnlyForker <$> getForkerAtTarget ldb rr pt -{------------------------------------------------------------------------------- - Snapshots --------------------------------------------------------------------------------} - --- | Counters to keep track of when we made the last snapshot. -data SnapCounters = SnapCounters - { prevSnapshotTime :: !(Maybe Time) - -- ^ When was the last time we made a snapshot - , ntBlocksSinceLastSnap :: !Word64 - -- ^ How many blocks have we processed since the last snapshot - } - {------------------------------------------------------------------------------- Initialization -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs index 1b5826e2f3..b73a2072f7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs @@ -4,12 +4,12 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ViewPatterns #-} -- | Snapshots -- @@ -44,7 +44,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.Snapshots CRCError (..) , DiskSnapshot (..) , MetadataErr (..) - , NumOfDiskSnapshots (..) , ReadSnapshotErr (..) , SnapshotBackend (..) , SnapshotFailure (..) @@ -75,8 +74,10 @@ module Ouroboros.Consensus.Storage.LedgerDB.Snapshots , writeSnapshotMetadata -- * Policy - , SnapshotInterval (..) , SnapshotPolicy (..) + , SnapshotSelectorContext (..) + , SnapshotFrequency (..) + , SnapshotFrequencyArgs (..) , defaultSnapshotPolicy , pattern DoDiskSnapshotChecksum , pattern NoDoDiskSnapshotChecksum @@ -84,6 +85,9 @@ module Ouroboros.Consensus.Storage.LedgerDB.Snapshots -- * Tracing , TraceSnapshotEvent (..) + -- * Utility + , OverrideOrDefault (..) + -- * Re-exports , Flag (..) @@ -109,7 +113,7 @@ import qualified Data.Aeson as Aeson import Data.Aeson.Types (Parser) import Data.Functor.Identity import qualified Data.List as List -import Data.Maybe (isJust, mapMaybe) +import Data.Maybe (catMaybes, isJust, mapMaybe, maybeToList) import Data.Ord import Data.Set (Set) import qualified Data.Set as Set @@ -121,7 +125,7 @@ import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Abstract (EmptyMK) import Ouroboros.Consensus.Ledger.Extended -import Ouroboros.Consensus.Util (Flag (..)) +import Ouroboros.Consensus.Util (Flag (..), lastMaybe) import Ouroboros.Consensus.Util.CBOR ( ReadIncrementalErr , decodeWithOrigin @@ -489,21 +493,6 @@ decodeLBackwardsCompatible _ decodeLedger decodeHash = Policy -------------------------------------------------------------------------------} --- | Length of time that has to pass after which a snapshot is taken. -data SnapshotInterval - = DefaultSnapshotInterval - | RequestedSnapshotInterval DiffTime - | DisableSnapshots - deriving stock (Eq, Generic, Show) - --- | Number of snapshots to be stored on disk. This is either the default value --- as determined by the @'SnapshotPolicy'@, or it is provided by the user. See the --- @'SnapshotPolicy'@ documentation for more information. -data NumOfDiskSnapshots - = DefaultNumOfDiskSnapshots - | RequestedNumOfDiskSnapshots Word - deriving stock (Eq, Generic, Show) - -- | Type-safe flag to regulate the checksum policy of the ledger state snapshots. -- -- These patterns are exposed to cardano-node and will be passed as part of @'SnapshotPolicy'@. @@ -536,95 +525,150 @@ data SnapshotPolicy = SnapshotPolicy -- the next snapshot, we delete the oldest one, leaving the middle -- one available in case of truncation of the write. This is -- probably a sane value in most circumstances. - , onDiskShouldTakeSnapshot :: Maybe DiffTime -> Word64 -> Bool - -- ^ Should we write a snapshot of the ledger state to disk? - -- - -- This function is passed two bits of information: - -- - -- * The time since the last snapshot, or 'NoSnapshotTakenYet' if none was taken yet. - -- Note that 'NoSnapshotTakenYet' merely means no snapshot had been taking yet - -- since the node was started; it does not necessarily mean that none - -- exist on disk. - -- - -- * The distance in terms of blocks applied to the /oldest/ ledger - -- snapshot in memory. During normal operation, this is the number of - -- blocks written to the ImmutableDB since the last snapshot. On - -- startup, it is computed by counting how many immutable blocks we had - -- to reapply to get to the chain tip. This is useful, as it allows the - -- policy to decide to take a snapshot /on node startup/ if a lot of - -- blocks had to be replayed. + , onDiskSnapshotSelector :: SnapshotSelectorContext -> [SlotNo] + -- ^ Select the slots to take a snapshot for, in increasing order. Must be a + -- sublist of 'sscSnapshotSlots'. -- -- See also 'defaultSnapshotPolicy' } deriving NoThunks via OnlyCheckWhnf SnapshotPolicy +data SnapshotSelectorContext = SnapshotSelectorContext + { sscTimeSinceLast :: Maybe DiffTime + -- ^ The time since the last snapshot, or 'Nothing' if none was taken yet. + -- Note that 'Nothing' merely means no snapshot had been taking yet since the + -- node was started; it does not necessarily mean that none exist on disk. + , sscSnapshotSlots :: [SlotNo] + -- ^ An increasing list of slots for which a snapshot can be taken (as the + -- corresponding ledger state is immutable). The result of + -- 'onDiskSnapshotSelector' must be a subset of this list. + } + deriving stock Show + +-- | Determines when/how often we take ledger snapshots. +-- +-- We only write snapshots for ledger states that are /immutable/. Concretely, +-- for every slot @s@ out of +-- +-- > sfaOffset, sfaOffset + sfaInterval, sfaOffset + 2 * sfaInterval, sfaOffset + 3 * sfaInterval, ... +-- +-- we write a snapshot for the most recent immutable ledger state before @s@. +-- This way, nodes with the same @sfaInterval@/@sfaOffset@ configuration create +-- snapshots for precisely the same slots. +-- +-- For example, on Cardano mainnet, where @k=2160@ and @f=1/20@, setting +-- @sfaInterval = 10*k/f = 432000@ (one epoch) and @sfaOffset = 0@ will cause +-- the node to create snapshots for the last block in every Shelley epoch. By +-- setting @sfaOffset@ to eg @5*k/f@ (half an epoch), snapshots are created just +-- before the midway point in each epoch. +-- +-- Additionally, there is an (optional, opt-out) rate limit (useful while +-- bulk-syncing). When set to a given duration, we will skip writing a snapshot +-- if less time than the given duration has passed since we finished writing the +-- previous snapshot (if any). +-- +-- To avoid skipping a snapshot write when caught-up, it is advisable to set +-- 'sfaRateLimit' to something significantly smaller than the wall-clock duration +-- of 'sfaInterval'. +data SnapshotFrequencyArgs = SnapshotFrequencyArgs + { sfaInterval :: OverrideOrDefault (NonZero Word64) + -- ^ Try to write snapshots every 'sfaInterval' many slots. + , sfaOffset :: OverrideOrDefault SlotNo + -- ^ An offset for when to write snapshots, see 'SnapshotFrequency'. + , sfaRateLimit :: OverrideOrDefault DiffTime + -- ^ Ensure (if present) that at least this amount of time passes between + -- writing snapshots. Setting this to a non-positive value disable the rate + -- limit. + } + deriving stock (Show, Eq) + +data SnapshotFrequency + = SnapshotFrequency SnapshotFrequencyArgs + | DisableSnapshots + deriving stock (Show, Eq) + data SnapshotPolicyArgs = SnapshotPolicyArgs - { spaInterval :: !SnapshotInterval - , spaNum :: !NumOfDiskSnapshots + { spaFrequency :: SnapshotFrequency + , spaNum :: OverrideOrDefault Word + -- ^ See 'onDiskNumSnapshots'. } + deriving stock (Show, Eq) defaultSnapshotPolicyArgs :: SnapshotPolicyArgs defaultSnapshotPolicyArgs = SnapshotPolicyArgs - DefaultSnapshotInterval - DefaultNumOfDiskSnapshots + (SnapshotFrequency $ SnapshotFrequencyArgs UseDefault UseDefault UseDefault) + UseDefault -- | Default on-disk policy suitable to use with cardano-node defaultSnapshotPolicy :: SecurityParam -> SnapshotPolicyArgs -> SnapshotPolicy -defaultSnapshotPolicy - (SecurityParam k) - (SnapshotPolicyArgs requestedInterval reqNumOfSnapshots) = - SnapshotPolicy - { onDiskNumSnapshots - , onDiskShouldTakeSnapshot - } - where - onDiskNumSnapshots :: Word - onDiskNumSnapshots = case reqNumOfSnapshots of - DefaultNumOfDiskSnapshots -> 2 - RequestedNumOfDiskSnapshots value -> value - - onDiskShouldTakeSnapshot :: - Maybe DiffTime -> - Word64 -> - Bool - onDiskShouldTakeSnapshot Nothing blocksSinceLast = - -- If users never leave their wallet running for long, this would mean - -- that under some circumstances we would never take a snapshot - -- So, on startup (when the 'time since the last snapshot' is `Nothing`), - -- we take a snapshot as soon as there are @k@ blocks replayed. - -- This means that even if users frequently shut down their wallet, we still - -- take a snapshot roughly every @k@ blocks. It does mean the possibility of - -- an extra unnecessary snapshot during syncing (if the node is restarted), but - -- that is not a big deal. - blocksSinceLast >= unNonZero k - onDiskShouldTakeSnapshot (Just timeSinceLast) blocksSinceLast = - snapshotInterval timeSinceLast - || substantialAmountOfBlocksWereProcessed blocksSinceLast timeSinceLast - - -- \| We want to create a snapshot after a substantial amount of blocks were - -- processed (hard-coded to 50k blocks). Given the fact that during bootstrap - -- a fresh node will see a lot of blocks over a short period of time, we want - -- to limit this condition to happen not more often then a fixed amount of - -- time (here hard-coded to 6 minutes) - substantialAmountOfBlocksWereProcessed blocksSinceLast timeSinceLast = - let minBlocksBeforeSnapshot = 50_000 - minTimeBeforeSnapshot = 6 * secondsToDiffTime 60 - in blocksSinceLast >= minBlocksBeforeSnapshot - && timeSinceLast >= minTimeBeforeSnapshot - - -- \| Requested snapshot interval can be explicitly provided by the - -- caller (RequestedSnapshotInterval) or the caller might request the default - -- snapshot interval (DefaultSnapshotInterval). If the latter then the - -- snapshot interval is defaulted to k * 2 seconds - when @k = 2160@ the interval - -- defaults to 72 minutes. - snapshotInterval t = case requestedInterval of - RequestedSnapshotInterval value -> t >= value - DefaultSnapshotInterval -> t >= secondsToDiffTime (fromIntegral $ unNonZero k * 2) - DisableSnapshots -> False +defaultSnapshotPolicy (SecurityParam k) args = + SnapshotPolicy + { onDiskNumSnapshots + , onDiskSnapshotSelector + } + where + SnapshotPolicyArgs + { spaFrequency + , spaNum = provideDefault 2 -> onDiskNumSnapshots + } = args + + onDiskSnapshotSelector :: SnapshotSelectorContext -> [SlotNo] + onDiskSnapshotSelector ctx + | Just timeSinceLast <- sscTimeSinceLast ctx + , not $ passesRateLimitCheck timeSinceLast = + [] + | otherwise = case spaFrequency of + DisableSnapshots -> [] + SnapshotFrequency + SnapshotFrequencyArgs + { sfaInterval = unNonZero . provideDefault defInterval -> interval + , sfaOffset = provideDefault 0 -> offset + , sfaRateLimit = provideDefault defRateLimit -> rateLimit + } -> + applyRateLimit $ + catMaybes $ + zipWith + shouldTakeSnapshot + (sscSnapshotSlots ctx) + (drop 1 (sscSnapshotSlots ctx)) + where + -- Test whether there is a non-negative integer @n@ such that + -- + -- > candidateSlot < offset + n * interval <= nextSlot + -- + -- If so, return @'Just' 'candidateSlot'@ for snapshotting. + shouldTakeSnapshot :: + SlotNo -> -- The slot to potentially take a snapshot for. + SlotNo -> -- The next slot in 'sscSnapshotSlots'. + Maybe SlotNo + shouldTakeSnapshot candidateSlot nextSlot + | nextSlot < offset = Nothing + | candidateSlot < offset + n * SlotNo interval = Just candidateSlot + | otherwise = Nothing + where + n = SlotNo $ unSlotNo (nextSlot - offset) `div` interval + + -- When rate limiting is enabled, only return at most one (the last) + -- of the slots satisfying 'shouldTakeSnapshot'. + applyRateLimit :: [SlotNo] -> [SlotNo] + applyRateLimit + | rateLimit > 0 = maybeToList . lastMaybe + | otherwise = id + + passesRateLimitCheck t = case spaFrequency of + SnapshotFrequency SnapshotFrequencyArgs{sfaRateLimit} -> + t >= provideDefault defRateLimit sfaRateLimit + DisableSnapshots -> False + + -- On mainnet, this is 72 min for @k=2160@ and a slot length of 1s. + defInterval = unsafeNonZero $ unNonZero k * 2 + + -- Most relevant during syncing. + defRateLimit = secondsToDiffTime $ 10 * 60 {------------------------------------------------------------------------------- Tracing snapshot events @@ -638,3 +682,15 @@ data TraceSnapshotEvent blk | -- | An old or invalid on-disk snapshot was deleted DeletedSnapshot DiskSnapshot deriving (Generic, Eq, Show) + +{------------------------------------------------------------------------------- + Utility (could live in O.C.Util.Args) +-------------------------------------------------------------------------------} + +data OverrideOrDefault a = Override !a | UseDefault + deriving stock (Show, Eq) + +provideDefault :: a -> OverrideOrDefault a -> a +provideDefault d = \case + UseDefault -> d + Override t -> t diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs index 16ea252583..9dac9ed4c3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs @@ -22,13 +22,14 @@ import Control.Monad.Except import Control.Monad.Trans (lift) import Control.ResourceRegistry import Control.Tracer +import Data.Containers.ListUtils (nubOrd) import qualified Data.Foldable as Foldable import Data.Functor ((<&>)) import Data.Functor.Contravariant ((>$<)) import Data.Kind (Type) import Data.Map (Map) import qualified Data.Map.Strict as Map -import Data.Maybe (isJust) +import Data.Maybe (isJust, mapMaybe) import Data.Set (Set) import qualified Data.Set as Set import Data.Word @@ -127,6 +128,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix = flushLock <- mkLedgerDBLock forkers <- newTVarIO Map.empty nextForkerKey <- newTVarIO (ForkerKey 0) + lastSnapshotWrite <- newTVarIO Nothing let env = LedgerDBEnv { ldbChangelog = varDB @@ -144,6 +146,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix = , ldbQueryBatchSize = lgrQueryBatchSize , ldbResolveBlock = getBlock , ldbGetVolatileSuffix = getVolatileSuffix + , ldbLastSnapshotWrite = lastSnapshotWrite } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -189,7 +192,7 @@ implMkLedgerDb h snapManager = , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied , garbageCollect = getEnv1 h implGarbageCollect - , tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot snapManager) + , tryTakeSnapshot = getEnv h (implTryTakeSnapshot snapManager) , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h } @@ -313,32 +316,56 @@ implGarbageCollect env slotNo = atomically $ do Set.dropWhileAntitone ((< slotNo) . realPointSlot) implTryTakeSnapshot :: - ( l ~ ExtLedgerState blk + ( IsLedger (LedgerState blk) + , l ~ ExtLedgerState blk + , HasLedgerTables l , IOLike m ) => SnapshotManagerV1 m blk -> LedgerDBEnv m l blk -> - Maybe (Time, Time) -> - Word64 -> - m SnapCounters -implTryTakeSnapshot snapManager env mTime nrBlocks = - if onDiskShouldTakeSnapshot (ldbSnapshotPolicy env) (uncurry (flip diffTime) <$> mTime) nrBlocks - then do - void $ - withReadLock - (ldbLock env) - ( takeSnapshot - snapManager - Nothing - (ldbChangelog env, ldbBackingStore env) - ) - void $ - trimSnapshots - snapManager + m () +implTryTakeSnapshot snapManager env = do + timeSinceLastWrite <- do + mLastWrite <- readTVarIO $ ldbLastSnapshotWrite env + forM mLastWrite $ \lastWrite -> do + now <- getMonotonicTime + pure $ now `diffTime` lastWrite + -- Get all states before the volatile suffix. + immutableStates <- atomically $ do + states <- changelogStates <$> readTVar (ldbChangelog env) + volSuffix <- getVolatileSuffix (ldbGetVolatileSuffix env) + pure $ AS.dropNewest (AS.length (volSuffix states)) states + let immutableSlots :: [SlotNo] = + -- Remove duplicates due to EBBs. + nubOrd . mapMaybe (withOriginToMaybe . getTipSlot) $ + AS.anchor immutableStates : AS.toOldestFirst immutableStates + snapshotSlots = + onDiskSnapshotSelector (ldbSnapshotPolicy env) - (`SnapCounters` 0) . Just <$> maybe getMonotonicTime (pure . snd) mTime - else - pure $ SnapCounters (fst <$> mTime) nrBlocks + SnapshotSelectorContext + { sscTimeSinceLast = timeSinceLastWrite + , sscSnapshotSlots = immutableSlots + } + forM_ snapshotSlots $ \slot -> do + -- Prune the 'DbChangelog' such that the resulting anchor state has slot + -- number @slot@. + let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1) + atomically $ modifyTVar (ldbChangelog env) (prune pruneStrat) + -- Flush the LedgerDB such that we can take a snapshot for the new anchor + -- state due to the previous prune. + withWriteLock + (ldbLock env) + (flushLedgerDB (ldbChangelog env) (ldbBackingStore env)) + -- Now, taking a snapshot (for the last flushed state) will do what we want. + void $ + withReadLock (ldbLock env) $ + takeSnapshot + snapManager + Nothing + (ldbChangelog env, ldbBackingStore env) + finished <- getMonotonicTime + atomically $ writeTVar (ldbLastSnapshotWrite env) (Just $! finished) + void $ trimSnapshots snapManager (ldbSnapshotPolicy env) -- If the DbChangelog in the LedgerDB can flush (based on the SnapshotPolicy -- with which this LedgerDB was opened), flush differences to the backing @@ -576,6 +603,8 @@ data LedgerDBEnv m l blk = LedgerDBEnv , ldbQueryBatchSize :: !QueryBatchSize , ldbResolveBlock :: !(ResolveBlock m blk) , ldbGetVolatileSuffix :: !(GetVolatileSuffix m blk) + , ldbLastSnapshotWrite :: !(StrictTVar m (Maybe Time)) + -- ^ When did we finish writing the last snapshot. } deriving Generic diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs index d0e7bb1c9d..5398ffa711 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs @@ -20,11 +20,14 @@ import Control.RAWLock import qualified Control.RAWLock as RAWLock import Control.ResourceRegistry import Control.Tracer +import Data.Containers.ListUtils (nubOrd) +import Data.Foldable (for_) import qualified Data.Foldable as Foldable import Data.Functor.Contravariant ((>$<)) import Data.Kind (Type) import Data.Map (Map) import qualified Data.Map.Strict as Map +import Data.Maybe (mapMaybe) import Data.Set (Set) import qualified Data.Set as Set import Data.Traversable (for) @@ -100,6 +103,7 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do lock <- RAWLock.new () forkers <- newTVarIO Map.empty nextForkerKey <- newTVarIO (ForkerKey 0) + lastSnapshotWrite <- newTVarIO Nothing let env = LedgerDBEnv { ldbSeq = varDB @@ -115,6 +119,7 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do , ldbOpenHandlesLock = lock , ldbGetVolatileSuffix = getVolatileSuffix , ldbResourceKeys = SomeResources res + , ldbLastSnapshotWrite = lastSnapshotWrite } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -163,7 +168,7 @@ implMkLedgerDb h snapManager = , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied , garbageCollect = \s -> getEnv h (flip implGarbageCollect s) - , tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot snapManager) + , tryTakeSnapshot = getEnv h (implTryTakeSnapshot snapManager) , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h } @@ -344,25 +349,38 @@ implTryTakeSnapshot :: ) => SnapshotManager m m blk (StateRef m (ExtLedgerState blk)) -> LedgerDBEnv m l blk -> - Maybe (Time, Time) -> - Word64 -> - m SnapCounters -implTryTakeSnapshot snapManager env mTime nrBlocks = - if onDiskShouldTakeSnapshot (ldbSnapshotPolicy env) (uncurry (flip diffTime) <$> mTime) nrBlocks - then do - withStateRef env (MkSolo . anchorHandle) $ \(MkSolo (st, _)) -> - Monad.void $ - takeSnapshot - snapManager - Nothing - st - Monad.void $ - trimSnapshots - snapManager - (ldbSnapshotPolicy env) - (`SnapCounters` 0) . Just <$> maybe getMonotonicTime (pure . snd) mTime - else - pure $ SnapCounters (fst <$> mTime) nrBlocks + m () +implTryTakeSnapshot snapManager env = do + timeSinceLastWrite <- do + mLastWrite <- readTVarIO $ ldbLastSnapshotWrite env + for mLastWrite $ \lastWrite -> do + now <- getMonotonicTime + pure $ now `diffTime` lastWrite + RAWLock.withReadAccess (ldbOpenHandlesLock env) $ \() -> do + lseq@(LedgerSeq immutableStates) <- atomically $ do + LedgerSeq states <- readTVar $ ldbSeq env + volSuffix <- getVolatileSuffix (ldbGetVolatileSuffix env) + pure $ LedgerSeq $ AS.dropNewest (AS.length (volSuffix states)) states + let immutableSlots :: [SlotNo] = + -- Remove duplicates due to EBBs. + nubOrd . mapMaybe (withOriginToMaybe . getTipSlot . state) $ + AS.anchor immutableStates : AS.toOldestFirst immutableStates + snapshotSlots = + onDiskSnapshotSelector + (ldbSnapshotPolicy env) + SnapshotSelectorContext + { sscTimeSinceLast = timeSinceLastWrite + , sscSnapshotSlots = immutableSlots + } + for_ snapshotSlots $ \slot -> do + -- Prune the 'DbChangelog' such that the resulting anchor state has slot + -- number @slot@. + let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1) + st = anchorHandle $ snd $ prune pruneStrat lseq + Monad.void $ takeSnapshot snapManager Nothing st + finished <- getMonotonicTime + atomically $ writeTVar (ldbLastSnapshotWrite env) (Just $! finished) + Monad.void $ trimSnapshots snapManager (ldbSnapshotPolicy env) -- In the first version of the LedgerDB for UTxO-HD, there is a need to -- periodically flush the accumulated differences to the disk. However, in the @@ -447,6 +465,8 @@ data LedgerDBEnv m l blk = LedgerDBEnv -- in tests can release such resources. These are the resource keys for the -- LSM session and the resource key for the BlockIO interface. , ldbGetVolatileSuffix :: !(GetVolatileSuffix m blk) + , ldbLastSnapshotWrite :: !(StrictTVar m (Maybe Time)) + -- ^ When did we finish writing the last snapshot. } deriving Generic @@ -499,16 +519,6 @@ getEnv (LDBHandle varState) f = LedgerDBOpen env -> f env LedgerDBClosed -> throwIO $ ClosedDBError @blk prettyCallStack --- | Variant 'of 'getEnv' for functions taking two arguments. -getEnv2 :: - (IOLike m, HasCallStack, HasHeader blk) => - LedgerDBHandle m l blk -> - (LedgerDBEnv m l blk -> a -> b -> m r) -> - a -> - b -> - m r -getEnv2 h f a b = getEnv h (\env -> f env a b) - -- | Variant 'of 'getEnv' for functions taking five arguments. getEnv5 :: (IOLike m, HasCallStack, HasHeader blk) => diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs index 8c22333ac4..cf9c7775a9 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs @@ -119,10 +119,7 @@ fromMinimalChainDbArgs MinimalChainDbArgs{..} = } , cdbLgrDbArgs = LedgerDbArgs - { lgrSnapshotPolicyArgs = - LedgerDB.SnapshotPolicyArgs - LedgerDB.DefaultSnapshotInterval - LedgerDB.DefaultNumOfDiskSnapshots + { lgrSnapshotPolicyArgs = LedgerDB.defaultSnapshotPolicyArgs , -- Keep 2 ledger snapshots, and take a new snapshot at least every 2 * -- k seconds, where k is the security parameter. lgrGenesis = return mcdbInitLedger diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs index 1c45dae1be..60a67681b1 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs @@ -541,7 +541,7 @@ openLedgerDB flavArgs env cfg fs rr = do (tracer, getNumOpenHandles) <- mkTrackOpenHandles let args = LedgerDbArgs - (SnapshotPolicyArgs DisableSnapshots DefaultNumOfDiskSnapshots) + defaultSnapshotPolicyArgs (pure genesis) fs cfg From dd7491de7c8460f05150fe7bc45cd385a2c56823 Mon Sep 17 00:00:00 2001 From: Alexander Esgen Date: Mon, 30 Jun 2025 10:01:59 +0200 Subject: [PATCH 3/4] LedgerDB: remove replayed blocks counter It is no longer needed by the predictable snapshotting logic. --- ...0_alexander.esgen_predictable_snapshots.md | 0 .../Cardano/Tools/DBAnalyser/Run.hs | 2 +- .../Consensus/Storage/ChainDB/Impl.hs | 2 +- .../Ouroboros/Consensus/Storage/LedgerDB.hs | 15 +++++------- .../Consensus/Storage/LedgerDB/API.hs | 24 +++++++------------ .../MiniProtocol/LocalStateQuery/Server.hs | 13 +++++----- .../Storage/LedgerDB/StateMachine.hs | 2 +- 7 files changed, 24 insertions(+), 34 deletions(-) create mode 100644 ouroboros-consensus-cardano/changelog.d/20250630_100310_alexander.esgen_predictable_snapshots.md diff --git a/ouroboros-consensus-cardano/changelog.d/20250630_100310_alexander.esgen_predictable_snapshots.md b/ouroboros-consensus-cardano/changelog.d/20250630_100310_alexander.esgen_predictable_snapshots.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs index 5a2fe7776b..0b661b1337 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs @@ -73,7 +73,7 @@ openLedgerDB :: , LedgerDB.TestInternals' IO blk ) openLedgerDB args = do - (ldb, _, od) <- case LedgerDB.lgrBackendArgs args of + (ldb, od) <- case LedgerDB.lgrBackendArgs args of LedgerDB.LedgerDbBackendArgsV1 bss -> let snapManager = LedgerDB.V1.snapshotManager args initDb = diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index 6c2ba869e7..e23888d4b7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -164,7 +164,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB (ledgerDbGetVolatileSuffix, setGetCurrentChainForLedgerDB) <- mkLedgerDbGetVolatileSuffix - (lgrDB, _replayed) <- + lgrDB <- LedgerDB.openDB argsLgrDb (ImmutableDB.streamAPI immutableDB) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs index cb01fe8a8d..984513b39a 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs @@ -16,7 +16,6 @@ module Ouroboros.Consensus.Storage.LedgerDB ) where import Data.Functor.Contravariant ((>$<)) -import Data.Word import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.HardFork.Abstract @@ -61,7 +60,7 @@ openDB :: -- | How to get blocks from the ChainDB ResolveBlock m blk -> GetVolatileSuffix m blk -> - m (LedgerDB' m blk, Word64) + m (LedgerDB' m blk) openDB args stream @@ -113,11 +112,9 @@ doOpenDB :: SnapshotManager m n blk st -> StreamAPI m blk blk -> Point blk -> - m (LedgerDB' m blk, Word64) + m (LedgerDB' m blk) doOpenDB args initDb snapManager stream replayGoal = - f <$> openDBInternal args initDb snapManager stream replayGoal - where - f (ldb, replayCounter, _) = (ldb, replayCounter) + fst <$> openDBInternal args initDb snapManager stream replayGoal -- | Open the ledger DB and expose internals for testing purposes openDBInternal :: @@ -131,10 +128,10 @@ openDBInternal :: SnapshotManager m n blk st -> StreamAPI m blk blk -> Point blk -> - m (LedgerDB' m blk, Word64, TestInternals' m blk) + m (LedgerDB' m blk, TestInternals' m blk) openDBInternal args@(LedgerDbArgs{lgrHasFS = SomeHasFS fs}) initDb snapManager stream replayGoal = do createDirectoryIfMissing fs True (mkFsPath []) - (_initLog, db, replayCounter) <- + (_initLog, db) <- initialize replayTracer snapTracer @@ -145,7 +142,7 @@ openDBInternal args@(LedgerDbArgs{lgrHasFS = SomeHasFS fs}) initDb snapManager s snapManager lgrStartSnapshot (ledgerDb, internal) <- mkLedgerDb initDb db - return (ledgerDb, replayCounter, internal) + return (ledgerDb, internal) where LedgerDbArgs { lgrConfig diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index d70ff2d1af..e05e210af8 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -501,7 +501,7 @@ initialize :: InitDB db m blk -> SnapshotManager m n blk st -> Maybe DiskSnapshot -> - m (InitLog blk, db, Word64) + m (InitLog blk, db) initialize replayTracer snapTracer @@ -523,7 +523,6 @@ initialize m ( InitLog blk , db - , Word64 ) tryNewestFirst acc [] = do -- We're out of snapshots. Start at genesis @@ -544,7 +543,7 @@ initialize Left err -> do abortLedgerDbInit initDb error $ "Invariant violation: invalid immutable chain " <> show err - Right (db, replayed) -> return (acc InitFromGenesis, db, replayed) + Right db -> return (acc InitFromGenesis, db) tryNewestFirst acc (s : ss) = do eInitDb <- initFromSnapshot s case eInitDb of @@ -596,7 +595,7 @@ initialize Monad.when (diskSnapshotIsTemporary s) $ deleteSnapshot snapManager s abortLedgerDbInit initDb tryNewestFirst (acc . InitFailure s err) ss - Right (db, replayed) -> return (acc (InitFromSnapshot s pt), db, replayed) + Right db -> return (acc (InitFromSnapshot s pt), db) replayTracer' = decorateReplayTracerWithGoal @@ -620,32 +619,27 @@ replayStartingWith :: db -> Point blk -> InitDB db m blk -> - ExceptT (SnapshotFailure blk) m (db, Word64) + ExceptT (SnapshotFailure blk) m db replayStartingWith tracer cfg stream initDb from InitDB{initReapplyBlock, currentTip} = do streamAll stream from InitFailureTooRecent - (initDb, 0) + initDb push where - push :: - blk -> - (db, Word64) -> - m (db, Word64) - push blk (!db, !replayed) = do + push :: blk -> db -> m db + push blk !db = do !db' <- initReapplyBlock cfg blk db - let !replayed' = replayed + 1 - - events = + let events = inspectLedger (getExtLedgerCfg (ledgerDbCfg cfg)) (currentTip db) (currentTip db') traceWith tracer (ReplayedBlock (blockRealPoint blk) events) - return (db', replayed') + return db' {------------------------------------------------------------------------------- Trace replay events diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/LocalStateQuery/Server.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/LocalStateQuery/Server.hs index 8ff1e4de74..83de09a8aa 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/LocalStateQuery/Server.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/LocalStateQuery/Server.hs @@ -239,13 +239,12 @@ initLedgerDB s c = do , lgrStartSnapshot = Nothing } ldb <- - fst - <$> LedgerDB.openDB - args - streamAPI - (Chain.headPoint c) - (\rpt -> pure $ fromMaybe (error "impossible") $ Chain.findBlock ((rpt ==) . blockRealPoint) c) - (LedgerDB.praosGetVolatileSuffix s) + LedgerDB.openDB + args + streamAPI + (Chain.headPoint c) + (\rpt -> pure $ fromMaybe (error "impossible") $ Chain.findBlock ((rpt ==) . blockRealPoint) c) + (LedgerDB.praosGetVolatileSuffix s) result <- LedgerDB.validateFork diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs index 60a67681b1..5b3a369893 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs @@ -550,7 +550,7 @@ openLedgerDB flavArgs env cfg fs rr = do rr DefaultQueryBatchSize Nothing - (ldb, _, od) <- case lgrBackendArgs args of + (ldb, od) <- case lgrBackendArgs args of LedgerDbBackendArgsV1 bss -> let snapManager = V1.snapshotManager args initDb = From 2500bdefa546b4fe2d47620f14cd98cc92b6c5bc Mon Sep 17 00:00:00 2001 From: Alexander Esgen Date: Mon, 7 Jul 2025 03:42:09 +0200 Subject: [PATCH 4/4] Add ChainDB test for ledger snapshots --- ouroboros-consensus/ouroboros-consensus.cabal | 2 + .../Test/Ouroboros/Storage/ChainDB.hs | 2 + .../Storage/ChainDB/LedgerSnapshots.hs | 399 ++++++++++++++++++ 3 files changed, 403 insertions(+) create mode 100644 ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 44a679ac11..e16d0c2f71 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -761,6 +761,7 @@ test-suite storage-test Test.Ouroboros.Storage.ChainDB.FollowerPromptness Test.Ouroboros.Storage.ChainDB.GcSchedule Test.Ouroboros.Storage.ChainDB.Iterator + Test.Ouroboros.Storage.ChainDB.LedgerSnapshots Test.Ouroboros.Storage.ChainDB.Model Test.Ouroboros.Storage.ChainDB.Model.Test Test.Ouroboros.Storage.ChainDB.Paths @@ -796,6 +797,7 @@ test-suite storage-test aeson, base, bifunctors, + blockio:sim, bytestring, cardano-binary, cardano-ledger-binary:testlib, diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB.hs index 087c7e35ea..af95656d17 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB.hs @@ -25,6 +25,7 @@ import System.Info (os) import qualified Test.Ouroboros.Storage.ChainDB.FollowerPromptness as FollowerPromptness import qualified Test.Ouroboros.Storage.ChainDB.GcSchedule as GcSchedule import qualified Test.Ouroboros.Storage.ChainDB.Iterator as Iterator +import qualified Test.Ouroboros.Storage.ChainDB.LedgerSnapshots as LedgerSnapshots import qualified Test.Ouroboros.Storage.ChainDB.Model.Test as Model import qualified Test.Ouroboros.Storage.ChainDB.Paths as Paths import qualified Test.Ouroboros.Storage.ChainDB.StateMachine as StateMachine @@ -36,6 +37,7 @@ tests = testGroup "ChainDB" $ [ Iterator.tests , FollowerPromptness.tests + , LedgerSnapshots.tests , GcSchedule.tests , Model.tests , Paths.tests diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs new file mode 100644 index 0000000000..3f7011abb9 --- /dev/null +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs @@ -0,0 +1,399 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE ViewPatterns #-} + +-- | Test that ledger snapshots are performed at /predictable/ points on the +-- immutable chain (modulo rate limiting). +-- +-- We open a ChainDB and add to it a (shuffled) list of blocks such that the +-- immutable chain is predetermined. Then, we check that ledger snapshots were +-- created for precisely the points we expect given the configured +-- 'SnapshotFrequencyArgs'. +module Test.Ouroboros.Storage.ChainDB.LedgerSnapshots (tests) where + +import Cardano.Ledger.BaseTypes.NonZero +import Control.Monad (replicateM) +import Control.Monad.IOSim (runSim) +import Control.ResourceRegistry +import Control.Tracer +import Data.Foldable (for_) +import qualified Data.List.NonEmpty as NE +import Data.Maybe (mapMaybe) +import qualified Data.Set as Set +import Data.Time (secondsToDiffTime) +import Data.Traversable (for) +import Data.Word (Word64) +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Config +import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB +import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as Punishment +import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Args as ChainDB +import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB +import Ouroboros.Consensus.Storage.LedgerDB.Args (LedgerDbBackendArgs) +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots +import qualified Ouroboros.Consensus.Storage.LedgerDB.Snapshots as LedgerDB +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Args as LedgerDB.V1 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore as LedgerDB.V1 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.InMemory as LedgerDB.V1.InMemory +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Backend as LedgerDB.V2 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as LedgerDB.V2.InMemory +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LedgerDB.V2.LSM +import Ouroboros.Consensus.Util (dropLast) +import Ouroboros.Consensus.Util.Condense +import Ouroboros.Consensus.Util.Enclose (Enclosing' (FallingEdgeWith)) +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Network.AnchoredFragment (Anchor, AnchoredFragment) +import qualified Ouroboros.Network.AnchoredFragment as AF +import System.FS.API (SomeHasFS) +import System.FS.API.Types (mkFsPath) +import System.FS.BlockIO.Sim (simHasBlockIO') +import qualified System.FS.Sim.MockFS as MockFS +import Test.Tasty +import Test.Tasty.QuickCheck hiding (NonZero) +import Test.Util.ChainDB +import Test.Util.Orphans.IOLike () +import Test.Util.QuickCheck +import Test.Util.TestBlock +import Test.Util.Tracer (recordingTracerTVar) + +tests :: TestTree +tests = + testGroup + "LedgerSnapshots" + [ testProperty "InMemV1" $ prop_ledgerSnapshots inMemV1 + , testProperty "InMemV2" $ prop_ledgerSnapshots inMemV2 + , testProperty "LSM" $ \salt -> prop_ledgerSnapshots (lsm salt) + ] + where + inMemV1, inMemV2 :: IOLike m => LedgerDbBackendArgs m TestBlock + inMemV1 = + LedgerDB.LedgerDbBackendArgsV1 $ + LedgerDB.V1.V1Args LedgerDB.V1.DisableFlushing $ + LedgerDB.V1.SomeBackendArgs LedgerDB.V1.InMemory.InMemArgs + inMemV2 = + LedgerDB.LedgerDbBackendArgsV2 $ + LedgerDB.V2.SomeBackendArgs LedgerDB.V2.InMemory.InMemArgs + + lsm :: + IOLike m => + LedgerDB.V2.LSM.Salt -> + LedgerDbBackendArgs m TestBlock + lsm salt = + LedgerDB.LedgerDbBackendArgsV2 $ + LedgerDB.V2.SomeBackendArgs $ + LedgerDB.V2.LSM.LSMArgs (mkFsPath []) salt mkSimBlockIOFS + where + mkSimBlockIOFS registry = allocate registry (\_ -> mk) (\_ -> pure ()) + where + mk = + uncurry LedgerDB.V2.LSM.SomeHasFSAndBlockIO + <$> simHasBlockIO' MockFS.empty + +prop_ledgerSnapshots :: + (forall m. IOLike m => LedgerDbBackendArgs m TestBlock) -> + TestSetup -> + Property +prop_ledgerSnapshots lgrDbBackendArgs testSetup = + case runSim (runTest lgrDbBackendArgs testSetup) of + Right testOutcome -> checkTestOutcome testSetup testOutcome + Left err -> counterexample ("Failure: " <> show err) False + +{------------------------------------------------------------------------------- + Test setup +-------------------------------------------------------------------------------} + +data TestSetup = TestSetup + { tsSecParam :: SecurityParam + , tsMainChain :: AnchoredFragment TestBlock + , tsForks :: [AnchoredFragment TestBlock] + -- ^ Forks anchored in the immutable prefix of the main chain. Must be of + -- length at most @k@. + , tsPerm :: Permutation + -- ^ Shuffle the blocks when adding them to the ChainDB, see 'tsBlocksToAdd'. + , tsTestSnapshotPolicyArgs :: TestSnapshotPolicyArgs + } + deriving stock Show + +data TestSnapshotPolicyArgs = TestSnapshotPolicyArgs + { tspaNum :: Word + , tspaInterval :: NonZero Word64 + , tspaOffset :: SlotNo + , tspaRateLimit :: DiffTime + } + deriving stock Show + +instance Arbitrary TestSnapshotPolicyArgs where + arbitrary = do + tspaNum <- choose (1, 10) + tspaInterval <- choose (1, 10) `suchThatMap` nonZero + tspaOffset <- SlotNo <$> choose (1, 20) + tspaRateLimit <- + frequency + [ (2, pure 0) + , (1, secondsToDiffTime <$> choose (1, 10)) + ] + pure + TestSnapshotPolicyArgs + { tspaNum + , tspaInterval + , tspaOffset + , tspaRateLimit + } + +-- | Add blocks to the ChainDB in this order. +tsBlocksToAdd :: TestSetup -> [TestBlock] +tsBlocksToAdd testSetup = + permute tsPerm $ + foldMap AF.toOldestFirst (tsMainChain : tsForks) + where + TestSetup{tsMainChain, tsForks, tsPerm} = testSetup + +tsSnapshotPolicyArgs :: TestSetup -> SnapshotPolicyArgs +tsSnapshotPolicyArgs TestSetup{tsTestSnapshotPolicyArgs} = + SnapshotPolicyArgs + { spaFrequency + , spaNum = Override $ tspaNum tsTestSnapshotPolicyArgs + } + where + spaFrequency = + SnapshotFrequency + SnapshotFrequencyArgs + { sfaInterval = Override $ tspaInterval tsTestSnapshotPolicyArgs + , sfaOffset = Override $ tspaOffset tsTestSnapshotPolicyArgs + , sfaRateLimit = Override $ tspaRateLimit tsTestSnapshotPolicyArgs + } + +instance Arbitrary TestSetup where + arbitrary = do + k <- choose (1, 6) + let + -- Generate an anchored fragment of the given length starting from the + -- given block, with random slot gaps. + genChain :: + Int -> -- Length of the chain + Word64 -> -- Fork number + Anchor TestBlock -> + Gen (AnchoredFragment TestBlock) + genChain len forkNo anchor = + go 0 (AF.Empty anchor) + where + go n acc + | n >= len = pure acc + | otherwise = do + slotOffset <- SlotNo <$> choose (1, 10) + let blk = modifyFork (\_ -> forkNo) $ + (\b -> b{tbSlot = tbSlot b + slotOffset}) $ + case AF.headPoint acc of + GenesisPoint -> firstBlock forkNo + BlockPoint slot hash -> + (successorBlockWithPayload hash slot ()) + go (n + 1) (acc AF.:> blk) + + immutableLength <- choose (0, 20) + tsMainChain <- genChain (immutableLength + k) 0 AF.AnchorGenesis + let immChain = AF.dropNewest k tsMainChain + immAnchors = AF.anchor immChain : (AF.anchorFromBlock <$> AF.toOldestFirst immChain) + numForks <- choose (0, 5) + forkAnchors <- replicateM numForks $ elements immAnchors + tsForks <- for ([1 ..] `zip` forkAnchors) $ \(forkNo, forkAnchor) -> do + forkLength <- choose (1, k) + genChain forkLength forkNo forkAnchor + + tsPerm <- arbitrary + tsTestSnapshotPolicyArgs <- arbitrary + pure + TestSetup + { tsSecParam = SecurityParam $ unsafeNonZero $ fromIntegral k + , tsMainChain + , tsForks + , tsPerm + , tsTestSnapshotPolicyArgs + } + + shrink testSetup@TestSetup{tsSecParam, tsMainChain, tsForks} = + [ testSetup + { tsMainChain = tsMainChain' + , tsForks = filter isStillAnchoredOnImmChain tsForks + } + | tsMainChain' <- [AF.dropNewest 1 tsMainChain | not $ AF.null tsMainChain] + , let k = unNonZero $ maxRollbacks tsSecParam + immChain' = AF.dropNewest (fromIntegral k) tsMainChain' + isStillAnchoredOnImmChain f = + AF.withinFragmentBounds (AF.anchorPoint f) immChain' + ] + +{------------------------------------------------------------------------------- + Run test +-------------------------------------------------------------------------------} + +data TestOutcome = TestOutcome + { toutImmutableTip :: Anchor TestBlock + , toutTrace :: [(Time, ChainDB.TraceEvent TestBlock)] + , toutFinalSnapshots :: [DiskSnapshot] + } + deriving stock Show + +runTest :: + forall m. + IOLike m => + LedgerDbBackendArgs m TestBlock -> + TestSetup -> + m TestOutcome +runTest lgrDbBackendArgs testSetup = withRegistry \registry -> do + (withTime -> tracer, getTrace) <- recordingTracerTVar + + (chainDB, lgrHasFS) <- openChainDB registry tracer + + for_ (tsBlocksToAdd testSetup) \blk -> do + ChainDB.addBlock_ chainDB Punishment.noPunishment blk + threadDelay 1 + + toutImmutableTip <- + AF.castAnchor . AF.anchor <$> atomically (ChainDB.getCurrentChain chainDB) + toutTrace <- getTrace + toutFinalSnapshots <- LedgerDB.defaultListSnapshots lgrHasFS -- TODO + pure + TestOutcome + { toutImmutableTip + , toutTrace + , toutFinalSnapshots + } + where + openChainDB :: + ResourceRegistry m -> + Tracer m (ChainDB.TraceEvent TestBlock) -> + m (ChainDB m TestBlock, SomeHasFS m) + openChainDB registry cdbTracer = do + chainDbArgs <- do + mcdbNodeDBs <- emptyNodeDBs + let mcdbTopLevelConfig = singleNodeTestConfigWithK (tsSecParam testSetup) + cdbArgs = + fromMinimalChainDbArgs + MinimalChainDbArgs + { mcdbTopLevelConfig + , mcdbNodeDBs + , mcdbChunkInfo = mkTestChunkInfo mcdbTopLevelConfig + , mcdbInitLedger = testInitExtLedger + , mcdbRegistry = registry + } + updLgrDbArgs a = + a + { ChainDB.cdbLgrDbArgs = + (ChainDB.cdbLgrDbArgs a) + { LedgerDB.lgrBackendArgs = lgrDbBackendArgs + , LedgerDB.lgrSnapshotPolicyArgs = tsSnapshotPolicyArgs testSetup + } + } + pure $ updLgrDbArgs $ ChainDB.updateTracer cdbTracer cdbArgs + (_, chainDB) <- + allocate + registry + (\_ -> ChainDB.openDB chainDbArgs) + (ChainDB.closeDB) + pure (chainDB, LedgerDB.lgrHasFS . ChainDB.cdbLgrDbArgs $ chainDbArgs) + + withTime = contramapM \ev -> (,ev) <$> getMonotonicTime + +{------------------------------------------------------------------------------- + Assess a test outcome +-------------------------------------------------------------------------------} + +checkTestOutcome :: TestSetup -> TestOutcome -> Property +checkTestOutcome testSetup testOutcome = + withLabelling . withTrace $ + conjoin + [ counterexample "Unexpected immutable tip" $ + toutImmutableTip === AF.headAnchor immChain + , counterexample "Snapshots not strictly increasing" $ + strictlyIncreasing (snd <$> actualSnapshots) + , counterexample ("Unexpected number of on-disk snapshots " <> show toutFinalSnapshots) $ + length toutFinalSnapshots + === min (length actualSnapshots) (fromIntegral tspaNum) + , counterexample ("Rate limit not respected...") $ + conjoin + [ counterexample ("...between " <> condense pt1 <> " and " <> condense pt2) $ + tspaRateLimit `le` diffTime t2 t1 + | ((t1, pt1), (t2, pt2)) <- actualSnapshots `zip` drop 1 actualSnapshots + ] + , counterexample "Unexpected snapshots performed" $ + counterexample ("Policy: " <> show policyArgs) $ do + let actual = Set.fromList (snd <$> actualSnapshots) + expect = Set.fromList expectedSnapshots + counterexample ("Not expected: " <> condense (actual Set.\\ expect)) $ + if tspaRateLimit <= 0 + then + counterexample ("Expected, but missing: " <> condense (expect Set.\\ actual)) $ + actual === expect + else + property $ actual `Set.isSubsetOf` expect + ] + where + TestSetup + { tsSecParam = unNonZero . maxRollbacks -> k + , tsMainChain + , tsTestSnapshotPolicyArgs = + policyArgs@TestSnapshotPolicyArgs + { tspaNum + , tspaInterval + , tspaOffset + , tspaRateLimit + } + } = testSetup + + immChain = AF.dropNewest (fromIntegral k) tsMainChain + + ppTrace (time, ev) = show time <> ": " <> show ev + + isTookSnapshot :: ChainDB.TraceEvent blk -> Maybe SlotNo + isTookSnapshot = \case + ChainDB.TraceLedgerDBEvent + ( LedgerDB.LedgerDBSnapshotEvent + (LedgerDB.TookSnapshot _ pt FallingEdgeWith{}) + ) -> pure $ realPointSlot pt + _ -> Nothing + + TestOutcome + { toutImmutableTip + , toutTrace + , toutFinalSnapshots + } = testOutcome + + actualSnapshots :: [(Time, SlotNo)] + actualSnapshots = mapMaybe (traverse isTookSnapshot) toutTrace + + -- Group on @(s1 - offset) / interval@ and take the last entry from each group + -- (apart from the last one). + expectedSnapshots :: [SlotNo] + expectedSnapshots = + fmap NE.last + -- For the last group, it is not yet necessarily clear what the last + -- immutable block will be. (If there is a block in the last slot of a + -- group, ie the predecessor of @offset + n * interval@ for some @n@, + -- there can't be, but it doesn't seem important to handle this case in a + -- special way.) + . dropLast 1 + . NE.groupWith snapshotGroup + . fmap blockSlot + . AF.toOldestFirst + $ immChain + where + snapshotGroup s1 + | s1 < tspaOffset = Nothing + | otherwise = Just $ unSlotNo (s1 - tspaOffset) `div` unNonZero tspaInterval + + withTrace = + counterexample ("Trace:\n" <> unlines (ppTrace <$> toutTrace)) + . counterexample ("Actual snapshots: " <> condense actualSnapshots) + . counterexample ("Actual immutable tip: " <> condense (AF.anchorToPoint toutImmutableTip)) + . counterexample ("Immutable chain: " <> condense immChain) + + withLabelling = + tabulate "# actual snapshots" [show (length actualSnapshots)] + . tabulate "length of immutable chain" [show (AF.anchorToBlockNo toutImmutableTip)]