Skip to content

Commit bb1e64a

Browse files
committed
Insert tx in bulk
1 parent 738c598 commit bb1e64a

File tree

4 files changed

+143
-76
lines changed

4 files changed

+143
-76
lines changed

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
{-# LANGUAGE RankNTypes #-}
88
{-# LANGUAGE ScopedTypeVariables #-}
99
{-# LANGUAGE TypeFamilies #-}
10+
{-# LANGUAGE TupleSections #-}
1011
{-# LANGUAGE NoImplicitPrelude #-}
1112

1213
module Cardano.DbSync.Era.Shelley.Insert (
@@ -139,8 +140,12 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
139140
}
140141

141142
let zippedTx = zip [0 ..] (Generic.blkTxs blk)
142-
let txInserter = insertTx syncEnv isMember blkId (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
143-
blockGroupedData <- foldM (\gp (idx, tx) -> txInserter idx tx gp) mempty zippedTx
143+
144+
txsPrepared <- foldAndAccM (prepareTx syncEnv blkId applyResult) zippedTx
145+
txIds <- lift $ DB.insertManyTx (ptrTxDb <$> txsPrepared)
146+
let txInserter = insertTx syncEnv blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
147+
let newZip = zipWith3 (\tx txId ptr -> (txId, tx, ptr)) (Generic.blkTxs blk) txIds txsPrepared
148+
blockGroupedData <- foldM txInserter mempty newZip
144149
minIds <- insertBlockGroupedData syncEnv blockGroupedData
145150

146151
-- Now that we've inserted the Block and all its txs, let's cache what we'll need
@@ -258,52 +263,44 @@ insertOnNewEpoch tracer iopts blkId slotNo epochNo newEpoch = do
258263

259264
-- -----------------------------------------------------------------------------
260265

261-
insertTx ::
266+
data PrepareTxRes = PrepareTxRes
267+
{ ptrTxDb :: DB.Tx
268+
, ptrFees :: Word64
269+
, ptrOutSum :: Word64
270+
, ptrResolvedTxIn :: [(Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)]
271+
}
272+
273+
prepareTx ::
262274
(MonadBaseControl IO m, MonadIO m) =>
263275
SyncEnv ->
264-
IsPoolMember ->
265276
DB.BlockId ->
266-
EpochNo ->
267-
SlotNo ->
268277
ApplyResult ->
269-
Word64 ->
270-
Generic.Tx ->
271-
BlockGroupedData ->
272-
ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData
273-
insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped = do
278+
[(ByteString, Generic.TxOut)] ->
279+
(Word64, Generic.Tx) ->
280+
ExceptT SyncNodeError (ReaderT SqlBackend m) (PrepareTxRes, [(ByteString, Generic.TxOut)])
281+
prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
274282
let !txHash = Generic.txHash tx
275283
let !mdeposits = if not (Generic.txValidContract tx) then Just (Coin 0) else lookupDepositsMap txHash (apDepositsMap applyResult)
276284
let !outSum = fromIntegral $ unCoin $ Generic.txOutSum tx
277-
!withdrawalSum = fromIntegral $ unCoin $ Generic.txWithdrawalSum tx
278285
hasConsumed = getHasConsumedOrPruneTxOut syncEnv
279286
disInOut <- liftIO $ getDisableInOutState syncEnv
280287
-- In some txs and with specific configuration we may be able to find necessary data within the tx body.
281288
-- In these cases we can avoid expensive queries.
282-
(resolvedInputs, fees', deposits) <- case (disInOut, mdeposits, unCoin <$> Generic.txFees tx) of
283-
(True, _, _) -> pure ([], 0, unCoin <$> mdeposits)
284-
(_, Just deposits, Just fees) -> do
285-
(resolvedInputs, _) <- splitLast <$> mapM (resolveTxInputs hasConsumed False (fst <$> groupedTxOut grouped)) (Generic.txInputs tx)
286-
pure (resolvedInputs, fees, Just (unCoin deposits))
287-
(_, Nothing, Just fees) -> do
288-
(resolvedInputs, amounts) <- splitLast <$> mapM (resolveTxInputs hasConsumed False (fst <$> groupedTxOut grouped)) (Generic.txInputs tx)
289-
if any isNothing amounts
290-
then pure (resolvedInputs, fees, Nothing)
291-
else
292-
let !inSum = sum $ map unDbLovelace $ catMaybes amounts
293-
in pure (resolvedInputs, fees, Just $ fromIntegral (inSum + withdrawalSum) - fromIntegral outSum - fromIntegral fees)
294-
(_, _, Nothing) -> do
289+
(resolvedInputs, fees', deposits) <- case (disInOut, unCoin <$> Generic.txFees tx) of
290+
(True, _) -> pure ([], 0, unCoin <$> mdeposits)
291+
(_, Just fees) -> do
292+
resolvedInputsDB <- lift $ mapM (resolveTxInputs hasConsumed) (Generic.txInputs tx)
293+
pure (resolvedInputsDB, fees, unCoin <$> mdeposits)
294+
(_, Nothing) -> do
295295
-- Nothing in fees means a phase 2 failure
296-
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputs hasConsumed True (fst <$> groupedTxOut grouped)) (Generic.txInputs tx)
296+
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue blockTxOuts) (Generic.txInputs tx)
297297
let !inSum = sum $ map unDbLovelace $ catMaybes amounts
298298
!diffSum = if inSum >= outSum then inSum - outSum else 0
299299
!fees = maybe diffSum (fromIntegral . unCoin) (Generic.txFees tx)
300300
pure (resolvedInsFull, fromIntegral fees, Just 0)
301301
let fees = fromIntegral fees'
302302
-- Insert transaction and get txId from the DB.
303-
!txId <-
304-
lift
305-
. DB.insertTx
306-
$ DB.Tx
303+
let txDb = DB.Tx
307304
{ DB.txHash = txHash
308305
, DB.txBlockId = blkId
309306
, DB.txBlockIndex = blockIndex
@@ -316,15 +313,30 @@ insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped
316313
, DB.txValidContract = Generic.txValidContract tx
317314
, DB.txScriptSize = sum $ Generic.txScriptSizes tx
318315
}
316+
pure (PrepareTxRes txDb fees outSum resolvedInputs, blockTxOuts <> ((txHash,) <$> Generic.txOutputs tx))
319317

318+
insertTx ::
319+
(MonadBaseControl IO m, MonadIO m) =>
320+
SyncEnv ->
321+
DB.BlockId ->
322+
IsPoolMember ->
323+
EpochNo ->
324+
SlotNo ->
325+
ApplyResult ->
326+
BlockGroupedData ->
327+
(DB.TxId, Generic.Tx, PrepareTxRes) ->
328+
ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData
329+
insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do
330+
let !txHash = Generic.txHash tx
331+
disInOut <- liftIO $ getDisableInOutState syncEnv
320332
if not (Generic.txValidContract tx)
321333
then do
322334
!txOutsGrouped <- mapM (prepareTxOut tracer cache iopts (txId, txHash)) (Generic.txOutputs tx)
323335

324-
let !txIns = map (prepareTxIn txId Map.empty) resolvedInputs
336+
!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) Map.empty) (ptrResolvedTxIn ptr)
325337
-- There is a custom semigroup instance for BlockGroupedData which uses addition for the values `fees` and `outSum`.
326338
-- The same happens below on the last line of this function.
327-
pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [] fees outSum)
339+
pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [] (ptrFees ptr) (ptrOutSum ptr))
328340
else do
329341
-- The following operations only happen if the script passes stage 2 validation (or the tx has
330342
-- no script).
@@ -378,8 +390,8 @@ insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped
378390
mapM_ (insertGovActionProposal cache blkId txId (getGovExpiresAt applyResult epochNo)) $ zip [0 ..] (Generic.txProposalProcedure tx)
379391
mapM_ (insertVotingProcedures tracer cache txId) (Generic.txVotingProcedure tx)
380392

381-
let !txIns = map (prepareTxIn txId redeemers) resolvedInputs
382-
pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint fees outSum)
393+
!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) redeemers) (ptrResolvedTxIn ptr)
394+
pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint (ptrFees ptr) (ptrOutSum ptr))
383395
where
384396
tracer = getTrace syncEnv
385397
cache = envCache syncEnv
@@ -467,23 +479,31 @@ insertCollateralTxOut tracer cache iopts (txId, _txHash) (Generic.TxOut index ad
467479
hasScript = maybe False Generic.hasCredScript (Generic.getPaymentCred addr)
468480

469481
-- | Build the 'ExtendedTxIn' for one already-resolved input of the
-- transaction identified by @txInId@.
--
-- The resolved triple carries the input itself, the id of the transaction
-- that produced the consumed output (when it is already known), and either
-- the still-unresolved input or the id of the consumed output.
--
-- When the producing transaction id is not known yet, it is looked up among
-- @groupedOutputs@ via 'resolveInMemory'. If that also fails, a
-- 'DB.DbLookupTxHash' lookup failure for the input's tx hash is raised.
prepareTxIn ::
  Monad m =>
  DB.TxId ->
  [ExtendedTxOut] ->
  Map Word64 DB.RedeemerId ->
  (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) ->
  ExceptT SyncNodeError m ExtendedTxIn
prepareTxIn txInId groupedOutputs redeemers (txIn, mProducerTxId, eTxOutId) = do
  -- The failure label names this function, so a failed lookup is reported
  -- against the right place (it used to carry the name of another function).
  producerTxId <-
    liftLookupFail "prepareTxIn" . pure $
      case mProducerTxId of
        Just txId -> Right txId
        Nothing ->
          maybe
            (Left $ DB.DbLookupTxHash (Generic.txInHash txIn))
            (Right . DB.txOutTxId . etoTxOut)
            (resolveInMemory txIn groupedOutputs)
  pure
    ExtendedTxIn
      { etiTxIn =
          DB.TxIn
            { DB.txInTxInId = txInId
            , DB.txInTxOutId = producerTxId
            , DB.txInTxOutIndex = fromIntegral $ Generic.txInIndex txIn
            , DB.txInRedeemerId = mlookup (Generic.txInRedeemerIndex txIn) redeemers
            }
      , etiTxOutId = eTxOutId
      }
487507

488508
insertCollateralTxIn ::
489509
(MonadBaseControl IO m, MonadIO m) =>

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ module Cardano.DbSync.Era.Shelley.Insert.Grouped (
1010
insertBlockGroupedData,
1111
insertReverseIndex,
1212
resolveTxInputs,
13+
resolveTxInputsValue,
1314
resolveScriptHash,
15+
resolveInMemory,
1416
mkmaTxOuts,
1517
) where
1618

@@ -23,6 +25,7 @@ import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
2325
import Cardano.DbSync.Era.Shelley.Query
2426
import Cardano.DbSync.Era.Util
2527
import Cardano.DbSync.Error
28+
import Cardano.Ledger.Coin (Coin (..))
2629
import Cardano.Prelude
2730
import Control.Monad.Trans.Control (MonadBaseControl)
2831
import qualified Data.List as List
@@ -140,41 +143,53 @@ insertReverseIndex blockId minIds =
140143
, DB.reverseIndexMinIds = minIdsToText minIds
141144
}
142145

