2
2
{-# LANGUAGE FlexibleContexts #-}
3
3
{-# LANGUAGE GADTs #-}
4
4
{-# LANGUAGE LambdaCase #-}
5
+ {-# LANGUAGE NamedFieldPuns #-}
5
6
{-# LANGUAGE RankNTypes #-}
6
- {-# LANGUAGE RecordWildCards #-}
7
7
{-# LANGUAGE ScopedTypeVariables #-}
8
8
{-# LANGUAGE TypeApplications #-}
9
9
-- | Tests for the chain sync client.
@@ -28,8 +28,7 @@ import Cardano.Crypto.DSIGN.Mock
28
28
import Control.Monad (unless , void , when )
29
29
import Control.Monad.Class.MonadThrow (Handler (.. ), catches )
30
30
import Control.Monad.IOSim (runSimOrThrow )
31
- import Control.Tracer (contramap , nullTracer )
32
- import Data.Bifunctor (first )
31
+ import Control.Tracer (contramap , contramapM , nullTracer )
33
32
import Data.List (intercalate )
34
33
import qualified Data.Map.Strict as Map
35
34
import Data.Maybe (isJust )
@@ -47,6 +46,7 @@ import qualified Ouroboros.Consensus.HeaderStateHistory as HeaderStateHistory
47
46
import Ouroboros.Consensus.Ledger.Abstract
48
47
import Ouroboros.Consensus.Ledger.Extended hiding (ledgerState )
49
48
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
49
+ import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion )
50
50
import Ouroboros.Consensus.Node.ProtocolInfo
51
51
import Ouroboros.Consensus.NodeId
52
52
import Ouroboros.Consensus.Protocol.BFT
@@ -57,7 +57,7 @@ import Ouroboros.Consensus.Util.Condense
57
57
import Ouroboros.Consensus.Util.IOLike
58
58
import Ouroboros.Consensus.Util.ResourceRegistry
59
59
import Ouroboros.Consensus.Util.STM (Fingerprint (.. ),
60
- WithFingerprint (.. ), forkLinkedWatcher )
60
+ WithFingerprint (.. ))
61
61
import Ouroboros.Network.AnchoredFragment (AnchoredFragment )
62
62
import qualified Ouroboros.Network.AnchoredFragment as AF
63
63
import Ouroboros.Network.Block (getTipPoint )
@@ -79,8 +79,7 @@ import Test.Tasty
79
79
import Test.Tasty.QuickCheck
80
80
import Test.Util.ChainUpdates (ChainUpdate (.. ), UpdateBehavior (.. ),
81
81
genChainUpdates , toChainUpdates )
82
- import qualified Test.Util.LogicalClock as LogicalClock
83
- import Test.Util.LogicalClock (NumTicks (.. ), Tick (.. ))
82
+ import Test.Util.LogicalClock (Tick (.. ))
84
83
import Test.Util.Orphans.Arbitrary ()
85
84
import Test.Util.Orphans.IOLike ()
86
85
import Test.Util.Schedule (Schedule (.. ), genSchedule , joinSchedule ,
@@ -103,7 +102,13 @@ tests = testGroup "ChainSyncClient"
103
102
-------------------------------------------------------------------------------}
104
103
105
104
prop_chainSync :: ChainSyncClientSetup -> Property
106
- prop_chainSync ChainSyncClientSetup {.. } =
105
+ prop_chainSync ChainSyncClientSetup {
106
+ securityParam
107
+ , clientUpdates
108
+ , serverUpdates
109
+ , startTick
110
+ , invalidBlocks
111
+ } =
107
112
counterexample
108
113
(" Client chain: " <> ppChain finalClientChain <> " \n " <>
109
114
" Server chain: " <> ppChain finalServerChain <> " \n " <>
@@ -140,8 +145,19 @@ prop_chainSync ChainSyncClientSetup {..} =
140
145
where
141
146
k = maxRollbacks securityParam
142
147
143
- ChainSyncOutcome {.. } = runSimOrThrow $
144
- runChainSync securityParam clientUpdates serverUpdates invalidBlocks startTick
148
+ ChainSyncOutcome {
149
+ finalClientChain
150
+ , finalServerChain
151
+ , mbResult
152
+ , syncedFragment
153
+ , traceEvents
154
+ } = runSimOrThrow $
155
+ runChainSync
156
+ securityParam
157
+ clientUpdates
158
+ serverUpdates
159
+ invalidBlocks
160
+ startTick
145
161
146
162
clientFragment = AF. anchorNewest k $ Chain. toAnchoredFragment finalClientChain
147
163
@@ -185,6 +201,10 @@ isSuffixOf fragment chain =
185
201
serverId :: CoreNodeId
186
202
serverId = CoreNodeId 1
187
203
204
+ -- | The schedule that determines the evolution of the local chain.
205
+ --
206
+ -- Note that the 'TestBlock' used in this test is constructed in such a way
207
+ -- that the block's slot number equals its block number.
188
208
newtype ClientUpdates =
189
209
ClientUpdates { getClientUpdates :: Schedule ChainUpdate }
190
210
deriving (Show )
@@ -248,7 +268,7 @@ runChainSync securityParam (ClientUpdates clientUpdates)
248
268
(ServerUpdates serverUpdates) (InvalidBlocks invalidBlocks)
249
269
startSyncingAt = withRegistry $ \ registry -> do
250
270
251
- clock <- LogicalClock. new registry numTicks
271
+ varCurrentLogicalTick <- uncheckedNewTVarM ( Tick 0 )
252
272
253
273
-- Set up the client
254
274
varCandidates <- uncheckedNewTVarM Map. empty
@@ -261,7 +281,12 @@ runChainSync securityParam (ClientUpdates clientUpdates)
261
281
-- at the final state of each candidate.
262
282
varFinalCandidates <- uncheckedNewTVarM Map. empty
263
283
264
- (tracer, getTrace) <- first (LogicalClock. tickTracer clock) <$> recordingTracerTVar
284
+ (tracer, getTrace) <- do
285
+ (tracer', getTrace) <- recordingTracerTVar
286
+ let foo ev = do
287
+ now <- readTVarIO varCurrentLogicalTick
288
+ pure (now, ev)
289
+ pure (contramapM foo tracer', getTrace)
265
290
let chainSyncTracer = contramap Left tracer
266
291
protocolTracer = contramap Right tracer
267
292
@@ -302,7 +327,7 @@ runChainSync securityParam (ClientUpdates clientUpdates)
302
327
chainSyncTracer
303
328
nodeCfg
304
329
chainDbView
305
- maxBound
330
+ ( maxBound :: NodeToNodeVersion )
306
331
(return Continue )
307
332
nullTracer
308
333
@@ -312,99 +337,98 @@ runChainSync securityParam (ClientUpdates clientUpdates)
312
337
(Tip TestBlock ) m ()
313
338
server = chainSyncServerExample () varChainProducerState getHeader
314
339
315
- -- Schedule updates of the client and server chains
316
- varLastUpdate <- uncheckedNewTVarM 0
317
- let forkLinkedTickWatcher :: (Tick -> m () ) -> m ()
318
- forkLinkedTickWatcher =
319
- void
320
- . forkLinkedWatcher registry " scheduled updates"
321
- . LogicalClock. tickWatcher clock
322
- forkLinkedTickWatcher $ \ tick -> do
323
- -- Stop updating the client and server chains when the chain sync client
324
- -- has thrown an exception or has gracefully terminated, so that at the
325
- -- end, we can read the chains in the states they were in when the
326
- -- exception was thrown.
327
- stop <- fmap isJust $ atomically $ readTVar varClientResult
328
- unless stop $ do
329
- -- Newly discovered invalid blocks
330
- whenJust (Map. lookup tick (getSchedule invalidBlocks)) $
331
- atomically . modifyTVar varKnownInvalid . Set. union . Set. fromList
332
-
333
- -- Client
334
- whenJust (Map. lookup tick (getSchedule clientUpdates)) $ \ chainUpdates ->
335
- atomically $ modifyTVar varClientState $ updateClientState chainUpdates
336
-
337
- -- Server
338
- whenJust (Map. lookup tick (getSchedule serverUpdates)) $ \ chainUpdates ->
339
- atomically $ do
340
- chainProducerState <- readTVar varChainProducerState
341
- case CPS. applyChainUpdates
342
- (toChainUpdates chainUpdates)
343
- chainProducerState of
344
- Just chainProducerState' ->
345
- writeTVar varChainProducerState chainProducerState'
346
- Nothing ->
347
- error $ " Invalid chainUpdates: " <> show chainUpdates <>
348
- " for " <> show (chainState chainProducerState)
349
- atomically $ writeTVar varLastUpdate tick
340
+ -- Do scheduled updates of the client and server chains
341
+ let updateChainsDuringTick :: Tick -> m ()
342
+ updateChainsDuringTick tick = do
343
+ -- Stop updating the client and server chains when the chain sync client
344
+ -- has thrown an exception or has gracefully terminated, so that at the
345
+ -- end, we can read the chains in the states they were in when the
346
+ -- exception was thrown.
347
+ stop <- fmap isJust $ atomically $ readTVar varClientResult
348
+ unless stop $ do
349
+ -- Newly discovered invalid blocks
350
+ whenJust (Map. lookup tick (getSchedule invalidBlocks)) $
351
+ atomically . modifyTVar varKnownInvalid . Set. union . Set. fromList
352
+
353
+ -- Client
354
+ doTick clientUpdates tick $ \ chainUpdates ->
355
+ atomically $ modifyTVar varClientState $ updateClientState chainUpdates
356
+
357
+ -- Server
358
+ doTick serverUpdates tick $ \ chainUpdates ->
359
+ atomically $ do
360
+ chainProducerState <- readTVar varChainProducerState
361
+ case CPS. applyChainUpdates
362
+ (toChainUpdates chainUpdates)
363
+ chainProducerState of
364
+ Just chainProducerState' ->
365
+ writeTVar varChainProducerState chainProducerState'
366
+ Nothing ->
367
+ error $ " Invalid chainUpdates: " <> show chainUpdates <>
368
+ " for " <> show (chainState chainProducerState)
350
369
351
370
-- Connect client to server and run the chain sync protocol
352
- LogicalClock. onTick registry clock " startSyncing" startSyncingAt $ do
353
- -- When updates are planned at the same time that we start syncing, we
354
- -- wait until these updates are done before we start syncing.
355
- when (Map. member startSyncingAt (getSchedule clientUpdates) ||
356
- Map. member startSyncingAt (getSchedule serverUpdates)) $
357
- atomically $ do
358
- lastUpdate <- readTVar varLastUpdate
359
- check (lastUpdate == startSyncingAt)
360
-
361
- (clientChannel, serverChannel) <- createConnectedChannels
362
- -- Don't link the thread (which will cause the exception to be rethrown
363
- -- in the main thread), just catch the exception and store it, because
364
- -- we want a "regular ending".
365
- void $ forkThread registry " ChainSyncClient" $
366
- bracketChainSyncClient
367
- chainSyncTracer
368
- chainDbView
369
- varCandidates
370
- serverId
371
- maxBound $ \ varCandidate -> do
372
- atomically $ modifyTVar varFinalCandidates $
373
- Map. insert serverId varCandidate
374
- result <-
375
- runPipelinedPeer protocolTracer codecChainSyncId clientChannel $
376
- chainSyncClientPeerPipelined $ client varCandidate
377
- atomically $ writeTVar varClientResult (Just (Right result))
378
- return ()
379
- `catchAlsoLinked` \ ex -> do
380
- atomically $ writeTVar varClientResult (Just (Left ex))
381
- -- Rethrow, but it will be ignored anyway.
382
- throwIO ex
383
- void $ forkLinkedThread registry " ChainSyncServer" $
384
- runPeer nullTracer codecChainSyncId serverChannel
385
- (chainSyncServerPeer server)
386
-
387
- LogicalClock. waitUntilDone clock
388
- -- Wait a random amount of time after the final tick for the chain sync
389
- -- to finish
390
- threadDelay 2000
371
+ --
372
+ -- Happens /immediately after/ the chain and clock effects schedule for
373
+ -- 'startSyncingAt'.
374
+ let initiateChainSync = do
375
+ (clientChannel, serverChannel) <- createConnectedChannels
376
+ -- Don't link the thread (which will cause the exception to be
377
+ -- rethrown in the main thread), just catch the exception and store
378
+ -- it, because we want a "regular ending".
379
+ void $ forkThread registry " ChainSyncClient" $
380
+ bracketChainSyncClient
381
+ chainSyncTracer
382
+ chainDbView
383
+ varCandidates
384
+ serverId
385
+ maxBound $ \ varCandidate -> do
386
+ atomically $ modifyTVar varFinalCandidates $
387
+ Map. insert serverId varCandidate
388
+ result <-
389
+ runPipelinedPeer protocolTracer codecChainSyncId clientChannel $
390
+ chainSyncClientPeerPipelined $ client varCandidate
391
+ atomically $ writeTVar varClientResult (Just (Right result))
392
+ return ()
393
+ `catchAlsoLinked` \ ex -> do
394
+ atomically $ writeTVar varClientResult (Just (Left ex))
395
+ -- Rethrow, but it will be ignored anyway.
396
+ throwIO ex
397
+ void $ forkLinkedThread registry " ChainSyncServer" $
398
+ runPeer nullTracer codecChainSyncId serverChannel
399
+ (chainSyncServerPeer server)
400
+
401
+ do
402
+ let loop tick = do
403
+ updateChainsDuringTick tick
404
+ when (tick == startSyncingAt) $ initiateChainSync
405
+ when (tick < finalTick) $ loop (tick + 1 )
406
+ loop (Tick 1 )
407
+
408
+ -- This delay seems enough to let all threads finish their final work.
409
+ --
410
+ -- TODO what is the necessary threshold?
411
+ threadDelay 86400
391
412
392
413
traceEvents <- getTrace
393
414
-- Collect the return values
394
415
atomically $ do
395
- finalClientChain <- readTVar varClientState
396
- finalServerChain <- chainState <$> readTVar varChainProducerState
416
+ finalClientChain <- readTVar varClientState
417
+ finalServerChain <- chainState <$> readTVar varChainProducerState
397
418
candidateFragment <- readTVar varFinalCandidates >>= readTVar . (Map. ! serverId)
398
- mbResult <- readTVar varClientResult
419
+ mbResult <- readTVar varClientResult
399
420
return ChainSyncOutcome {
400
- syncedFragment = AF. mapAnchoredFragment testHeader candidateFragment
401
- , ..
421
+ finalClientChain
422
+ , finalServerChain
423
+ , mbResult
424
+ , syncedFragment = AF. mapAnchoredFragment testHeader candidateFragment
425
+ , traceEvents
402
426
}
403
427
where
404
428
k = maxRollbacks securityParam
405
429
406
- slotLength :: SlotLength
407
- slotLength = slotLengthFromSec 20
430
+ doTick :: Schedule a -> Tick -> ([ a ] -> m () ) -> m ()
431
+ doTick sched tick kont = whenJust ( Map. lookup tick (getSchedule sched)) kont
408
432
409
433
nodeCfg :: TopLevelConfig TestBlock
410
434
nodeCfg = TopLevelConfig {
@@ -431,8 +455,8 @@ runChainSync securityParam (ClientUpdates clientUpdates)
431
455
numCoreNodes :: NumCoreNodes
432
456
numCoreNodes = NumCoreNodes 2
433
457
434
- numTicks :: NumTicks
435
- numTicks = LogicalClock. sufficientTimeFor
458
+ finalTick :: Tick
459
+ finalTick = maximum
436
460
[ lastTick clientUpdates
437
461
, lastTick serverUpdates
438
462
, startSyncingAt
@@ -502,6 +526,12 @@ computeHeaderStateHistory cfg =
502
526
ChainSyncClientSetup
503
527
-------------------------------------------------------------------------------}
504
528
529
+ slotLength :: SlotLength
530
+ slotLength = slotLengthFromSec $ toEnum slotLengthInSeconds
531
+
532
+ slotLengthInSeconds :: Int
533
+ slotLengthInSeconds = 20
534
+
505
535
-- | Bundle dependent arguments for test generation
506
536
data ChainSyncClientSetup = ChainSyncClientSetup
507
537
{ securityParam :: SecurityParam
@@ -535,8 +565,19 @@ instance Arbitrary ChainSyncClientSetup where
535
565
, tbValid b == Invalid
536
566
]
537
567
invalidBlocks <- InvalidBlocks <$> (genSchedule =<< shuffle trapBlocks)
538
- return ChainSyncClientSetup {.. }
539
- shrink cscs@ ChainSyncClientSetup {.. } =
568
+
569
+ return ChainSyncClientSetup {
570
+ securityParam
571
+ , clientUpdates
572
+ , serverUpdates
573
+ , startTick
574
+ , invalidBlocks
575
+ }
576
+ shrink cscs@ ChainSyncClientSetup {
577
+ clientUpdates
578
+ , serverUpdates
579
+ , startTick
580
+ } =
540
581
-- We don't shrink 'securityParam' because the updates depend on it
541
582
542
583
-- We also don't shrink 'invalidBlocks' right now (as it does not impact
@@ -573,7 +614,13 @@ instance Arbitrary ChainSyncClientSetup where
573
614
]
574
615
575
616
instance Show ChainSyncClientSetup where
576
- show ChainSyncClientSetup {.. } = unlines
617
+ show ChainSyncClientSetup {
618
+ securityParam
619
+ , clientUpdates
620
+ , serverUpdates
621
+ , startTick
622
+ , invalidBlocks
623
+ } = unlines
577
624
[ " ChainSyncClientSetup:"
578
625
, " securityParam: " <> show (maxRollbacks securityParam)
579
626
, " clientUpdates:"
0 commit comments