@@ -26,6 +26,9 @@ module Cardano.DbSync.Api (
2626 getHasConsumedOrPruneTxOut ,
2727 getSkipTxIn ,
2828 getPrunes ,
29+ withDBSyncConnections ,
30+ withScriptConnection ,
31+ withDatumConnection ,
2932 mkSyncEnvFromConfig ,
3033 getInsertOptions ,
3134 getTrace ,
@@ -41,7 +44,7 @@ import qualified Cardano.Chain.Genesis as Byron
4144import Cardano.Crypto.ProtocolMagic (ProtocolMagicId (.. ))
4245import qualified Cardano.Db as DB
4346import Cardano.DbSync.Api.Types
44- import Cardano.DbSync.Cache.Types (CacheCapacity (.. ), newEmptyCache , newStakeChannels , useNoCache )
47+ import Cardano.DbSync.Cache.Types (CacheCapacity (.. ), newEmptyCache , newMAChannels , newStakeChannels , useNoCache )
4548import Cardano.DbSync.Config.Cardano
4649import Cardano.DbSync.Config.Shelley
4750import Cardano.DbSync.Config.Types
@@ -65,9 +68,12 @@ import Control.Concurrent.Class.MonadSTM.Strict (
6568 writeTVar ,
6669 )
6770import qualified Control.Concurrent.Class.MonadSTM.Strict.TBQueue as TBQ
71+ import Control.Concurrent.MVar
72+ import Control.Monad.Logger (LoggingT , MonadLoggerIO )
73+ import Control.Monad.Trans.Resource (MonadUnliftIO )
6874import qualified Data.Strict.Maybe as Strict
6975import Data.Time.Clock (getCurrentTime )
70- import Database.Persist.Postgresql (ConnectionString , createPostgresqlPool )
76+ import Database.Persist.Postgresql (ConnectionString , createPostgresqlPool , withPostgresqlConn )
7177import Database.Persist.Sql (SqlBackend )
7278import Ouroboros.Consensus.Block.Abstract (BlockProtocol )
7379import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (.. ))
@@ -248,7 +254,7 @@ writePrefetch syncEnv cblock = do
248254
249255mkSyncEnv ::
250256 Trace IO Text ->
251- SqlBackend ->
257+ DbConnections ->
252258 ConnectionString ->
253259 SyncOptions ->
254260 ProtocolInfo CardanoBlock ->
@@ -259,7 +265,7 @@ mkSyncEnv ::
259265 SyncNodeParams ->
260266 RunMigration ->
261267 IO SyncEnv
262- mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runMigrationFnc = do
268+ mkSyncEnv trce backends connectionString syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runMigrationFnc = do
263269 dbCNamesVar <- newTVarIO =<< dbConstraintNamesExists backend
264270 cache <-
265271 if soptCache syncOptions
@@ -278,8 +284,9 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
278284 indexesVar <- newTVarIO $ enpForceIndexes syncNP
279285 bts <- getBootstrapInProgress trce (isTxOutConsumedBootstrap' syncNodeConfigFromFile) backend
280286 bootstrapVar <- newTVarIO bts
281- -- Offline Pool + Anchor queues
282287 cChans <- newStakeChannels
288+ maChans <- newMAChannels
289+ -- Offline Pool + Anchor queues
283290 opwq <- newTBQueueIO 1000
284291 oprq <- newTBQueueIO 1000
285292 oawq <- newTBQueueIO 1000
@@ -309,7 +316,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
309316
310317 pure $
311318 SyncEnv
312- { envBackend = backend
319+ { envBackends = backends
313320 , envPool = pool
314321 , envBootstrap = bootstrapVar
315322 , envCache = cache
@@ -323,6 +330,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
323330 , envLedgerEnv = ledgerEnvType
324331 , envNetworkMagic = nwMagic
325332 , envStakeChans = cChans
333+ , envMAChans = maChans
326334 , envOffChainPoolResultQueue = oprq
327335 , envOffChainPoolWorkQueue = opwq
328336 , envOffChainVoteResultQueue = oarq
@@ -335,10 +343,47 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
335343 where
336344 hasLedger' = hasLedger . sioLedger . dncInsertOptions
337345 isTxOutConsumedBootstrap' = isTxOutConsumedBootstrap . sioTxOut . dncInsertOptions
346+ backend = mainBackend backends
347+
348+ withDBSyncConnections ::
349+ (MonadUnliftIO m , MonadLoggerIO m ) =>
350+ ConnectionString ->
351+ (DbConnections -> m a ) ->
352+ m a
353+ withDBSyncConnections connStr action =
354+ withPostgresqlConn connStr $ \ mainConn ->
355+ withPostgresqlConn connStr $ \ scriptConn ->
356+ withPostgresqlConn connStr $ \ datumConn -> do
357+ scr <- liftIO $ newMVar scriptConn
358+ dt <- liftIO $ newMVar datumConn
359+ action $ DbConnections mainConn scr dt
360+
361+ withScriptConnection ::
362+ SyncEnv ->
363+ ReaderT SqlBackend (LoggingT IO ) a ->
364+ IO a
365+ withScriptConnection = withGivenConnection scriptBackend
366+
367+ withDatumConnection ::
368+ SyncEnv ->
369+ ReaderT SqlBackend (LoggingT IO ) a ->
370+ IO a
371+ withDatumConnection = withGivenConnection datumBackend
372+
373+ withGivenConnection ::
374+ (DbConnections -> MVar SqlBackend ) ->
375+ SyncEnv ->
376+ ReaderT SqlBackend (LoggingT IO ) a ->
377+ IO a
378+ withGivenConnection toConn syncEnv action = do
379+ withMVar connVar $ \ conn ->
380+ DB. runDbLogging conn (getTrace syncEnv) action
381+ where
382+ connVar = toConn $ envBackends syncEnv
338383
339384mkSyncEnvFromConfig ::
340385 Trace IO Text ->
341- SqlBackend ->
386+ DbConnections ->
342387 ConnectionString ->
343388 SyncOptions ->
344389 GenesisConfig ->
@@ -347,7 +392,7 @@ mkSyncEnvFromConfig ::
347392 -- | run migration function
348393 RunMigration ->
349394 IO (Either SyncNodeError SyncEnv )
350- mkSyncEnvFromConfig trce backend connectionString syncOptions genCfg syncNodeConfigFromFile syncNodeParams runMigrationFnc =
395+ mkSyncEnvFromConfig trce backends connectionString syncOptions genCfg syncNodeConfigFromFile syncNodeParams runMigrationFnc =
351396 case genCfg of
352397 GenesisCardano _ bCfg sCfg _ _
353398 | unProtocolMagicId (Byron. configProtocolMagicId bCfg) /= Shelley. sgNetworkMagic (scConfig sCfg) ->
@@ -374,7 +419,7 @@ mkSyncEnvFromConfig trce backend connectionString syncOptions genCfg syncNodeCon
374419 Right
375420 <$> mkSyncEnv
376421 trce
377- backend
422+ backends
378423 connectionString
379424 syncOptions
380425 (fst $ mkProtocolInfoCardano genCfg [] )
0 commit comments