Skip to content

Commit 632137c

Browse files
committed
Intert in Bulk
1 parent bb1e64a commit 632137c

File tree

4 files changed

+79
-33
lines changed

4 files changed

+79
-33
lines changed

cardano-db-sync/src/Cardano/DbSync/DbAction.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ newThreadChannels =
7878
-- The pipeline queue in the LocalChainSync machinery is 50 elements long
7979
-- so we should not exceed that.
8080
ThreadChannels
81-
<$> TBQ.newTBQueueIO 47
81+
<$> TBQ.newTBQueueIO 300
8282
<*> newTVarIO False
8383

8484
writeDbActionQueue :: ThreadChannels -> DbAction -> STM ()

cardano-db-sync/src/Cardano/DbSync/Default.hs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Cardano.DbSync.Era.Shelley.Adjust (adjustEpochRewards)
2424
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
2525
import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, mkAdaPots)
2626
import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefunds, insertRewards)
27+
import Cardano.DbSync.Era.Shelley.Insert.Grouped
2728
import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards)
2829
import Cardano.DbSync.Error
2930
import Cardano.DbSync.Fix.EpochStake
@@ -57,22 +58,24 @@ insertListBlocks ::
5758
[CardanoBlock] ->
5859
IO (Either SyncNodeError ())
5960
insertListBlocks synEnv blocks = do
60-
DB.runDbIohkLogging (envBackend synEnv) tracer
61-
. runExceptT
62-
$ traverse_ (applyAndInsertBlockMaybe synEnv) blocks
61+
DB.runDbIohkLogging (envBackend synEnv) tracer $ runExceptT $ do
62+
groups <- foldM (applyAndInsertBlockMaybe synEnv) [] blocks
63+
unless (null groups) $
64+
void $ insertBlockGroupedData synEnv $ mconcat $ reverse groups
6365
where
6466
tracer = getTrace synEnv
6567

6668
applyAndInsertBlockMaybe ::
6769
SyncEnv ->
70+
[BlockGroupedData] ->
6871
CardanoBlock ->
69-
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
70-
applyAndInsertBlockMaybe syncEnv cblk = do
72+
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
73+
applyAndInsertBlockMaybe syncEnv groups cblk = do
7174
bl <- liftIO $ isConsistent syncEnv
7275
(!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl)
7376
if bl
7477
then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block
75-
insertBlock syncEnv cblk applyRes False tookSnapshot
78+
insertBlock syncEnv groups cblk applyRes False False tookSnapshot
7679
else do
7780
eiBlockInDbAlreadyId <- lift (DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk))
7881
-- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or
@@ -88,7 +91,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
8891
]
8992
rollbackFromBlockNo syncEnv (blockNo cblk)
9093
void $ migrateStakeDistr syncEnv (apOldLedger applyRes)
91-
insertBlock syncEnv cblk applyRes True tookSnapshot
94+
_ <- insertBlock syncEnv groups cblk applyRes True True tookSnapshot
9295
liftIO $ setConsistentLevel syncEnv Consistent
9396
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
9497
replaced <- lift $ DB.replaceAdaPots blockId $ mkAdaPots blockId slotNo epochNo adaPots
@@ -99,6 +102,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
99102
| Just epochNo <- getNewEpoch applyRes ->
100103
liftIO $ logInfo tracer $ "Reached " <> textShow epochNo
101104
_ -> pure ()
105+
pure []
102106
where
103107
tracer = getTrace syncEnv
104108

@@ -122,26 +126,31 @@ applyAndInsertBlockMaybe syncEnv cblk = do
122126

123127
insertBlock ::
124128
SyncEnv ->
129+
[BlockGroupedData] ->
125130
CardanoBlock ->
126131
ApplyResult ->
132+
-- force inserting all data
133+
Bool ->
127134
-- is first Block after rollback
128135
Bool ->
129136
-- has snapshot been taken
130137
Bool ->
131-
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
132-
insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
138+
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
139+
insertBlock syncEnv groupsPrev cblk applyRes forceInsert firstAfterRollback tookSnapshot = do
133140
!epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv (apSlotDetails applyRes)
134141
let !applyResult = applyRes {apEvents = sort $ epochEvents <> apEvents applyRes}
135142
let !details = apSlotDetails applyResult
136143
let !withinTwoMin = isWithinTwoMin details
137144
let !withinHalfHour = isWithinHalfHour details
145+
let !insertAll = forceInsert || withinTwoMin || withinHalfHour || tookSnapshot
138146
insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
139147
let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult)
140148
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
141149
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)
142150
let insertShelley blk =
143151
insertShelleyBlock
144152
syncEnv
153+
groupsPrev
145154
isStartEventOrRollback
146155
withinTwoMin
147156
withinHalfHour
@@ -152,10 +161,11 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
152161

