Skip to content

Commit 02aaaca

Browse files
committed
Parallelize ledger application
1 parent 6bc6098 commit 02aaaca

File tree

7 files changed

+130
-32
lines changed

7 files changed

+130
-32
lines changed

cardano-db-sync/cardano-db-sync.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ library
134134
Cardano.DbSync.Sync
135135
Cardano.DbSync.Threads.Database
136136
Cardano.DbSync.Threads.EpochStake
137+
Cardano.DbSync.Threads.Ledger
137138
Cardano.DbSync.Threads.Stake
138139
Cardano.DbSync.Threads.TxInResolve
139140
Cardano.DbSync.Tracing.ToObjectOrphans

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import Cardano.DbSync.Rollback (unsafeRollback)
4242
import Cardano.DbSync.Sync (runSyncNodeClient)
4343
import Cardano.DbSync.Threads.Database
4444
import Cardano.DbSync.Threads.EpochStake
45+
import Cardano.DbSync.Threads.Ledger
4546
import Cardano.DbSync.Threads.Stake
4647
import Cardano.DbSync.Tracing.ToObjectOrphans ()
4748
import Cardano.DbSync.Types
@@ -206,6 +207,7 @@ runSyncNode metricsSetters trce iomgr dbConnString runMigrationFnc syncNodeConfi
206207
id
207208
[ runDbThread syncEnv metricsSetters threadChannels
208209
, runSyncNodeClient metricsSetters syncEnv iomgr trce threadChannels (enpSocketPath syncNodeParams)
210+
, runLedgerThread syncEnv
209211
, runEpochStakeThread syncEnv
210212
, runStakeThread syncEnv
211213
, runFetchOffChainPoolThread syncEnv

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ module Cardano.DbSync.Api (
3333
getNetwork,
3434
hasLedgerState,
3535
writePrefetch,
36-
generateNewEpochEvents,
36+
addNewEventsAndSort,
3737
) where
3838

3939
import Cardano.BM.Trace (Trace, logInfo, logWarning)
@@ -48,7 +48,7 @@ import Cardano.DbSync.Config.Types
4848
import Cardano.DbSync.Error
4949
import Cardano.DbSync.Ledger.Event (LedgerEvent (..))
5050
import Cardano.DbSync.Ledger.State (mkHasLedgerEnv)
51-
import Cardano.DbSync.Ledger.Types (HasLedgerEnv (..))
51+
import Cardano.DbSync.Ledger.Types
5252
import Cardano.DbSync.LocalStateQuery
5353
import Cardano.DbSync.Types
5454
import Cardano.DbSync.Util
@@ -182,6 +182,13 @@ initCurrentEpochNo =
182182
{ cenEpochNo = Strict.Nothing
183183
}
184184

185+
addNewEventsAndSort :: SyncEnv -> ApplyResult -> IO ApplyResult
186+
addNewEventsAndSort env applyResult = do
187+
epochEvents <- liftIO $ atomically $ generateNewEpochEvents env details
188+
pure applyResult {apEvents = sort $ epochEvents <> apEvents applyResult}
189+
where
190+
details = apSlotDetails applyResult
191+
185192
generateNewEpochEvents :: SyncEnv -> SlotDetails -> STM [LedgerEvent]
186193
generateNewEpochEvents env details = do
187194
!lastEpochNo <- readTVar (envCurrentEpochNo env)

cardano-db-sync/src/Cardano/DbSync/Block.hs

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@ import Cardano.BM.Trace (logError, logInfo)
1818
import qualified Cardano.Db as DB
1919
import Cardano.DbSync.Api
2020
import Cardano.DbSync.Api.Ledger
21-
import Cardano.DbSync.Api.Types (ConsistentLevel (..), InsertOptions (..), LedgerEnv (..), SyncEnv (..))
21+
import Cardano.DbSync.Api.Types (ConsistentLevel (..), InsertOptions (..), SyncEnv (..))
2222
import Cardano.DbSync.Era.Byron.Insert (insertByronBlock)
2323
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
2424
import Cardano.DbSync.Era.Universal.Block (insertBlockUniversal, prepareBlock)
2525
import Cardano.DbSync.Era.Universal.Epoch (hasEpochStartEvent) -- , hasNewEpochEvent)
2626
import Cardano.DbSync.Era.Universal.Insert.LedgerEvent (insertNewEpochLedgerEvents)
2727
import Cardano.DbSync.Error
28-
import Cardano.DbSync.Ledger.State (applyBlockAndSnapshot, defaultApplyResult)
29-
import Cardano.DbSync.Ledger.Types (ApplyResult (..))
30-
import Cardano.DbSync.LocalStateQuery
28+
import Cardano.DbSync.Ledger.Types
3129
import Cardano.DbSync.Rollback
3230
import Cardano.DbSync.Types
3331
import Cardano.DbSync.Util
@@ -49,6 +47,8 @@ import Cardano.DbSync.Era.Universal.Insert.Grouped (insertBlockGroupedData)
4947
import Cardano.DbSync.Cache (queryPrevBlockWithCache)
5048
import Control.Monad.Extra (whenJust)
5149
import Database.Persist.Sql
50+
import Cardano.DbSync.Threads.Ledger
51+
import Control.Concurrent.Class.MonadSTM.Strict (readTMVar)
5252

