11{-# LANGUAGE BangPatterns #-}
22{-# LANGUAGE DeriveAnyClass #-}
33{-# LANGUAGE DeriveGeneric #-}
4+ {-# LANGUAGE DerivingStrategies #-}
45{-# LANGUAGE FlexibleContexts #-}
56{-# LANGUAGE LambdaCase #-}
67{-# LANGUAGE NamedFieldPuns #-}
@@ -19,11 +20,10 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
1920 launchBgTasks
2021
2122 -- * Copying blocks from the VolatileDB to the ImmutableDB
22- , copyAndSnapshotRunner
2323 , copyToImmutableDB
2424
2525 -- * Executing garbage collection
26- , garbageCollect
26+ , garbageCollectBlocks
2727
2828 -- * Scheduling garbage collections
2929 , GcParams (.. )
@@ -76,6 +76,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
7676import Ouroboros.Consensus.Util
7777import Ouroboros.Consensus.Util.Condense
7878import Ouroboros.Consensus.Util.IOLike
79+ import Ouroboros.Consensus.Util.STM (Watcher (.. ), forkLinkedWatcher )
7980import Ouroboros.Network.AnchoredFragment (AnchoredSeq (.. ))
8081import qualified Ouroboros.Network.AnchoredFragment as AF
8182
@@ -99,17 +100,30 @@ launchBgTasks cdb@CDB{..} replayed = do
99100 ! addBlockThread <-
100101 launch " ChainDB.addBlockRunner" $
101102 addBlockRunner cdbChainSelFuse cdb
103+
104+ ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
105+ ! ledgerDbMaintenaceThread <-
106+ forkLinkedWatcher cdbRegistry " ChainDB.ledgerDbTaskWatcher" $
107+ ledgerDbTaskWatcher cdb ledgerDbTasksTrigger
108+
102109 gcSchedule <- newGcSchedule
103110 ! gcThread <-
104- launch " ChainDB.gcScheduleRunner " $
111+ launch " ChainDB.gcBlocksScheduleRunner " $
105112 gcScheduleRunner gcSchedule $
106- garbageCollect cdb
107- ! copyAndSnapshotThread <-
108- launch " ChainDB.copyAndSnapshotRunner" $
109- copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse
113+ garbageCollectBlocks cdb
114+
115+ ! copyToImmutableDBThread <-
116+ launch " ChainDB.copyToImmutableDBRunner" $
117+ copyToImmutableDBRunner cdb ledgerDbTasksTrigger gcSchedule cdbCopyFuse
118+
110119 atomically $
111120 writeTVar cdbKillBgThreads $
112- sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
121+ sequence_
122+ [ addBlockThread
123+ , cancelThread ledgerDbMaintenaceThread
124+ , gcThread
125+ , copyToImmutableDBThread
126+ ]
113127 where
114128 launch :: String -> m Void -> m (m () )
115129 launch = fmap cancelThread .: forkLinkedThread cdbRegistry
@@ -198,22 +212,18 @@ copyToImmutableDB CDB{..} = electric $ do
198212 _ -> error " header to remove not on the current chain"
199213
200214{- ------------------------------------------------------------------------------
201- Snapshotting
215+ Copy to ImmutableDB
202216-------------------------------------------------------------------------------}
203217
204- -- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
205- -- LedgerDB
218+ -- | Copy blocks from the VolatileDB to ImmutableDB and trigger further tasks in
219+ -- other threads.
206220--
207221-- We watch the chain for changes. Whenever the chain is longer than @k@, then
208222-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
209223-- (using 'copyToImmutableDB'). Once that is complete,
210224--
211- -- * We periodically take a snapshot of the LedgerDB (depending on its config).
212- -- When enough blocks (depending on its config) have been replayed during
213- -- startup, a snapshot of the replayed LedgerDB will be written to disk at the
214- -- start of this function. NOTE: After this initial snapshot we do not take a
215- -- snapshot of the LedgerDB until the chain has changed again, irrespective of
216- -- the LedgerDB policy.
225+ -- * Trigger LedgerDB maintenance tasks, namely flushing, taking snapshots and
226+ -- garbage collection.
217227--
218228-- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most
219229-- recent block that was copied.
@@ -228,32 +238,26 @@ copyToImmutableDB CDB{..} = electric $ do
228238-- GC can happen, when we restart the node and schedule the /next/ GC, it will
229239-- /imply/ any previously scheduled GC, since GC is driven by slot number
230240-- ("garbage collect anything older than @x@").
231- copyAndSnapshotRunner ::
241+ copyToImmutableDBRunner ::
232242 forall m blk .
233243 ( IOLike m
234244 , LedgerSupportsProtocol blk
235245 ) =>
236246 ChainDbEnv m blk ->
247+ LedgerDbTasksTrigger m ->
237248 GcSchedule m ->
238- -- | Number of immutable blocks replayed on ledger DB startup
239- Word64 ->
240249 Fuse m ->
241250 m Void
242- copyAndSnapshotRunner cdb@ CDB {.. } gcSchedule replayed fuse = do
251+ copyToImmutableDBRunner cdb@ CDB {.. } ledgerDbTasksTrigger gcSchedule fuse = do
243252 -- this first flush will persist the differences that come from the initial
244253 -- chain selection.
245254 LedgerDB. tryFlush cdbLedgerDB
246- loop =<< LedgerDB. tryTakeSnapshot cdbLedgerDB Nothing replayed
255+ forever copyAndTrigger
247256 where
248257 SecurityParam k = configSecurityParam cdbTopLevelConfig
249258
250- loop :: LedgerDB. SnapCounters -> m Void
251- loop counters = do
252- let LedgerDB. SnapCounters
253- { prevSnapshotTime
254- , ntBlocksSinceLastSnap
255- } = counters
256-
259+ copyAndTrigger :: m ()
260+ copyAndTrigger = do
257261 -- Wait for the chain to grow larger than @k@
258262 numToWrite <- atomically $ do
259263 curChain <- icWithoutTime <$> readTVar cdbChain
@@ -264,14 +268,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
264268 --
265269 -- This is a synchronous operation: when it returns, the blocks have been
266270 -- copied to disk (though not flushed, necessarily).
267- withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'
271+ gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)
268272
269- LedgerDB. tryFlush cdbLedgerDB
270-
271- now <- getMonotonicTime
272- let ntBlocksSinceLastSnap' = ntBlocksSinceLastSnap + numToWrite
273-
274- loop =<< LedgerDB. tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'
273+ triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
274+ scheduleGC' gcSlotNo
275275
276276 scheduleGC' :: WithOrigin SlotNo -> m ()
277277 scheduleGC' Origin = return ()
@@ -285,16 +285,104 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
285285 }
286286 gcSchedule
287287
288+ {- ------------------------------------------------------------------------------
289+ LedgerDB maintenance tasks
290+ -------------------------------------------------------------------------------}
291+
-- | Shared handle used to signal the LedgerDB maintenance tasks.
--
-- It wraps the 'StrictTVar' holding the 'LedgerDbTaskState':
-- 'triggerLedgerDbTasks' updates it once blocks have been copied to the
-- ImmutableDB (recording the new immutable tip slot), and
-- 'ledgerDbTaskWatcher' reads it to decide when to run the tasks.
newtype LedgerDbTasksTrigger m = LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
296+
-- | State shared between the thread copying blocks to the ImmutableDB and the
-- LedgerDB maintenance watcher.
data LedgerDbTaskState = LedgerDbTaskState
  { -- | Slot of the ImmutableDB tip as last reported through
    -- 'triggerLedgerDbTasks'; 'Origin' until the first report.
    ldbtsImmTip :: !(WithOrigin SlotNo)
  , -- | Snapshot time reported by the most recent snapshot attempt, 'Nothing'
    -- until the watcher has run once.
    ldbtsPrevSnapshotTime :: !(Maybe Time)
  , -- | Blocks accounted for since the last snapshot: starts at the number of
    -- replayed blocks and grows with every batch copied to the ImmutableDB.
    ldbtsBlocksSinceLastSnapshot :: !Word64
  }
  deriving stock (Generic)
  deriving anyclass (NoThunks)
304+
-- | Allocate a fresh 'LedgerDbTasksTrigger'.
--
-- The initial state has no immutable tip ('Origin') and no previous snapshot
-- time; the blocks-since-last-snapshot counter starts at the number of
-- replayed blocks.
newLedgerDbTasksTrigger ::
  IOLike m =>
  -- | Number of blocks replayed.
  Word64 ->
  m (LedgerDbTasksTrigger m)
newLedgerDbTasksTrigger replayed = do
  varSt <- newTVarIO initialState
  pure (LedgerDbTasksTrigger varSt)
 where
  initialState :: LedgerDbTaskState
  initialState =
    LedgerDbTaskState
      { ldbtsImmTip = Origin
      , ldbtsPrevSnapshotTime = Nothing
      , ldbtsBlocksSinceLastSnapshot = replayed
      }
318+
-- | Record that blocks were copied to the ImmutableDB.
--
-- In a single STM transaction this overwrites the stored immutable tip slot
-- and adds the number of written blocks to the blocks-since-last-snapshot
-- counter. The previous snapshot time is left untouched.
triggerLedgerDbTasks ::
  forall m.
  IOLike m =>
  LedgerDbTasksTrigger m ->
  -- | New tip of the ImmutableDB.
  WithOrigin SlotNo ->
  -- | Number of blocks written to the ImmutableDB.
  Word64 ->
  m ()
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
  atomically (modifyTVar varSt recordCopy)
 where
  -- Pure state transition applied to the shared task state.
  recordCopy :: LedgerDbTaskState -> LedgerDbTaskState
  recordCopy st@LedgerDbTaskState{ldbtsBlocksSinceLastSnapshot = pendingBlocks} =
    st
      { ldbtsImmTip = immTip
      , ldbtsBlocksSinceLastSnapshot = pendingBlocks + numWritten
      }
334+
-- | Watcher that runs the LedgerDB maintenance tasks when the
-- 'LedgerDbTasksTrigger' changes.
--
-- The fingerprint is the immutable tip slot stored in the trigger. On each
-- notification with a non-'Origin' tip, the following happen in order:
--
-- 1. Differences are flushed ('LedgerDB.tryFlush').
-- 2. A snapshot is attempted ('LedgerDB.tryTakeSnapshot') and the resulting
--    counters are written back to the shared state.
-- 3. The LedgerDB is garbage collected for the immutable tip slot.
--
-- Nothing is done while the tip is still 'Origin'.
ledgerDbTaskWatcher ::
  forall m blk.
  IOLike m =>
  ChainDbEnv m blk ->
  LedgerDbTasksTrigger m ->
  Watcher m LedgerDbTaskState (WithOrigin SlotNo)
ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
  Watcher
    { wFingerprint = ldbtsImmTip
    , wInitial = Nothing
    , wReader = readTVar varSt
    , wNotify = runMaintenance
    }
 where
  -- Perform one round of maintenance for the observed state.
  runMaintenance :: LedgerDbTaskState -> m ()
  runMaintenance
    LedgerDbTaskState
      { ldbtsImmTip = observedTip
      , ldbtsPrevSnapshotTime = prevSnapTime
      , ldbtsBlocksSinceLastSnapshot = blocksHandedOver
      } =
      whenJust (withOriginToMaybe observedTip) $ \immTipSlot -> do
        LedgerDB.tryFlush cdbLedgerDB

        now <- getMonotonicTime
        LedgerDB.SnapCounters
          { prevSnapshotTime = newSnapTime
          , ntBlocksSinceLastSnap = blocksLeftOver
          } <-
          LedgerDB.tryTakeSnapshot
            cdbLedgerDB
            ((,now) <$> prevSnapTime)
            blocksHandedOver

        -- 'triggerLedgerDbTasks' may have bumped the counter in the meantime,
        -- so only the portion handed to the snapshot attempt is replaced by
        -- what that attempt reported back.
        atomically $ modifyTVar varSt $ \current ->
          current
            { ldbtsBlocksSinceLastSnapshot =
                ldbtsBlocksSinceLastSnapshot current - blocksHandedOver + blocksLeftOver
            , ldbtsPrevSnapshotTime = newSnapTime
            }

        LedgerDB.garbageCollect cdbLedgerDB immTipSlot
378+
288379{- ------------------------------------------------------------------------------
289380 Executing garbage collection
290381-------------------------------------------------------------------------------}
291382
292383-- | Trigger a garbage collection for blocks older than the given 'SlotNo' on
293384-- the VolatileDB.
294385--
295- -- Also removes the corresponding cached "previously applied points" from the
296- -- LedgerDB.
297- --
298386-- This is thread-safe as the VolatileDB locks itself while performing a GC.
299387--
300388-- When calling this function it is __critical__ that the blocks that will be
@@ -304,11 +392,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
304392--
305393-- TODO will a long GC be a bottleneck? It will block any other calls to
306394-- @putBlock@ and @getBlock@.
garbageCollectBlocks :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollectBlocks CDB{..} slotNo = do
  -- Drop the blocks older than the given slot from the VolatileDB.
  VolatileDB.garbageCollect cdbVolatileDB slotNo
  -- Forget recorded invalid blocks whose slot is older than the given slot.
  atomically $ modifyTVar cdbInvalid (fmap (Map.filter stillRelevant))
  traceWith cdbTracer (TraceGCEvent (PerformedGC slotNo))
 where
  stillRelevant invalid = invalidBlockSlotNo invalid >= slotNo
314401
0 commit comments