Skip to content

Commit b93c379

Browse files
facundominguezamesgen
authored andcommitted
Introduce a collection of chainsync handles that synchronizes a map and a queue
1 parent 5749666 commit b93c379

File tree

9 files changed

+146
-71
lines changed

9 files changed

+146
-71
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
6161
import Ouroboros.Consensus.Mempool
6262
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
6363
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
64-
(ChainSyncClientHandle (..), ChainSyncState (..),
65-
viewChainSyncState)
64+
(ChainSyncClientHandle (..),
65+
ChainSyncClientHandleCollection (..), ChainSyncState (..),
66+
newChainSyncClientHandleCollection, viewChainSyncState)
6667
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
6768
(SomeHeaderInFutureCheck)
6869
import Ouroboros.Consensus.Node.Genesis (GenesisNodeKernelArgs (..),
@@ -140,7 +141,7 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
140141
, getGsmState :: STM m GSM.GsmState
141142

142143
-- | The kill handle and exposed state for each ChainSync client.
143-
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
144+
, getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
144145

145146
-- | Read the current peer sharing registry, used for interacting with
146147
-- the PeerSharing protocol
@@ -242,7 +243,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
242243
<&> \wd (_headers, lst) ->
243244
GSM.getDurationUntilTooOld wd (getTipSlot lst)
244245
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
245-
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
246+
, GSM.getChainSyncStates = fmap cschState <$> cschcMap varChainSyncHandles
246247
, GSM.getCurrentSelection = do
247248
headers <- ChainDB.getCurrentChain chainDB
248249
extLedgerState <- ChainDB.getCurrentLedger chainDB
@@ -254,7 +255,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
254255
, GSM.writeGsmState = \gsmState ->
255256
atomicallyWithMonotonicTime $ \time -> do
256257
writeTVar varGsmState gsmState
257-
handles <- readTVar varChainSyncHandles
258+
handles <- cschcMap varChainSyncHandles
258259
traverse_ (($ time) . ($ gsmState) . cschOnGsmStateChanged) handles
259260
, GSM.isHaaSatisfied = do
260261
readTVar varOutboundConnectionsState <&> \case
@@ -289,7 +290,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
289290
chainDB
290291
(readTVar varGsmState)
291292
-- TODO GDD should only consider (big) ledger peers
292-
(readTVar varChainSyncHandles)
293+
(cschcMap varChainSyncHandles)
293294
varLoEFragment
294295

295296
void $ forkLinkedThread registry "NodeKernel.blockForging" $
@@ -345,7 +346,7 @@ data InternalState m addrNTN addrNTC blk = IS {
345346
, chainDB :: ChainDB m blk
346347
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
347348
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
348-
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
349+
, varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
349350
, varGsmState :: StrictTVar m GSM.GsmState
350351
, mempool :: Mempool m blk
351352
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
@@ -373,7 +374,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
373374
gsmMarkerFileView
374375
newTVarIO gsmState
375376

376-
varChainSyncHandles <- newTVarIO mempty
377+
varChainSyncHandles <- atomically newChainSyncClientHandleCollection
377378
mempool <- openMempool registry
378379
(chainDBLedgerInterface chainDB)
379380
(configLedger cfg)
@@ -384,7 +385,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
384385
fetchClientRegistry <- newFetchClientRegistry
385386

386387
let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (Header blk)))
387-
getCandidates = viewChainSyncState varChainSyncHandles csCandidate
388+
getCandidates = viewChainSyncState (cschcMap varChainSyncHandles) csCandidate
388389

389390
slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
390391
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/CSJInvariants.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import Data.Typeable (Typeable)
1919
import Ouroboros.Consensus.Block (Point, StandardHash, castPoint)
2020
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State as CSState
2121
import Ouroboros.Consensus.Util.IOLike (Exception, MonadSTM (STM),
22-
MonadThrow (throwIO), StrictTVar, readTVar)
22+
MonadThrow (throwIO), readTVar)
2323
import Ouroboros.Consensus.Util.STM (Watcher (..))
2424

