@@ -26,6 +26,9 @@ module Cardano.DbSync.Api (
26
26
getHasConsumedOrPruneTxOut ,
27
27
getSkipTxIn ,
28
28
getPrunes ,
29
+ withDBSyncConnections ,
30
+ withScriptConnection ,
31
+ withDatumConnection ,
29
32
mkSyncEnvFromConfig ,
30
33
getInsertOptions ,
31
34
getTrace ,
@@ -41,7 +44,7 @@ import qualified Cardano.Chain.Genesis as Byron
41
44
import Cardano.Crypto.ProtocolMagic (ProtocolMagicId (.. ))
42
45
import qualified Cardano.Db as DB
43
46
import Cardano.DbSync.Api.Types
44
- import Cardano.DbSync.Cache.Types (CacheCapacity (.. ), newEmptyCache , newStakeChannels , useNoCache )
47
+ import Cardano.DbSync.Cache.Types (CacheCapacity (.. ), newEmptyCache , newMAChannels , newStakeChannels , useNoCache )
45
48
import Cardano.DbSync.Config.Cardano
46
49
import Cardano.DbSync.Config.Shelley
47
50
import Cardano.DbSync.Config.Types
@@ -65,9 +68,12 @@ import Control.Concurrent.Class.MonadSTM.Strict (
65
68
writeTVar ,
66
69
)
67
70
import qualified Control.Concurrent.Class.MonadSTM.Strict.TBQueue as TBQ
71
+ import Control.Concurrent.MVar
72
+ import Control.Monad.Logger (LoggingT , MonadLoggerIO )
73
+ import Control.Monad.Trans.Resource (MonadUnliftIO )
68
74
import qualified Data.Strict.Maybe as Strict
69
75
import Data.Time.Clock (getCurrentTime )
70
- import Database.Persist.Postgresql (ConnectionString , createPostgresqlPool )
76
+ import Database.Persist.Postgresql (ConnectionString , createPostgresqlPool , withPostgresqlConn )
71
77
import Database.Persist.Sql (SqlBackend )
72
78
import Ouroboros.Consensus.Block.Abstract (BlockProtocol )
73
79
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (.. ))
@@ -248,7 +254,7 @@ writePrefetch syncEnv cblock = do
248
254
249
255
mkSyncEnv ::
250
256
Trace IO Text ->
251
- SqlBackend ->
257
+ DbConnections ->
252
258
ConnectionString ->
253
259
SyncOptions ->
254
260
ProtocolInfo CardanoBlock ->
@@ -259,7 +265,7 @@ mkSyncEnv ::
259
265
SyncNodeParams ->
260
266
RunMigration ->
261
267
IO SyncEnv
262
- mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runMigrationFnc = do
268
+ mkSyncEnv trce backends connectionString syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runMigrationFnc = do
263
269
dbCNamesVar <- newTVarIO =<< dbConstraintNamesExists backend
264
270
cache <-
265
271
if soptCache syncOptions
@@ -278,8 +284,9 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
278
284
indexesVar <- newTVarIO $ enpForceIndexes syncNP
279
285
bts <- getBootstrapInProgress trce (isTxOutConsumedBootstrap' syncNodeConfigFromFile) backend
280
286
bootstrapVar <- newTVarIO bts
281
- -- Offline Pool + Anchor queues
282
287
cChans <- newStakeChannels
288
+ maChans <- newMAChannels
289
+ -- Offline Pool + Anchor queues
283
290
opwq <- newTBQueueIO 1000
284
291
oprq <- newTBQueueIO 1000
285
292
oawq <- newTBQueueIO 1000
@@ -309,7 +316,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
309
316
310
317
pure $
311
318
SyncEnv
312
- { envBackend = backend
319
+ { envBackends = backends
313
320
, envPool = pool
314
321
, envBootstrap = bootstrapVar
315
322
, envCache = cache
@@ -323,6 +330,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
323
330
, envLedgerEnv = ledgerEnvType
324
331
, envNetworkMagic = nwMagic
325
332
, envStakeChans = cChans
333
+ , envMAChans = maChans
326
334
, envOffChainPoolResultQueue = oprq
327
335
, envOffChainPoolWorkQueue = opwq
328
336
, envOffChainVoteResultQueue = oarq
@@ -335,10 +343,47 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
335
343
where
336
344
hasLedger' = hasLedger . sioLedger . dncInsertOptions
337
345
isTxOutConsumedBootstrap' = isTxOutConsumedBootstrap . sioTxOut . dncInsertOptions
346
+ backend = mainBackend backends
347
+
348
+ withDBSyncConnections ::
349
+ (MonadUnliftIO m , MonadLoggerIO m ) =>
350
+ ConnectionString ->
351
+ (DbConnections -> m a ) ->
352
+ m a
353
+ withDBSyncConnections connStr action =
354
+ withPostgresqlConn connStr $ \ mainConn ->
355
+ withPostgresqlConn connStr $ \ scriptConn ->
356
+ withPostgresqlConn connStr $ \ datumConn -> do
357
+ scr <- liftIO $ newMVar scriptConn
358
+ dt <- liftIO $ newMVar datumConn
359
+ action $ DbConnections mainConn scr dt
360
+
361
+ withScriptConnection ::
362
+ SyncEnv ->
363
+ ReaderT SqlBackend (LoggingT IO ) a ->
364
+ IO a
365
+ withScriptConnection = withGivenConnection scriptBackend
366
+
367
+ withDatumConnection ::
368
+ SyncEnv ->
369
+ ReaderT SqlBackend (LoggingT IO ) a ->
370
+ IO a
371
+ withDatumConnection = withGivenConnection datumBackend
372
+
373
+ withGivenConnection ::
374
+ (DbConnections -> MVar SqlBackend ) ->
375
+ SyncEnv ->
376
+ ReaderT SqlBackend (LoggingT IO ) a ->
377
+ IO a
378
+ withGivenConnection toConn syncEnv action = do
379
+ withMVar connVar $ \ conn ->
380
+ DB. runDbLogging conn (getTrace syncEnv) action
381
+ where
382
+ connVar = toConn $ envBackends syncEnv
338
383
339
384
mkSyncEnvFromConfig ::
340
385
Trace IO Text ->
341
- SqlBackend ->
386
+ DbConnections ->
342
387
ConnectionString ->
343
388
SyncOptions ->
344
389
GenesisConfig ->
@@ -347,7 +392,7 @@ mkSyncEnvFromConfig ::
347
392
-- | run migration function
348
393
RunMigration ->
349
394
IO (Either SyncNodeError SyncEnv )
350
- mkSyncEnvFromConfig trce backend connectionString syncOptions genCfg syncNodeConfigFromFile syncNodeParams runMigrationFnc =
395
+ mkSyncEnvFromConfig trce backends connectionString syncOptions genCfg syncNodeConfigFromFile syncNodeParams runMigrationFnc =
351
396
case genCfg of
352
397
GenesisCardano _ bCfg sCfg _ _
353
398
| unProtocolMagicId (Byron. configProtocolMagicId bCfg) /= Shelley. sgNetworkMagic (scConfig sCfg) ->
@@ -374,7 +419,7 @@ mkSyncEnvFromConfig trce backend connectionString syncOptions genCfg syncNodeCon
374
419
Right
375
420
<$> mkSyncEnv
376
421
trce
377
- backend
422
+ backends
378
423
connectionString
379
424
syncOptions
380
425
(fst $ mkProtocolInfoCardano genCfg [] )
0 commit comments