5353
insertListBlocks ::
5454
SyncEnv ->
@@ -84,7 +84,7 @@ applyAndInsertBlocksMaybe syncEnv = go
8484
liftIO $ setConsistentLevel syncEnv Consistent
8585
pure $ Just ls
8686
Right _ -> do
87-
(applyRes, _) <- liftIO (mkApplyResult syncEnv cblk False)
87+
applyRes <- fst <$> liftIO (mkApplyResult syncEnv cblk)
8888
whenJust (getNewEpoch applyRes) $ \epochNo ->
8989
liftIO $ logInfo tracer $ "Reached " <> textShow epochNo
9090
go rest
@@ -120,7 +120,7 @@ applyAndInsertByronBlock ::
120120
((DB.BlockId, Bool), ByronBlock) ->
121121
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
122122
applyAndInsertByronBlock syncEnv ((_blockId, firstAfterRollback), blk) = do
123-
(applyResult, tookSnapshot) <- liftIO (mkApplyResult syncEnv (BlockByron blk) True)
123+
(applyResult, tookSnapshot) <- liftIO (mkApplyResult syncEnv (BlockByron blk)) -- TODO use writeLedgerAction here as well for better performance
124124
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
125125
let details = apSlotDetails applyResult
126126
insertNewEpochLedgerEvents syncEnv (sdEpochNo (apSlotDetails applyResult)) (apEvents applyResult)
@@ -135,31 +135,50 @@ applyAndInsertBlock ::
135135
((DB.BlockId, Bool), CardanoBlock) ->
136136
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
137137
applyAndInsertBlock syncEnv ((blockId, firstAfterRollback), cblock) = do
138-
(applyResult, tookSnapshot) <- liftIO (mkApplyResult syncEnv cblock True)
139-
insertNewEpochLedgerEvents syncEnv (sdEpochNo (apSlotDetails applyResult)) (apEvents applyResult)
138+
applyRessultVar <- liftIO (asyncApplyResult syncEnv cblock)
139+
-- insertNewEpochLedgerEvents syncEnv (sdEpochNo (apSlotDetails applyResult)) (apEvents applyResult)
140140
whenGeneric $ \blk ->
141-
insertBlock syncEnv (blockId, blk) applyResult firstAfterRollback tookSnapshot
141+
prepareInsertBlock syncEnv (blockId, blk) applyRessultVar firstAfterRollback
142142
where
143143
tracer = getTrace syncEnv
144144
iopts = getInsertOptions syncEnv
145145
whenGeneric action =
146146
maybe (liftIO $ logError tracer "Found Byron Block after Shelley") action (toGenericBlock iopts cblock)
147147

148-
insertBlock ::
148+
prepareInsertBlock ::
149149
SyncEnv ->
150150
(DB.BlockId, Generic.Block) ->
151-
ApplyResult ->
152-
Bool ->
151+
LedgerResultResTMVar ->
153152
Bool ->
154153
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
155-
insertBlock syncEnv (blockId, blk) applyResult firstAfterRollback tookSnapshot = do
154+
prepareInsertBlock syncEnv (blockId, blk) applyRessultVar firstAfterRollback = do
156155
(blockDB, preparedTxs) <-
157156
liftIO $ concurrently
158157
(runOrThrowIO $ runExceptT $ DB.runDbLoggingExceptT backend tracer $ prepareBlock syncEnv blk)
159158
(mapConcurrently prepareTxWithPool (Generic.blkTxs blk))
160159