2525
--------------------------------------------------------------------------------
@@ -109,10 +109,10 @@ readAndView ::
109109
forall m peer blk.
110110
( MonadSTM m
111111
) =>
112-
StrictTVar m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
112+
STM m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
113113
STM m (View peer blk)
114-
readAndView handles =
115-
traverse (fmap idealiseState . readTVar . CSState.cschJumping) =<< readTVar handles
114+
readAndView readHandles =
115+
traverse (fmap idealiseState . readTVar . CSState.cschJumping) =<< readHandles
116116
where
117117
-- Idealise the state of a ChainSync peer with respect to ChainSync jumping.
118118
-- In particular, we get rid of non-comparable information such as the TVars
@@ -170,7 +170,7 @@ watcher ::
170170
Typeable blk,
171171
StandardHash blk
172172
) =>
173-
StrictTVar m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
173+
STM m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
174174
Watcher m (View peer blk) (View peer blk)
175175
watcher handles =
176176
Watcher

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,23 @@ module Test.Consensus.PeerSimulator.ChainSync (
1313
import Control.Exception (SomeException)
1414
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
1515
import Control.Tracer (Tracer (Tracer), nullTracer, traceWith)
16-
import Data.Map.Strict (Map)
1716
import Data.Proxy (Proxy (..))
1817
import Network.TypedProtocol.Codec (AnyMessage)
1918
import Ouroboros.Consensus.Block (Header, Point)
2019
import Ouroboros.Consensus.Config (TopLevelConfig (..))
2120
import Ouroboros.Consensus.Ledger.SupportsProtocol
2221
(LedgerSupportsProtocol)
2322
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
24-
(CSJConfig (..), ChainDbView, ChainSyncClientHandle,
25-
ChainSyncLoPBucketConfig, ChainSyncStateView (..),
26-
Consensus, bracketChainSyncClient, chainSyncClient)
23+
(CSJConfig (..), ChainDbView,
24+
ChainSyncClientHandleCollection, ChainSyncLoPBucketConfig,
25+
ChainSyncStateView (..), Consensus, bracketChainSyncClient,
26+
chainSyncClient)
2727
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
2828
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
2929
import Ouroboros.Consensus.Node.GsmState (GsmState (Syncing))
3030
import Ouroboros.Consensus.Util (ShowProxy)
3131
import Ouroboros.Consensus.Util.IOLike (Exception (fromException),
32-
IOLike, MonadCatch (try), StrictTVar)
32+
IOLike, MonadCatch (try))
3333
import Ouroboros.Network.Block (Tip)
3434
import Ouroboros.Network.Channel (Channel)
3535
import Ouroboros.Network.ControlMessage (ControlMessage (..))
@@ -124,7 +124,7 @@ runChainSyncClient ::
124124
-- ^ Configuration for ChainSync Jumping
125125
StateViewTracers blk m ->
126126
-- ^ Tracers used to record information for the future 'StateView'.
127-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
127+
ChainSyncClientHandleCollection PeerId m blk ->
128128
-- ^ A TVar containing a map of states for each peer. This
129129
-- function will (via 'bracketChainSyncClient') register and de-register a
130130
-- TVar for the state of the peer.

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import Data.Set (Set)
1919
import qualified Data.Set as Set
2020
import Ouroboros.Consensus.Block
2121
import Ouroboros.Consensus.Config (TopLevelConfig (..))
22+
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
23+
(ChainSyncClientHandleCollection (..))
2224
import Ouroboros.Consensus.Storage.ChainDB.API
2325
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
2426
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB
@@ -204,7 +206,7 @@ lifecycleStop resources LiveNode {lnStateViewTracers, lnCopyToImmDb, lnPeers} =
204206
releaseAll lrRegistry
205207
-- Reset the resources in TVars that were allocated by the simulator
206208
atomically $ do
207-
modifyTVar psrHandles (const mempty)
209+
cschcRemoveAllHandles psrHandles
208210
case lrLoEVar of
209211
LoEEnabled var -> modifyTVar var (const (AF.Empty AF.AnchorGenesis))
210212
LoEDisabled -> pure ()

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Resources.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import Data.Traversable (for)
2525
import Ouroboros.Consensus.Block (WithOrigin (Origin))
2626
import Ouroboros.Consensus.Block.Abstract (Header, Point (..))
2727
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
28-
(ChainSyncClientHandle)
28+
(ChainSyncClientHandleCollection,
29+
newChainSyncClientHandleCollection)
2930
import Ouroboros.Consensus.Util.IOLike (IOLike, MonadSTM (STM),
3031
StrictTVar, readTVar, uncheckedNewTVarM, writeTVar)
3132
import qualified Ouroboros.Network.AnchoredFragment as AF
@@ -115,7 +116,7 @@ data PeerSimulatorResources m blk =
115116