153162
-- Here we insert the block and it's txs, but in adition we also cache some values which we later
154163
-- use when updating the Epoch, thus saving us having to recalulating them later.
155-
case cblk of
156-
BlockByron blk ->
164+
groups <- case cblk of
165+
BlockByron blk -> do
157166
newExceptT $
158167
insertByronBlock syncEnv isStartEventOrRollback blk details
168+
pure []
159169
BlockShelley blk ->
160170
newExceptT $
161171
insertShelley $
@@ -186,7 +196,15 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
186196
when (unBlockNo blkNo `mod` getPruneInterval syncEnv == 0) $
187197
do
188198
lift $ DB.deleteConsumedTxOut tracer (getSafeBlockNoDiff syncEnv)
199+
groups' <-
200+
if insertAll then do
201+
unless (null groups) $
202+
void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groups
203+
pure []
204+
else
205+
pure groups
189206
commitOrIndexes withinTwoMin withinHalfHour
207+
pure groups'
190208
where
191209
tracer = getTrace syncEnv
192210
iopts = getInsertOptions syncEnv

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,16 @@ type IsPoolMember = PoolKeyHash -> Bool
101101
insertShelleyBlock ::
102102
(MonadBaseControl IO m, MonadIO m) =>
103103
SyncEnv ->
104+
[BlockGroupedData] ->
104105
Bool ->
105106
Bool ->
106107
Bool ->
107108
Generic.Block ->
108109
SlotDetails ->
109110
IsPoolMember ->
110111
ApplyResult ->
111-
ReaderT SqlBackend m (Either SyncNodeError ())
112-
insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do
112+
ReaderT SqlBackend m (Either SyncNodeError [BlockGroupedData])
113+
insertShelleyBlock syncEnv groupsPrev shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do
113114
runExceptT $ do
114115
pbid <- case Generic.blkPreviousHash blk of
115116
Nothing -> liftLookupFail (renderErrorMessage (Generic.blkEra blk)) DB.queryGenesis -- this is for networks that fork from Byron on epoch 0.
@@ -141,12 +142,11 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
141142

142143
let zippedTx = zip [0 ..] (Generic.blkTxs blk)
143144

144-
txsPrepared <- foldAndAccM (prepareTx syncEnv blkId applyResult) zippedTx
145+
txsPrepared <- foldAndAccM (prepareTx syncEnv txOutPrev blkId applyResult) zippedTx
145146
txIds <- lift $ DB.insertManyTx (ptrTxDb <$> txsPrepared)
146-
let txInserter = insertTx syncEnv blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
147+
let txInserter = insertTx syncEnv txOutPrev blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
147148
let newZip = zipWith3 (\tx txId ptr -> (txId, tx, ptr)) (Generic.blkTxs blk) txIds txsPrepared
148149
blockGroupedData <- foldM txInserter mempty newZip
149-
minIds <- insertBlockGroupedData syncEnv blockGroupedData
150150

151151
-- now that we've inserted the Block and all it's txs lets cache what we'll need
152152
-- when we later update the epoch values.
@@ -164,9 +164,6 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
164164
, ebdTxCount = fromIntegral $ length (Generic.blkTxs blk)
165165
}
166166

167-
when withinHalfHour $
168-
insertReverseIndex blkId minIds
169-
170167
liftIO $ do
171168
let epoch = unEpochNo epochNo
172169
slotWithinEpoch = unEpochSlot (sdEpochSlot details)
@@ -208,9 +205,20 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
208205
when (ioOffChainPoolData iopts)
209206
. lift
210207
$ insertOffChainPoolResults tracer (envOffChainPoolResultQueue syncEnv)
208+
209+
if withinHalfHour then do
210+
unless (null groupsPrev) $
211+
void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groupsPrev
212+
minIds <- insertBlockGroupedData syncEnv blockGroupedData
213+
insertReverseIndex blkId minIds
214+
pure []
215+
else do
216+
pure $ blockGroupedData : groupsPrev
211217
where
212218
iopts = getInsertOptions syncEnv
213219