143-
-- | If we can't resolve from the db, we fall back to the provided outputs
146+
-- | If we can't resolve from the db, we return nothing.
144147
-- This happens when the input consumes an output introduced in the same block.
145148
resolveTxInputs ::
146149
MonadIO m =>
147150
Bool ->
148-
Bool ->
149-
[ExtendedTxOut] ->
150151
Generic.TxIn ->
151-
ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
152-
resolveTxInputs hasConsumed needsValue groupedOutputs txIn =
153-
liftLookupFail ("resolveTxInputs " <> textShow txIn <> " ") $ do
152+
ReaderT SqlBackend m (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)
153+
resolveTxInputs hasConsumed txIn = do
154154
qres <-
155-
case (hasConsumed, needsValue) of
156-
(_, True) -> fmap convertFoundAll <$> resolveInputTxOutIdValue txIn
157-
(False, _) -> fmap convertnotFound <$> resolveInputTxId txIn
158-
(True, False) -> fmap convertFoundTxOutId <$> resolveInputTxOutId txIn
155+
if hasConsumed
156+
then fmap convertFoundTxOutId <$> resolveInputTxOutId txIn
157+
else fmap convertFoundTxId <$> resolveInputTxId txIn
159158
case qres of
160-
Right ret -> pure $ Right ret
161-
Left err ->
162-
case (resolveInMemory txIn groupedOutputs, hasConsumed, needsValue) of
163-
(Nothing, _, _) -> pure $ Left err
164-
(Just eutxo, True, True) -> pure $ Right $ convertFoundValue (DB.txOutTxId (etoTxOut eutxo), DB.txOutValue (etoTxOut eutxo))
165-
(Just eutxo, _, _) -> pure $ Right $ convertnotFound $ DB.txOutTxId (etoTxOut eutxo)
159+
Right ret -> pure ret
160+
Left _ -> pure foundNothing
166161
where
167-
convertnotFound :: DB.TxId -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
168-
convertnotFound txId = (txIn, txId, Left txIn, Nothing)
162+
convertFoundTxId :: DB.TxId -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)
163+
convertFoundTxId txId = (txIn, Just txId, Left txIn)
169164