116117
-- | Handles to interact with the ChainSync client of each peer.
117118
-- See 'ChainSyncClientHandle' for more details.
118-
psrHandles :: StrictTVar m (Map PeerId (ChainSyncClientHandle m TestBlock))
119+
psrHandles :: ChainSyncClientHandleCollection PeerId m TestBlock
119120
}
120121

121122
-- | Create 'ChainSyncServerHandlers' for our default implementation using 'NodeState'.
@@ -233,5 +234,5 @@ makePeerSimulatorResources tracer blockTree peers = do
233234
resources <- for peers $ \ peerId -> do
234235
peerResources <- makePeerResources tracer blockTree peerId
235236
pure (peerId, peerResources)
236-
psrHandles <- uncheckedNewTVarM mempty
237+
psrHandles <- atomically newChainSyncClientHandleCollection
237238
pure PeerSimulatorResources {psrPeers = Map.fromList $ toList resources, psrHandles}

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
2626
(LedgerSupportsProtocol)
2727
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
2828
(CSJConfig (..), CSJEnabledConfig (..), ChainDbView,
29-
ChainSyncClientHandle, ChainSyncLoPBucketConfig (..),
29+
ChainSyncClientHandle,
30+
ChainSyncClientHandleCollection (..),
31+
ChainSyncLoPBucketConfig (..),
3032
ChainSyncLoPBucketEnabledConfig (..), viewChainSyncState)
3133
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
3234
import qualified Ouroboros.Consensus.Node.GsmState as GSM
@@ -147,7 +149,7 @@ startChainSyncConnectionThread ::
147149
ChainSyncLoPBucketConfig ->
148150
CSJConfig ->
149151
StateViewTracers blk m ->
150-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
152+
ChainSyncClientHandleCollection PeerId m blk ->
151153
m (Thread m (), Thread m ())
152154
startChainSyncConnectionThread
153155
registry
@@ -230,7 +232,7 @@ smartDelay _ node duration = do
230232
dispatchTick :: forall m blk.
231233
IOLike m =>
232234
Tracer m (TraceSchedulerEvent blk) ->
233-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
235+
STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
234236
Map PeerId (PeerResources m blk) ->
235237
NodeLifecycle blk m ->
236238
LiveNode blk m ->
@@ -250,7 +252,7 @@ dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid
250252
traceNewTick = do
251253
currentChain <- atomically $ ChainDB.getCurrentChain (lnChainDb node)
252254
(csState, jumpingStates) <- atomically $ do
253-
m <- readTVar varHandles
255+
m <- varHandles
254256
csState <- traverse (readTVar . CSClient.cschState) (m Map.!? pid)
255257
jumpingStates <- forM (Map.toList m) $ \(peer, h) -> do
256258
st <- readTVar (CSClient.cschJumping h)
@@ -272,7 +274,7 @@ dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid
272274
runScheduler ::
273275
IOLike m =>
274276
Tracer m (TraceSchedulerEvent blk) ->
275-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
277+
STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
276278
PointSchedule blk ->
277279
Map PeerId (PeerResources m blk) ->
278280
NodeLifecycle blk m ->
@@ -314,7 +316,7 @@ mkStateTracer ::
314316
m (Tracer m ())
315317
mkStateTracer schedulerConfig GenesisTest {gtBlockTree} PeerSimulatorResources {psrHandles, psrPeers} chainDb
316318
| scTraceState schedulerConfig
317-
, let getCandidates = viewChainSyncState psrHandles CSClient.csCandidate
319+
, let getCandidates = viewChainSyncState (cschcMap psrHandles) CSClient.csCandidate
318320
getCurrentChain = ChainDB.getCurrentChain chainDb
319321
getPoints = traverse readTVar (srCurrentState . prShared <$> psrPeers)
320322
= peerSimStateDiagramSTMTracerDebug gtBlockTree getCurrentChain getCandidates getPoints
@@ -335,7 +337,7 @@ startNode ::
335337
startNode schedulerConfig genesisTest interval = do
336338
let
337339
handles = psrHandles lrPeerSim
338-
getCandidates = viewChainSyncState handles CSClient.csCandidate
340+
getCandidates = viewChainSyncState (cschcMap handles) CSClient.csCandidate
339341
fetchClientRegistry <- newFetchClientRegistry
340342
let chainDbView = CSClient.defaultChainDbView lnChainDb
341343
activePeers = Map.restrictKeys (psrPeers lrPeerSim) (lirActive liveResult)
@@ -384,10 +386,11 @@ startNode schedulerConfig genesisTest interval = do
384386
(mkGDDTracerTestBlock lrTracer)
385387
lnChainDb
386388
(pure GSM.Syncing) -- TODO actually run GSM
387-
(readTVar handles)
389+
(cschcMap handles)
388390
var
389391