220+
txOutPrev = fmap fst . groupedTxOut <$> groupsPrev
221+
214222
logger :: Trace IO a -> a -> IO ()
215223
logger
216224
| shouldLog = logInfo
@@ -273,12 +281,13 @@ data PrepareTxRes = PrepareTxRes
273281
prepareTx ::
274282
(MonadBaseControl IO m, MonadIO m) =>
275283
SyncEnv ->
284+
[[ExtendedTxOut]] ->
276285
DB.BlockId ->
277286
ApplyResult ->
278287
[(ByteString, Generic.TxOut)] ->
279288
(Word64, Generic.Tx) ->
280289
ExceptT SyncNodeError (ReaderT SqlBackend m) (PrepareTxRes, [(ByteString, Generic.TxOut)])
281-
prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
290+
prepareTx syncEnv txOutPrev blkId applyResult blockTxOuts (blockIndex, tx) = do
282291
let !txHash = Generic.txHash tx
283292
let !mdeposits = if not (Generic.txValidContract tx) then Just (Coin 0) else lookupDepositsMap txHash (apDepositsMap applyResult)
284293
let !outSum = fromIntegral $ unCoin $ Generic.txOutSum tx
@@ -293,7 +302,7 @@ prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
293302
pure (resolvedInputsDB, fees, unCoin <$> mdeposits)
294303
(_, Nothing) -> do
295304
-- Nothing in fees means a phase 2 failure
296-
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue blockTxOuts) (Generic.txInputs tx)
305+
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue txOutPrev blockTxOuts) (Generic.txInputs tx)
297306
let !inSum = sum $ map unDbLovelace $ catMaybes amounts
298307
!diffSum = if inSum >= outSum then inSum - outSum else 0
299308
!fees = maybe diffSum (fromIntegral . unCoin) (Generic.txFees tx)
@@ -318,6 +327,7 @@ prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
318327
insertTx ::
319328
(MonadBaseControl IO m, MonadIO m) =>
320329
SyncEnv ->
330+
[[ExtendedTxOut]] ->
321331
DB.BlockId ->
322332
IsPoolMember ->
323333
EpochNo ->
@@ -326,14 +336,14 @@ insertTx ::
326336
BlockGroupedData ->
327337
(DB.TxId, Generic.Tx, PrepareTxRes) ->
328338
ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData
329-
insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do
339+
insertTx syncEnv txOutPrev blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do
330340
let !txHash = Generic.txHash tx
331341
disInOut <- liftIO $ getDisableInOutState syncEnv
332342
if not (Generic.txValidContract tx)
333343
then do
334344
!txOutsGrouped <- mapM (prepareTxOut tracer cache iopts (txId, txHash)) (Generic.txOutputs tx)
335345

336-
!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) Map.empty) (ptrResolvedTxIn ptr)
346+
!txIns <- mapM (prepareTxIn txId groups Map.empty) (ptrResolvedTxIn ptr)
337347
-- There is a custom semigroup instance for BlockGroupedData which uses addition for the values `fees` and `outSum`.
338348
-- Same happens bellow on last line of this function.
339349
pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [] (ptrFees ptr) (ptrOutSum ptr))
@@ -346,7 +356,7 @@ insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, pt
346356
Map.fromList
347357
<$> whenFalseMempty
348358
(ioPlutusExtra iopts)
349-
(mapM (insertRedeemer tracer disInOut (fst <$> groupedTxOut grouped) txId) (Generic.txRedeemer tx))
359+
(mapM (insertRedeemer tracer disInOut groups txId) (Generic.txRedeemer tx))
350360

351361
when (ioPlutusExtra iopts) $ do
352362
mapM_ (insertDatum tracer cache txId) (Generic.txData tx)
@@ -390,13 +400,15 @@ insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, pt
390400
mapM_ (insertGovActionProposal cache blkId txId (getGovExpiresAt applyResult epochNo)) $ zip [0 ..] (Generic.txProposalProcedure tx)
391401
mapM_ (insertVotingProcedures tracer cache txId) (Generic.txVotingProcedure tx)
392402

393-
!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) redeemers) (ptrResolvedTxIn ptr)
403+
!txIns <- mapM (prepareTxIn txId groups redeemers) (ptrResolvedTxIn ptr)
394404
pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint (ptrFees ptr) (ptrOutSum ptr))
395405
where
396406
tracer = getTrace syncEnv
397407
cache = envCache syncEnv
398408
iopts = getInsertOptions syncEnv
399409