161160
_minIds <- insertBlockGroupedData syncEnv $ mconcat (snd <$> preparedTxs)
162-
mapM_ (uncurry3 $ insertTxRest syncEnv blockId epochNo slotNo applyResult) (fst <$> preparedTxs)
161+
(applyResult, tookSnapshot) <- liftIO $ atomically $ readTMVar applyRessultVar
162+
insertBlockWithLedger syncEnv blockId blockDB blk (fst <$> preparedTxs) applyResult firstAfterRollback tookSnapshot
163+
where
164+
prepareTxWithPool tx = runOrThrowIO $ runSqlPoolNoTransaction (prepTx tx) (envPool syncEnv) Nothing
165+
prepTx = runExceptT . prepareTxGrouped syncEnv [] blockId
166+
167+
backend = envBackend syncEnv
168+
tracer = getTrace syncEnv
169+
170+
insertBlockWithLedger ::
171+
SyncEnv ->
172+
DB.BlockId ->
173+
DB.Block ->
174+
Generic.Block ->
175+
[(DB.TxId, DB.Tx, Generic.Tx)] ->
176+
ApplyResult ->
177+
Bool ->
178+
Bool ->
179+
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
180+
insertBlockWithLedger syncEnv blockId blockDB blk txs applyResult firstAfterRollback tookSnapshot = do
181+
mapM_ (uncurry3 $ insertTxRest syncEnv blockId epochNo slotNo applyResult) txs
163182
insertBlockUniversal
164183
syncEnv
165184
blockId
@@ -174,11 +193,6 @@ insertBlock syncEnv (blockId, blk) applyResult firstAfterRollback tookSnapshot =
174193
epochNo = sdEpochNo details
175194
slotNo = sdSlotNo details
176195
blkNo = Generic.blkBlockNo blk
177-
backend = envBackend syncEnv
178-
tracer = getTrace syncEnv
179-
180-
prepareTxWithPool tx = runOrThrowIO $ runSqlPoolNoTransaction (prepTx tx) (envPool syncEnv) Nothing
181-
prepTx = runExceptT . prepareTxGrouped syncEnv [] blockId
182196

183197
insertBlockRest ::
184198
SyncEnv ->
@@ -240,17 +254,6 @@ insertBlockRest syncEnv blkNo applyResult tookSnapshot = do
240254
tracer = getTrace syncEnv
241255
txOutTableType = getTxOutTableType syncEnv
242256

243-
mkApplyResult :: SyncEnv -> CardanoBlock -> Bool -> IO (ApplyResult, Bool)
244-
mkApplyResult syncEnv cblk isCons = do
245-
(applyRes, tookSnapshot) <- case envLedgerEnv syncEnv of
246-
HasLedger hle -> applyBlockAndSnapshot hle cblk isCons
247-
NoLedger nle -> do
248-
slotDetails <- getSlotDetailsNode nle (cardanoBlockSlotNo cblk)
249-
pure (defaultApplyResult slotDetails, False)
250-
let details = apSlotDetails applyRes
251-
epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv details
252-
pure (applyRes {apEvents = sort $ epochEvents <> apEvents applyRes}, tookSnapshot)
253-
254257
takeWhileByron :: [(a, CardanoBlock)] -> ([(a, ByronBlock)], [(a, CardanoBlock)])
255258
takeWhileByron = go []
256259
where

cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
176176
intervar <- newTVarIO Strict.Nothing
177177
swQueue <- newTBQueueIO 5 -- Should be relatively shallow.
178178
stakeChans <- newEpochStakeChannels
179+
applyQueue <- newTBQueueIO 10
179180
pure
180181
HasLedgerEnv
181182
{ leTrace = trce
@@ -191,6 +192,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
191192
, leInterpreter = intervar
192193
, leStateVar = svar
193194
, leStateWriteQueue = swQueue
195+
, leApplyQueue = applyQueue
194196
, leEpochStakeChans = stakeChans
195197
}
196198

cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import Cardano.Slotting.Slot (
3737
WithOrigin (..),
3838
)
3939
import Control.Concurrent.Class.MonadSTM.Strict (
40+
StrictTMVar,
4041
StrictTVar,
4142
)
4243
import Control.Concurrent.STM.TBQueue (TBQueue)
@@ -73,6 +74,7 @@ data HasLedgerEnv = HasLedgerEnv
7374
, leInterpreter :: !(StrictTVar IO (Strict.Maybe CardanoInterpreter))
7475
, leStateVar :: !(StrictTVar IO (Strict.Maybe LedgerDB))
7576
, leStateWriteQueue :: !(TBQueue (FilePath, CardanoLedgerState))
77+
, leApplyQueue :: TBQueue LedgerAction
7678
, leEpochStakeChans :: EpochStakeChannels
7779
}
7880