170-
convertFoundTxOutId :: (DB.TxId, DB.TxOutId) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
171-
convertFoundTxOutId (txId, txOutId) = (txIn, txId, Right txOutId, Nothing)
165+
convertFoundTxOutId :: (DB.TxId, DB.TxOutId) -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)
166+
convertFoundTxOutId (txId, txOutId) = (txIn, Just txId, Right txOutId)
172167

173-
convertFoundValue :: (DB.TxId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
174-
convertFoundValue (txId, lovelace) = (txIn, txId, Left txIn, Just lovelace)
168+
foundNothing :: (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)
169+
foundNothing = (txIn, Nothing, Left txIn)
175170

176-
convertFoundAll :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
177-
convertFoundAll (txId, txOutId, lovelace) = (txIn, txId, Right txOutId, Just lovelace)
171+
-- | Resolve a transaction input together with its lovelace value.
--
-- The database is queried first through 'resolveInputTxOutIdValue'. If that
-- lookup fails, we fall back to the outputs collected so far for the current
-- block (@blockTxOuts@); this happens when the input consumes an output
-- introduced in the same block. In the fallback case only the value is known:
-- the tx id is 'Nothing' and the output id stays unresolved (@Left txIn@).
-- When neither source knows the input, the database lookup error is raised.
resolveTxInputsValue ::
  MonadIO m =>
  [(ByteString, Generic.TxOut)] ->
  Generic.TxIn ->
  ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace)