390-
void $ forkLinkedWatcher lrRegistry "CSJ invariants watcher" $ CSJInvariants.watcher handles
392+
void $ forkLinkedWatcher lrRegistry "CSJ invariants watcher" $
393+
CSJInvariants.watcher (cschcMap handles)
391394
where
392395
LiveResources {lrRegistry, lrTracer, lrConfig, lrPeerSim, lrLoEVar} = resources
393396

@@ -483,7 +486,7 @@ runPointSchedule schedulerConfig genesisTest tracer0 =
483486
lifecycle <- nodeLifecycle schedulerConfig genesisTest tracer registry peerSim
484487
(chainDb, stateViewTracers) <- runScheduler
485488
(Tracer $ traceWith tracer . TraceSchedulerEvent)
486-
(psrHandles peerSim)
489+
(cschcMap (psrHandles peerSim))
487490
gtSchedule
488491
(psrPeers peerSim)
489492
lifecycle

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client (
6464
, TraceChainSyncClientEvent (..)
6565
-- * State shared with other components
6666
, ChainSyncClientHandle (..)
67+
, ChainSyncClientHandleCollection (..)
6768
, ChainSyncState (..)
6869
, ChainSyncStateView (..)
6970
, Jumping.noJumping
7071
, chainSyncStateFor
72+
, newChainSyncClientHandleCollection
7173
, noIdling
7274
, noLoPBucket
7375
, viewChainSyncState
@@ -229,11 +231,11 @@ newtype Our a = Our { unOur :: a }
229231
-- data from 'ChainSyncState'.
230232
viewChainSyncState ::
231233
IOLike m =>
232-
StrictTVar m (Map peer (ChainSyncClientHandle m blk)) ->
234+
STM m (Map peer (ChainSyncClientHandle m blk)) ->
233235
(ChainSyncState blk -> a) ->
234236
STM m (Map peer a)
235-
viewChainSyncState varHandles f =
236-
Map.map f <$> (traverse (readTVar . cschState) =<< readTVar varHandles)
237+
viewChainSyncState readHandles f =
238+
Map.map f <$> (traverse (readTVar . cschState) =<< readHandles)
237239

238240
-- | Convenience function for reading the 'ChainSyncState' for a single peer
239241
-- from a nested set of TVars.
@@ -327,7 +329,7 @@ bracketChainSyncClient ::
327329
)
328330
=> Tracer m (TraceChainSyncClientEvent blk)
329331
-> ChainDbView m blk
330-
-> StrictTVar m (Map peer (ChainSyncClientHandle m blk))
332+
-> ChainSyncClientHandleCollection peer m blk
331333
-- ^ The kill handle and states for each peer, we need the whole map because we
332334
-- (de)register nodes (@peer@).
333335
-> STM m GsmState
@@ -400,8 +402,8 @@ bracketChainSyncClient
400402
insertHandle = atomicallyWithMonotonicTime $ \time -> do
401403
initialGsmState <- getGsmState
402404
updateLopBucketConfig lopBucket initialGsmState time
403-
modifyTVar varHandles $ Map.insert peer handle
404-
deleteHandle = atomically $ modifyTVar varHandles $ Map.delete peer
405+
cschcAddHandle varHandles peer handle
406+
deleteHandle = atomically $ cschcRemoveHandle varHandles peer
405407
bracket_ insertHandle deleteHandle $ f Jumping.noJumping
406408

407409
withCSJCallbacks lopBucket csHandleState (CSJEnabled csjEnabledConfig) f =

0 commit comments

Comments
 (0)