@@ -196,6 +198,9 @@ instance Anchorable (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState wh
196198

197199
data SnapshotPoint = OnDisk LedgerStateFile | InMemory CardanoPoint
198200

201+
data LedgerAction = LedgerAction CardanoBlock LedgerResultResTMVar
202+
type LedgerResultResTMVar = StrictTMVar IO (ApplyResult, Bool)
203+
199204
data EpochStakeDBAction = EpochStakeDBAction
200205
{ esaEpochNo :: EpochNo
201206
, esaSnapShot :: Ledger.SnapShot StandardCrypto
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
{-# LANGUAGE TupleSections #-}
4+
5+
module Cardano.DbSync.Threads.Ledger where
6+
7+
import Cardano.BM.Trace (logInfo)
8+
import Cardano.DbSync.Api
9+
import Cardano.DbSync.Api.Types
10+
import Cardano.DbSync.Ledger.State
11+
import Cardano.DbSync.Ledger.Types
12+
import Cardano.DbSync.LocalStateQuery
13+
import Cardano.DbSync.Types
14+
import Cardano.DbSync.Util
15+
import Control.Concurrent.Class.MonadSTM.Strict (
16+
atomically,
17+
newEmptyTMVarIO,
18+
newTMVarIO,
19+
writeTMVar,
20+
)
21+
import qualified Control.Concurrent.STM.TBQueue as TBQ
22+
import Control.Monad (forever)
23+
import Control.Monad.IO.Class (liftIO)
24+
25+
runLedgerThread ::
26+
SyncEnv ->
27+
IO ()
28+
runLedgerThread syncEnv =
29+
case envLedgerEnv syncEnv of
30+
NoLedger _ -> pure ()
31+
HasLedger le -> do
32+
logInfo trce "Running Event thread"
33+
logException trce "runEpochStakeThread: " (runLedgerLoop syncEnv le)
34+
logInfo trce "Shutting Event thread"
35+
where
36+
trce = getTrace syncEnv
37+
38+
runLedgerLoop :: SyncEnv -> HasLedgerEnv -> IO ()
39+
runLedgerLoop syncEnv lenv = forever $ do
40+
LedgerAction cblk resVar <- liftIO $ atomically $ TBQ.readTBQueue (leApplyQueue lenv)
41+
res <- applyBlockAction syncEnv lenv cblk True
42+
atomically $ writeTMVar resVar res
43+
44+
-- May be used by 2 different thread. Not at the same time.
45+
applyBlockAction :: SyncEnv -> HasLedgerEnv -> CardanoBlock -> Bool -> IO (ApplyResult, Bool)
46+
applyBlockAction syncEnv lenv cblk isCons = do
47+
(applyRes, tookSnapshot) <- applyBlockAndSnapshot lenv cblk isCons
48+
applyRes' <- addNewEventsAndSort syncEnv applyRes
49+
pure (applyRes', tookSnapshot)
50+
51+
-- Not used by the Ledger thread
52+
noLedgerAction :: SyncEnv -> NoLedgerEnv -> CardanoBlock -> IO ApplyResult
53+
noLedgerAction syncEnv nle cblk = do
54+
slotDetails <- getSlotDetailsNode nle (cardanoBlockSlotNo cblk)
55+
addNewEventsAndSort syncEnv $ defaultApplyResult slotDetails
56+
57+
-- Not used by the Ledger thread
58+
writeLedgerAction :: HasLedgerEnv -> CardanoBlock -> IO LedgerResultResTMVar
59+
writeLedgerAction lenv cblock = do
60+
resVar <- newEmptyTMVarIO
61+
atomically $ TBQ.writeTBQueue (leApplyQueue lenv) $ LedgerAction cblock resVar
62+
pure resVar
63+
64+
-- Not used by the Ledger thread
65+
asyncApplyResult :: SyncEnv -> CardanoBlock -> IO LedgerResultResTMVar
66+
asyncApplyResult syncEnv cblk =
67+
case envLedgerEnv syncEnv of
68+
HasLedger hle -> writeLedgerAction hle cblk
69+
NoLedger nle -> do
70+
applyRes <- noLedgerAction syncEnv nle cblk
71+
newTMVarIO (applyRes, False)
72+
73+
-- Not used by the Ledger thread. This doesn't even send something to the thread.
74+
mkApplyResult :: SyncEnv -> CardanoBlock -> IO (ApplyResult, Bool)
75+
mkApplyResult syncEnv cblk = do
76+
case envLedgerEnv syncEnv of
77+
HasLedger hle -> applyBlockAction syncEnv hle cblk False
78+
NoLedger nle -> (,False) <$> noLedgerAction syncEnv nle cblk

0 commit comments

Comments
 (0)