resolveTxInputsValue blockTxOuts txIn =
  liftLookupFail ("resolveTxInputsValue " <> textShow txIn <> " ") $ do
    dbResult <- resolveInputTxOutIdValue txIn
    pure $ case dbResult of
      Right (txId, txOutId, lovelace) ->
        Right (txIn, Just txId, Right txOutId, Just lovelace)
      Left err ->
        maybe (Left err) (Right . fromBlockOutput) (resolveInMemory' txIn blockTxOuts)
  where
    -- Result for an output found only in the current block's outputs.
    fromBlockOutput txOut =
      ( txIn
      , Nothing
      , Left txIn
      , Just (DB.DbLovelace $ fromIntegral $ unCoin $ Generic.txOutAdaValue txOut)
      )
178193

179194
resolveRemainingInputs ::
180195
MonadIO m =>
@@ -215,6 +230,15 @@ matches txIn eutxo =
215230
Generic.txInHash txIn == etoTxHash eutxo
216231
&& Generic.txInIndex txIn == DB.txOutIndex (etoTxOut eutxo)
217232

233+
-- | Find the first output in the given @(tx hash, output)@ list that the
-- input refers to, as decided by @matches'@.
resolveInMemory' :: Generic.TxIn -> [(ByteString, Generic.TxOut)] -> Maybe Generic.TxOut
resolveInMemory' txIn = fmap snd . List.find (matches' txIn)
236+
237+
-- | An input matches an output when both the producing tx hash and the
-- output index agree.
matches' :: Generic.TxIn -> (ByteString, Generic.TxOut) -> Bool
matches' txIn (producerHash, candidate) =
  (Generic.txInHash txIn, Generic.txInIndex txIn)
    == (producerHash, Generic.txOutIndex candidate)
241+
218242
minimumMaybe :: (Ord a, Foldable f) => f a -> Maybe a
219243
minimumMaybe xs
220244
| null xs = Nothing

