Skip to content

Commit f434016

Browse files
committed
complete consumedTxOut and start changing tests/smash
1 parent f1bb0df commit f434016

34 files changed

+2063
-835
lines changed

cardano-db-sync/cardano-db-sync.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ library
5454
Cardano.DbSync.Config.Shelley
5555
Cardano.DbSync.Config.Types
5656
Cardano.DbSync.Database
57-
Cardano.DbSync.DbAction
57+
Cardano.DbSync.DbEvent
5858
Cardano.DbSync.Error
5959

6060
Cardano.DbSync.Era

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import Cardano.DbSync.Config (configureLogging)
4646
import Cardano.DbSync.Config.Cardano
4747
import Cardano.DbSync.Config.Types
4848
import Cardano.DbSync.Database
49-
import Cardano.DbSync.DbAction
49+
import Cardano.DbSync.DbEvent
5050
import Cardano.DbSync.Era
5151
import Cardano.DbSync.Error
5252
import Cardano.DbSync.Ledger.State
@@ -198,7 +198,7 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runMigrationFnc syncNodeConf
198198
when (not isJsonbInSchema && not removeJsonbFromSchemaConfig) $ do
199199
liftIO $ logWarning trce "Adding jsonb datatypes back to the database. This can take time."
200200
liftIO $ runAddJsonbToSchema syncEnv
201-
liftIO $ runExtraMigrationsMaybe syncEnv
201+
liftIO $ runConsumedTxOutMigrationsMaybe syncEnv
202202
unless useLedger $ liftIO $ do
203203
logInfo trce "Migrating to a no ledger schema"
204204
Db.noLedgerMigrations pool trce

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module Cardano.DbSync.Api (
1717
getRanIndexes,
1818
runIndexMigrations,
1919
initPruneConsumeMigration,
20-
runExtraMigrationsMaybe,
20+
runConsumedTxOutMigrationsMaybe,
2121
runAddJsonbToSchema,
2222
runRemoveJsonbFromSchema,
2323
getSafeBlockNoDiff,
@@ -151,13 +151,13 @@ initPruneConsumeMigration consumed pruneTxOut bootstrap forceTxIn' =
151151
getPruneConsume :: SyncEnv -> DB.PruneConsumeMigration
152152
getPruneConsume = soptPruneConsumeMigration . envOptions
153153

154-
runExtraMigrationsMaybe :: SyncEnv -> IO ()
155-
runExtraMigrationsMaybe syncEnv = do
154+
runConsumedTxOutMigrationsMaybe :: SyncEnv -> IO ()
155+
runConsumedTxOutMigrationsMaybe syncEnv = do
156156
let pcm = getPruneConsume syncEnv
157157
txOutTableType = getTxOutTableType syncEnv
158-
logInfo (getTrace syncEnv) $ "runExtraMigrationsMaybe: " <> textShow pcm
158+
logInfo (getTrace syncEnv) $ "runConsumedTxOutMigrationsMaybe: " <> textShow pcm
159159
DB.runDbIohkNoLogging (envDbEnv syncEnv) $
160-
DB.runExtraMigrations
160+
DB.runConsumedTxOutMigrations
161161
(getTrace syncEnv)
162162
txOutTableType
163163
(getSafeBlockNoDiff syncEnv)
@@ -170,7 +170,7 @@ runAddJsonbToSchema syncEnv =
170170
runRemoveJsonbFromSchema ::
171171
(MonadIO m, AsDbError e) =>
172172
SyncEnv ->
173-
DbAction e m ()
173+
DbEvent e m ()
174174
runRemoveJsonbFromSchema syncEnv = do
175175
DB.runDbT DB.Write transx
176176
where

cardano-db-sync/src/Cardano/DbSync/Database.hs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44
{-# LANGUAGE NoImplicitPrelude #-}
55

66
module Cardano.DbSync.Database (
7-
DbAction (..),
7+
DbEvent (..),
88
ThreadChannels,
9-
lengthDbActionQueue,
9+
lengthDbEventQueue,
1010
mkDbApply,
1111
runDbThread,
1212
) where
1313

1414
import Cardano.BM.Trace (logDebug, logError, logInfo)
1515
import Cardano.DbSync.Api
1616
import Cardano.DbSync.Api.Types (ConsistentLevel (..), LedgerEnv (..), SyncEnv (..))
17-
import Cardano.DbSync.DbAction
17+
import Cardano.DbSync.DbEvent
1818
import Cardano.DbSync.Default
1919
import Cardano.DbSync.Error
2020
import Cardano.DbSync.Ledger.State
@@ -53,7 +53,7 @@ runDbThread syncEnv metricsSetters queue = do
5353
-- Main loop to process the queue
5454
processQueue :: IO ()
5555
processQueue = do
56-
actions <- blockingFlushDbActionQueue queue
56+
actions <- blockingFlushDbEventQueue queue
5757

5858
-- Log the number of blocks being processed if there are multiple
5959
when (length actions > 1) $ do
@@ -65,7 +65,7 @@ runDbThread syncEnv metricsSetters queue = do
6565
Nothing -> processActions actions
6666

6767
-- Process a list of actions
68-
processActions :: [DbAction] -> IO ()
68+
processActions :: [DbEvent] -> IO ()
6969
processActions actions = do
7070
result <- runExceptT $ runActions syncEnv actions -- runActions is where we start inserting information we recieve from the node.
7171

@@ -108,7 +108,7 @@ runDbThread syncEnv metricsSetters queue = do
108108
-- where
109109
-- trce = getTrace syncEnv
110110
-- loop = do
111-
-- xs <- blockingFlushDbActionQueue queue
111+
-- xs <- blockingFlushDbEventQueue queue
112112

113113
-- when (length xs > 1) $ do
114114
-- logDebug trce $ "runDbThread: " <> textShow (length xs) <> " blocks"
@@ -136,19 +136,19 @@ runDbThread syncEnv metricsSetters queue = do
136136
-- atomically $ putTMVar resultVar (latestPoints, currentTip)
137137
-- loop
138138

139-
-- | Run the list of 'DbAction's. Block are applied in a single set (as a transaction)
139+
-- | Run the list of 'DbEvent's. Block are applied in a single set (as a transaction)
140140
-- and other operations are applied one-by-one.
141141
runActions ::
142142
SyncEnv ->
143-
[DbAction] ->
143+
[DbEvent] ->
144144
ExceptT SyncNodeError IO NextState
145145
runActions syncEnv actions = do
146-
dbAction Continue actions
146+
dbEvent Continue actions
147147
where
148-
dbAction :: NextState -> [DbAction] -> ExceptT SyncNodeError IO NextState
149-
dbAction next [] = pure next
150-
dbAction Done _ = pure Done
151-
dbAction Continue xs =
148+
dbEvent :: NextState -> [DbEvent] -> ExceptT SyncNodeError IO NextState
149+
dbEvent next [] = pure next
150+
dbEvent Done _ = pure Done
151+
dbEvent Continue xs =
152152
case spanDbApply xs of
153153
([], DbFinish : _) -> do
154154
pure Done
@@ -171,12 +171,12 @@ runActions syncEnv actions = do
171171
liftIO $ setConsistentLevel syncEnv DBAheadOfLedger
172172
blockNo <- lift $ getDbTipBlockNo syncEnv
173173
lift $ atomically $ putTMVar resultVar (points, blockNo)
174-
dbAction Continue ys
174+
dbEvent Continue ys
175175
(ys, zs) -> do
176176
newExceptT $ insertListBlocks syncEnv ys
177177
if null zs
178178
then pure Continue
179-
else dbAction Continue zs
179+
else dbEvent Continue zs
180180

181181
rollbackLedger :: SyncEnv -> CardanoPoint -> IO (Maybe [CardanoPoint])
182182
rollbackLedger syncEnv point =
@@ -238,14 +238,14 @@ validateConsistentLevel syncEnv stPoint = do
238238
, show cLevel
239239
]
240240

241-
-- | Split the DbAction list into a prefix containing blocks to apply and a postfix.
242-
spanDbApply :: [DbAction] -> ([CardanoBlock], [DbAction])
241+
-- | Split the DbEvent list into a prefix containing blocks to apply and a postfix.
242+
spanDbApply :: [DbEvent] -> ([CardanoBlock], [DbEvent])
243243
spanDbApply lst =
244244
case lst of
245245
(DbApplyBlock bt : xs) -> let (ys, zs) = spanDbApply xs in (bt : ys, zs)
246246
xs -> ([], xs)
247247

248-
hasRestart :: [DbAction] -> Maybe (StrictTMVar IO ([(CardanoPoint, Bool)], WithOrigin BlockNo))
248+
hasRestart :: [DbEvent] -> Maybe (StrictTMVar IO ([(CardanoPoint, Bool)], WithOrigin BlockNo))
249249
hasRestart = go
250250
where
251251
go [] = Nothing

cardano-db-sync/src/Cardano/DbSync/DbAction.hs renamed to cardano-db-sync/src/Cardano/DbSync/DbEvent.hs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
{-# LANGUAGE NoImplicitPrelude #-}
22

3-
module Cardano.DbSync.DbAction (
4-
DbAction (..),
3+
module Cardano.DbSync.DbEvent (
4+
DbEvent (..),
55
ThreadChannels (..),
6-
blockingFlushDbActionQueue,
7-
lengthDbActionQueue,
6+
blockingFlushDbEventQueue,
7+
lengthDbEventQueue,
88
mkDbApply,
99
newThreadChannels,
10-
writeDbActionQueue,
10+
writeDbEventQueue,
1111
waitRollback,
1212
waitRestartState,
1313
waitDoneInit,
@@ -23,34 +23,34 @@ import qualified Control.Concurrent.STM.TBQueue as TBQ
2323
import Ouroboros.Network.Block (BlockNo, Tip (..))
2424
import qualified Ouroboros.Network.Point as Point
2525

26-
data DbAction
26+
data DbEvent
2727
= DbApplyBlock !CardanoBlock
2828
| DbRollBackToPoint !CardanoPoint !(Tip CardanoBlock) (StrictTMVar IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo))
2929
| DbRestartState (StrictTMVar IO ([(CardanoPoint, Bool)], Point.WithOrigin BlockNo))
3030
| DbFinish
3131

3232
data ThreadChannels = ThreadChannels
33-
{ tcQueue :: TBQueue DbAction
33+
{ tcQueue :: TBQueue DbEvent
3434
, tcDoneInit :: !(StrictTVar IO Bool)
3535
}
3636

37-
mkDbApply :: CardanoBlock -> DbAction
37+
mkDbApply :: CardanoBlock -> DbEvent
3838
mkDbApply = DbApplyBlock
3939

4040
-- | This simulates a synhronous operations, since the thread waits for the db
4141
-- worker thread to finish the rollback.
4242
waitRollback :: ThreadChannels -> CardanoPoint -> Tip CardanoBlock -> IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo)
4343
waitRollback tc point serverTip = do
4444
resultVar <- newEmptyTMVarIO
45-
atomically $ writeDbActionQueue tc $ DbRollBackToPoint point serverTip resultVar
45+
atomically $ writeDbEventQueue tc $ DbRollBackToPoint point serverTip resultVar
4646
atomically $ takeTMVar resultVar
4747

4848
waitRestartState :: ThreadChannels -> IO ([(CardanoPoint, Bool)], Point.WithOrigin BlockNo)
4949
waitRestartState tc = do
5050
resultVar <- newEmptyTMVarIO
5151
atomically $ do
5252
_ <- TBQ.flushTBQueue (tcQueue tc)
53-
writeDbActionQueue tc $ DbRestartState resultVar
53+
writeDbEventQueue tc $ DbRestartState resultVar
5454
atomically $ takeTMVar resultVar
5555

5656
waitDoneInit :: ThreadChannels -> IO ()
@@ -68,8 +68,8 @@ runAndSetDone tc action = do
6868
atomically $ writeTVar (tcDoneInit tc) fl
6969
pure fl
7070

71-
lengthDbActionQueue :: ThreadChannels -> STM Natural
72-
lengthDbActionQueue = STM.lengthTBQueue . tcQueue
71+
lengthDbEventQueue :: ThreadChannels -> STM Natural
72+
lengthDbEventQueue = STM.lengthTBQueue . tcQueue
7373

7474
newThreadChannels :: IO ThreadChannels
7575
newThreadChannels =
@@ -81,15 +81,15 @@ newThreadChannels =
8181
<$> TBQ.newTBQueueIO 47
8282
<*> newTVarIO False
8383

84-
writeDbActionQueue :: ThreadChannels -> DbAction -> STM ()
85-
writeDbActionQueue = TBQ.writeTBQueue . tcQueue
84+
writeDbEventQueue :: ThreadChannels -> DbEvent -> STM ()
85+
writeDbEventQueue = TBQ.writeTBQueue . tcQueue
8686

8787
-- | Block if the queue is empty and if its not read/flush everything.
8888
-- Need this because `flushTBQueue` never blocks and we want to block until
8989
-- there is one item or more.
9090
-- Use this instead of STM.check to make sure it blocks if the queue is empty.
91-
blockingFlushDbActionQueue :: ThreadChannels -> IO [DbAction]
92-
blockingFlushDbActionQueue tc = do
91+
blockingFlushDbEventQueue :: ThreadChannels -> IO [DbEvent]
92+
blockingFlushDbEventQueue tc = do
9393
STM.atomically $ do
9494
x <- TBQ.readTBQueue $ tcQueue tc
9595
xs <- TBQ.flushTBQueue $ tcQueue tc

cardano-db-sync/src/Cardano/DbSync/Metrics.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ data Metrics = Metrics
3030
{ mNodeBlockHeight :: !Gauge
3131
-- ^ The block tip number of the remote node.
3232
, mDbQueueLength :: !Gauge
33-
-- ^ The number of @DbAction@ remaining for the database.
33+
-- ^ The number of @DbEvent@ remaining for the database.
3434
, mDbBlockHeight :: !Gauge
3535
-- ^ The block tip number in the database.
3636
, mDbSlotHeight :: !Gauge

cardano-db-sync/src/Cardano/DbSync/Sync.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import Cardano.DbSync.Api
3131
import Cardano.DbSync.Api.Types (ConsistentLevel (..), LedgerEnv (..), SyncEnv (..), envLedgerEnv, envNetworkMagic, envOptions)
3232
import Cardano.DbSync.Config
3333
import Cardano.DbSync.Database
34-
import Cardano.DbSync.DbAction
34+
import Cardano.DbSync.DbEvent
3535
import Cardano.DbSync.LocalStateQuery
3636
import Cardano.DbSync.Metrics
3737
import Cardano.DbSync.Tracing.ToObjectOrphans ()
@@ -221,7 +221,7 @@ dbSyncProtocols syncEnv metricsSetters tc codecConfig version bversion =
221221
( chainSyncClientPeerPipelined $
222222
chainSyncClient metricsSetters tracer (fst <$> latestPoints) currentTip tc
223223
)
224-
atomically $ writeDbActionQueue tc DbFinish
224+
atomically $ writeDbEventQueue tc DbFinish
225225
-- We should return leftover bytes returned by 'runPipelinedPeer', but
226226
-- client application do not care about them (it's only important if one
227227
-- would like to restart a protocol on the same mux and thus bearer).
@@ -350,8 +350,8 @@ chainSyncClient metricsSetters trce latestPoints currentTip tc = do
350350
setNodeBlockHeight metricsSetters (getTipBlockNo tip)
351351

352352
newSize <- atomically $ do
353-
writeDbActionQueue tc $ mkDbApply blk
354-
lengthDbActionQueue tc
353+
writeDbEventQueue tc $ mkDbApply blk
354+
lengthDbEventQueue tc
355355

356356
setDbQueueLength metricsSetters newSize
357357

0 commit comments

Comments
 (0)