410+
groups = (fst <$> groupedTxOut grouped) : txOutPrev
411+
400412
prepareTxOut ::
401413
(MonadBaseControl IO m, MonadIO m) =>
402414
Trace IO Text ->
@@ -481,15 +493,15 @@ insertCollateralTxOut tracer cache iopts (txId, _txHash) (Generic.TxOut index ad
481493
prepareTxIn ::
482494
Monad m =>
483495
DB.TxId ->
484-
[ExtendedTxOut] ->
496+
[[ExtendedTxOut]] ->
485497
Map Word64 DB.RedeemerId ->
486498
(Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) ->
487499
ExceptT SyncNodeError m ExtendedTxIn
488500
prepareTxIn txInId groupedOutputs redeemers (txIn, mtxOutId, mTxOutId) = do
489501
txOutId <- liftLookupFail "resolveScriptHash" $
490502
case mtxOutId of
491503
Just txOutId -> pure $ Right txOutId
492-
Nothing -> case resolveInMemory txIn groupedOutputs of
504+
Nothing -> case resolveInMemoryMany txIn groupedOutputs of
493505
Nothing -> pure $ Left $ DB.DbLookupTxHash (Generic.txInHash txIn)
494506
Just txOut -> pure $ Right $ DB.txOutTxId $ etoTxOut txOut
495507
let txInDB =
@@ -1138,7 +1150,7 @@ insertRedeemer ::
11381150
(MonadBaseControl IO m, MonadIO m) =>
11391151
Trace IO Text ->
11401152
Bool ->
1141-
[ExtendedTxOut] ->
1153+
[[ExtendedTxOut]] ->
11421154
DB.TxId ->
11431155
(Word64, Generic.TxRedeemer) ->
11441156
ExceptT SyncNodeError (ReaderT SqlBackend m) (Word64, DB.RedeemerId)

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ module Cardano.DbSync.Era.Shelley.Insert.Grouped (
1313
resolveTxInputsValue,
1414
resolveScriptHash,
1515
resolveInMemory,
16+
resolveInMemoryMany,
1617
mkmaTxOuts,
1718
) where
1819

@@ -172,18 +173,21 @@ resolveTxInputs hasConsumed txIn = do
172173
-- This happens the input consumes an output introduced in the same block.
173174
resolveTxInputsValue ::
174175
MonadIO m =>
176+
[[ExtendedTxOut]] ->
175177
[(ByteString, Generic.TxOut)] ->
176178
Generic.TxIn ->
177179
ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
178-
resolveTxInputsValue blockTxOuts txIn =
180+
resolveTxInputsValue txOutPrev blockTxOuts txIn =
179181
liftLookupFail ("resolveTxInputsValue " <> textShow txIn <> " ") $ do
180182
qres <- fmap convertFoundAll <$> resolveInputTxOutIdValue txIn
181183
case qres of
182184
Right ret -> pure $ Right ret
183185
Left err ->
184186
case resolveInMemory' txIn blockTxOuts of
185-
Nothing -> pure $ Left err
186187
Just txOut -> pure $ Right $ convertFoundValue $ DB.DbLovelace $ fromIntegral $ unCoin $ Generic.txOutAdaValue txOut
188+
Nothing -> case resolveInMemoryMany txIn txOutPrev of
189+
Nothing -> pure $ Left err
190+
Just txOut -> pure $ Right $ convertFoundValue $ DB.txOutValue $ etoTxOut txOut
187191
where
188192
convertFoundAll :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
189193
convertFoundAll (txId, txOutId, lovelace) = (txIn, Just txId, Right txOutId, Just lovelace)
@@ -208,7 +212,7 @@ resolveRemainingInputs etis mp =
208212

209213
resolveScriptHash ::
210214
(MonadBaseControl IO m, MonadIO m) =>
211-
[ExtendedTxOut] ->
215+
[[ExtendedTxOut]] ->
212216
Generic.TxIn ->
213217
ExceptT SyncNodeError (ReaderT SqlBackend m) (Maybe ByteString)
214218
resolveScriptHash groupedOutputs txIn =
@@ -217,7 +221,7 @@ resolveScriptHash groupedOutputs txIn =
217221
case qres of
218222
Right ret -> pure $ Right ret
219223
Left err ->
220-
case resolveInMemory txIn groupedOutputs of
224+
case resolveInMemoryMany txIn groupedOutputs of
221225
Nothing -> pure $ Left err
222226
Just eutxo -> pure $ Right $ DB.txOutPaymentCred $ etoTxOut eutxo
223227

@@ -239,6 +243,18 @@ matches' txIn (txHash, txOut) =
239243
Generic.txInHash txIn == txHash
240244
&& Generic.txInIndex txIn == Generic.txOutIndex txOut
241245

246+
resolveInMemoryMany :: Generic.TxIn -> [[ExtendedTxOut]] -> Maybe ExtendedTxOut
247+
resolveInMemoryMany txIn =
248+
findMapMaybe (resolveInMemory txIn)
249+
where
250+
findMapMaybe :: (a -> Maybe b) -> [a] -> Maybe b
251+
findMapMaybe f = go
252+
where
253+
go [] = Nothing
254+
go (a : as) = case f a of
255+
Nothing -> go as
256+
Just b -> Just b
257+
242258
minimumMaybe :: (Ord a, Foldable f) => f a -> Maybe a
243259
minimumMaybe xs
244260
| null xs = Nothing

0 commit comments

Comments
 (0)