cardano-db-sync/src/Cardano/DbSync/Util.hs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
{-# LANGUAGE FlexibleContexts #-}
22
{-# LANGUAGE OverloadedStrings #-}
3+
{-# LANGUAGE RankNTypes #-}
4+
{-# LANGUAGE ScopedTypeVariables #-}
35
{-# LANGUAGE TypeApplications #-}
46
{-# LANGUAGE TypeFamilies #-}
57
{-# LANGUAGE NoImplicitPrelude #-}
@@ -23,12 +25,14 @@ module Cardano.DbSync.Util (
2325
textPrettyShow,
2426
textShow,
2527
third,
26-
thrd3,
28+
first3,
2729
forth4,
30+
zipWith3,
2831
splitLast,
2932
traverseMEither,
3033
whenStrictJust,
3134
whenMaybe,
35+
foldAndAccM,
3236
mlookup,
3337
whenRight,
3438
whenFalseEmpty,
@@ -185,15 +189,30 @@ whenMaybe :: Monad m => Maybe a -> (a -> m b) -> m (Maybe b)
185189
whenMaybe (Just a) f = Just <$> f a
186190
whenMaybe Nothing _f = pure Nothing
187191

192+
-- | Monadic map over a structure that also threads a list accumulator.
--
-- The step function receives the accumulator built so far and the current
-- element. It returns the result for that element together with the /full/
-- updated accumulator (as @prepareTx@ does, which returns the outputs it was
-- given with its own appended). The per-element results are returned in
-- input order; the final accumulator is discarded.
foldAndAccM :: forall a b c t m. (Foldable t, Monad m) => ([c] -> a -> m (b, [c])) -> t a -> m [b]
foldAndAccM f as = reverse . snd <$> foldM g ([], []) as
  where
    g :: ([c], [b]) -> a -> m ([c], [b])
    g (cs, bs) a = do
      (b, cs') <- f cs a
      -- @cs'@ already contains @cs@. Appending it to @cs@ again would
      -- duplicate the whole prefix and roughly double the accumulator on
      -- every step.
      pure (cs', b : bs)
199+
188200
third :: (a, b, c) -> c
189201
third (_, _, c) = c
190202

191-
thrd3 :: (a, b, c, d) -> c
192-
thrd3 (_, _, c, _) = c
203+
first3 :: (a, b, c) -> a
204+
first3 (a, _, _) = a
193205

194206
forth4 :: (a, b, c, d) -> d
195207
forth4 (_, _, _, d) = d
196208

209+
-- | Combine three lists element-wise with the given function. The result is
-- as long as the shortest of the three inputs.
{-# NOINLINE [1] zipWith3 #-}
zipWith3 :: (a -> b -> c -> d) -> [a] -> [b] -> [c] -> [d]
zipWith3 combine (x : xs) (y : ys) (z : zs) = combine x y z : zipWith3 combine xs ys zs
zipWith3 _ _ _ _ = []
215+
197216
splitLast :: [(a, b, c, d)] -> ([(a, b, c)], [d])
198217
splitLast = unzip . fmap (\(a, b, c, d) -> ((a, b, c), d))
199218

cardano-db/src/Cardano/Db/Insert.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module Cardano.Db.Insert (
4040
insertStakeRegistration,
4141
insertTreasury,
4242
insertTx,
43+
insertManyTx,
4344
insertTxIn,
4445
insertManyTxMint,
4546
insertManyTxMetadata,
@@ -268,6 +269,9 @@ insertTreasury = insertUnchecked "Treasury"
268269
insertTx :: (MonadBaseControl IO m, MonadIO m) => Tx -> ReaderT SqlBackend m TxId
269270
insertTx tx = insertUnchecked ("Tx: " ++ show (BS.length (txHash tx))) tx
270271

272+
insertManyTx :: (MonadBaseControl IO m, MonadIO m) => [Tx] -> ReaderT SqlBackend m [TxId]
273+
insertManyTx = insertMany' "Txs"
274+
271275
insertTxIn :: (MonadBaseControl IO m, MonadIO m) => TxIn -> ReaderT SqlBackend m TxInId
272276
insertTxIn = insertUnchecked "TxIn"
273277

0 commit comments

Comments
 (0)