From 6986095f810c11328b3808bc7a06500eb0f8d3e6 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 26 Mar 2024 17:17:14 -0700 Subject: [PATCH 1/8] eth/downloader: lift broken test-case from post-merge cleanup PR. --- eth/downloader/downloader_test.go | 94 +++++++++++++++++++++++++++++++ eth/downloader/testchain_test.go | 7 ++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 0fdc7ead9c3..9d9e2886dca 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1311,3 +1311,97 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { }) } } + +// Tests that synchronisation progress (origin block number and highest block +// number) is tracked and updated correctly in case of manual head reversion +func TestBeaconForkedSyncProgress68Full(t *testing.T) { + testBeaconForkedSyncProgress(t, eth.ETH68, FullSync) +} +func TestBeaconForkedSyncProgress68Snap(t *testing.T) { + testBeaconForkedSyncProgress(t, eth.ETH68, SnapSync) +} +func TestBeaconForkedSyncProgress68Light(t *testing.T) { + testBeaconForkedSyncProgress(t, eth.ETH68, LightSync) +} + +func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { + success := make(chan struct{}) + tester := newTesterWithNotification(t, func() { + success <- struct{}{} + }) + defer tester.terminate() + + chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch) + chainB := testChainForkHigher.shorten(len(testChainBase.blocks) + MaxHeaderFetch) + + // Set a sync init hook to catch progress changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester.downloader.syncInitHook = func(origin, latest uint64) { + fmt.Println("sync init") + starting <- struct{}{} + <-progress + } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Synchronise with one of the forks and check progress + tester.newPeer("fork A", 
protocol, chainA.blocks[1:]) + pending := new(sync.WaitGroup) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.downloader.BeaconSync(mode, chainA.blocks[len(chainA.blocks)-1].Header(), nil); err != nil { + panic(fmt.Sprintf("failed to beacon sync: %v", err)) + } + }() + + <-starting + progress <- struct{}{} + select { + case <-success: + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(len(chainA.blocks) - 1), + CurrentBlock: uint64(len(chainA.blocks) - 1), + }) + case <-time.NewTimer(time.Second * 3).C: + t.Fatalf("Failed to sync chain in three seconds") + } + + fmt.Println("jump in") + log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, false))) + // Set the head to a second fork (which forks after the origin of the last sync cycle + tester.newPeer("fork B", protocol, chainB.blocks[1:]) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.downloader.BeaconSync(mode, chainB.blocks[len(chainB.blocks)-1].Header(), nil); err != nil { + panic(fmt.Sprintf("failed to beacon sync: %v", err)) + } + }() + + select { + case <-success: + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(len(chainB.blocks) - 1), + CurrentBlock: uint64(len(chainB.blocks) - 1), + // TODO: check that origin block is at the start of the fork (not 0) + }) + case <-time.NewTimer(time.Second * 3).C: + t.Fatalf("Failed to sync chain in three seconds") + } + checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ + StartingBlock: uint64(len(testChainBase.blocks)) - 1, + CurrentBlock: uint64(len(chainA.blocks) - 1), + HighestBlock: uint64(len(chainB.blocks) - 1), + }) + + // Check final progress after successful sync + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(len(testChainBase.blocks)) - 1, + CurrentBlock: uint64(len(chainB.blocks) - 
1), + HighestBlock: uint64(len(chainB.blocks) - 1), + }) +} diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 46f3febd8ba..d9582bcd638 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -53,6 +53,9 @@ var testChainBase *testChain // Different forks on top of the base chain: var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain +// Fork from base chain, fork occurs 10 blocks later than other forks +var testChainForkHigher *testChain + var pregenerated bool func init() { @@ -69,10 +72,11 @@ func init() { var wg sync.WaitGroup // Generate the test chains to seed the peers with - wg.Add(3) + wg.Add(4) go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }() go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }() + go func() { testChainForkHigher = testChainBase.makeFork(forkLen-10, false, 3); wg.Done() }() wg.Wait() // Generate the test peers used by the tests to avoid overloading during testing. 
@@ -103,6 +107,7 @@ func init() { testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch), testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch), testChainForkHeavy.shorten(len(testChainBase.blocks) + 79), + testChainForkHigher.shorten(len(testChainBase.blocks) + MaxHeaderFetch), } wg.Add(len(chains)) for _, chain := range chains { From 1a4887bd43efd09f663a371a00365452e600c3cb Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 26 Mar 2024 17:40:21 -0700 Subject: [PATCH 2/8] fix --- eth/downloader/downloader_test.go | 4 +--- eth/downloader/testchain_test.go | 7 +------ 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 9d9e2886dca..5f39f4ee497 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1332,14 +1332,13 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { defer tester.terminate() chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch) - chainB := testChainForkHigher.shorten(len(testChainBase.blocks) + MaxHeaderFetch) + chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) tester.downloader.syncInitHook = func(origin, latest uint64) { - fmt.Println("sync init") starting <- struct{}{} <-progress } @@ -1368,7 +1367,6 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } - fmt.Println("jump in") log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, false))) // Set the head to a second fork (which forks after the origin of the last sync cycle tester.newPeer("fork B", protocol, chainB.blocks[1:]) diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 
d9582bcd638..46f3febd8ba 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -53,9 +53,6 @@ var testChainBase *testChain // Different forks on top of the base chain: var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain -// Fork from base chain, fork occurs 10 blocks later than other forks -var testChainForkHigher *testChain - var pregenerated bool func init() { @@ -72,11 +69,10 @@ func init() { var wg sync.WaitGroup // Generate the test chains to seed the peers with - wg.Add(4) + wg.Add(3) go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }() go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }() - go func() { testChainForkHigher = testChainBase.makeFork(forkLen-10, false, 3); wg.Done() }() wg.Wait() // Generate the test peers used by the tests to avoid overloading during testing. 
@@ -107,7 +103,6 @@ func init() { testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch), testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch), testChainForkHeavy.shorten(len(testChainBase.blocks) + 79), - testChainForkHigher.shorten(len(testChainBase.blocks) + MaxHeaderFetch), } wg.Add(len(chains)) for _, chain := range chains { From c7942ce10d68ba0bfb514133b2e1e4403590880d Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 26 Mar 2024 17:41:33 -0700 Subject: [PATCH 3/8] fix --- eth/downloader/downloader_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5f39f4ee497..f311598270f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1367,8 +1367,7 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } - log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, false))) - // Set the head to a second fork (which forks after the origin of the last sync cycle + // Set the head to a second fork tester.newPeer("fork B", protocol, chainB.blocks[1:]) pending.Add(1) go func() { From 16ef4825b519e0008cbdb7a9e0733dde23e527cf Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 26 Mar 2024 17:58:27 -0700 Subject: [PATCH 4/8] verbose logging + log statements --- eth/downloader/downloader.go | 1 + eth/downloader/downloader_test.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 941f575aa89..4633cf3d5b2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -498,6 +498,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if number < oldest.Number.Uint64() { count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks headers := 
d.readHeaderRange(oldest, count) + fmt.Printf("readHeaderRange, oldest=%x, oldest num=%d, count=%d\n", oldest.Hash(), oldest.Number, count) if len(headers) == count { pivot = headers[len(headers)-1] log.Warn("Retrieved pivot header from local", "number", pivot.Number, "hash", pivot.Hash(), "latest", latest.Number, "oldest", oldest.Number) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index f311598270f..2e849fe98e9 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1355,6 +1355,7 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { } }() + log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, false))) <-starting progress <- struct{}{} select { @@ -1367,6 +1368,7 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } + fmt.Println("syncing to fork B") // Set the head to a second fork tester.newPeer("fork B", protocol, chainB.blocks[1:]) pending.Add(1) From 8f61ee01393289e69f0abaf40e7478edb8d29e57 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sun, 7 Apr 2024 11:46:14 -0700 Subject: [PATCH 5/8] after skeleton linked, if the skeleton and snap/full chain have forked, rewind the chain to the shared ancestor before restarting state/block backfilling. 
--- eth/downloader/downloader.go | 2 +- eth/downloader/skeleton.go | 48 ++++++++++++++++++++++++++++++--- eth/downloader/skeleton_test.go | 6 ++--- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4633cf3d5b2..ecaa0377699 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -235,7 +235,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), } // Create the post-merge skeleton syncer and start the process - dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + dl.skeleton = newSkeleton(chain, stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) go dl.stateFetcher() return dl diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 873ee950b66..5157cc0f99e 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -141,7 +141,8 @@ type headerRequest struct { cancel chan struct{} // Channel to track sync cancellation stale chan struct{} // Channel to signal the request was dropped - head uint64 // Head number of the requested batch of headers + head uint64 // Head number of the requested batch of headers + chain BlockChain } // headerResponse is an already verified remote response to a header request. @@ -201,6 +202,7 @@ type backfiller interface { type skeleton struct { db ethdb.Database // Database backing the skeleton filler backfiller // Chain syncer suspended/resumed by head events + chain BlockChain peers *peerSet // Set of peers we can sync from idles map[string]*peerConnection // Set of idle peers in the current sync cycle @@ -227,7 +229,7 @@ type skeleton struct { // newSkeleton creates a new sync skeleton that tracks a potentially dangling // header chain until it's linked into an existing set of blocks. 
-func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { +func newSkeleton(chain BlockChain, db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { sk := &skeleton{ db: db, filler: filler, @@ -237,6 +239,7 @@ func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler back headEvents: make(chan *headUpdate), terminate: make(chan chan error), terminated: make(chan struct{}), + chain: chain, } go sk.startup() return sk @@ -346,7 +349,7 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err // sync is the internal version of Sync that executes a single sync cycle, either // until some termination condition is reached, or until the current cycle merges // with a previously aborted run. -func (s *skeleton) sync(head *types.Header) (*types.Header, error) { +func (s *skeleton) sync(head *types.Header) (header *types.Header, err error) { // If we're continuing a previous merge interrupt, just access the existing // old state without initing from disk. if head == nil { @@ -387,6 +390,27 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { log.Error("Latest filled block is not available") return } + // if the skeleton just linked up and the current snap/full block is within + // the range of the skeleton, the skeleton forked + newlyLinked := + rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && + rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && + rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && !linked + if newlyLinked && filled.Number.Uint64() >= s.progress.Subchains[0].Tail { + // TODO: this could also happen if the skeleton was reverted back to a block already in the filled history? 
+ // ^probably/definitely not but need to verify + + // revert the chain to the shared ancestor + ancestor, err := s.findSkeletonAncestor(filled) + if err != nil { + log.Crit("Failed to find skeleton ancestor", "err", err) + } + if err = s.chain.SetHead(ancestor); err != nil { + log.Crit("Failed to rewind chain", "err", err) + } + filled = s.chain.CurrentSnapBlock() + } + // If something was filled, try to delete stale sync helpers. If // unsuccessful, warn the user, but not much else we can do (it's // a programming error, just let users report an issue and don't @@ -1246,3 +1270,21 @@ func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *type func (s *skeleton) Header(number uint64) *types.Header { return rawdb.ReadSkeletonHeader(s.db, number) } + +// find the common ancestor header of the skeleton chain and the snap block +func (s *skeleton) findSkeletonAncestor(filledHeader *types.Header) (uint64, error) { + for { + if filledHeader.Hash() == s.Header(filledHeader.Number.Uint64()).Hash() { + return filledHeader.Number.Uint64(), nil + } + if filledHeader.Number.Uint64() == s.progress.Subchains[0].Tail-1 { + if filledHeader.Hash() == s.progress.Subchains[0].Next { + return filledHeader.Number.Uint64(), nil + } + break + } + filledHeader = s.chain.GetHeaderByHash(filledHeader.ParentHash) + } + log.Crit("absolutely should not happen: the chain of the filled header and the skeleton chain must have a common ancestor") + return 0, nil +} diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 3693ab095ff..6af364a8012 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -368,7 +368,7 @@ func TestSkeletonSyncInit(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(nil, db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) 
} skeleton.Sync(tt.head, nil, true) @@ -482,7 +482,7 @@ func TestSkeletonSyncExtend(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(nil, db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, nil, true) @@ -858,7 +858,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } } // Create a skeleton sync and run a cycle - skeleton := newSkeleton(db, peerset, drop, filler) + skeleton := newSkeleton(nil, db, peerset, drop, filler) skeleton.Sync(tt.head, nil, true) var progress skeletonProgress From 9ad31093ea0d681c9680d37672bf2fa1bcb11edb Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Mon, 8 Apr 2024 06:50:23 -0700 Subject: [PATCH 6/8] fix test case --- eth/downloader/downloader_test.go | 38 ++++++++++++++++++------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 2e849fe98e9..7469608bfb2 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1379,28 +1379,34 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { } }() + <-starting + progress <- struct{}{} + + // reorg below available state causes the state sync to rewind to genesis select { case <-success: checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chainB.blocks) - 1), - CurrentBlock: uint64(len(chainB.blocks) - 1), - // TODO: check that origin block is at the start of the fork (not 0) + HighestBlock: uint64(len(chainB.blocks) - 1), + CurrentBlock: uint64(len(chainB.blocks) - 1), + StartingBlock: 0, }) case <-time.NewTimer(time.Second * 3).C: t.Fatalf("Failed to sync chain in three seconds") } - checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ - StartingBlock: 
uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainA.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) + /* + checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ + StartingBlock: uint64(len(testChainBase.blocks)) - 1, + CurrentBlock: uint64(len(chainA.blocks) - 1), + HighestBlock: uint64(len(chainB.blocks) - 1), + }) - // Check final progress after successful sync - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - StartingBlock: uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainB.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) + // Check final progress after successful sync + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(len(testChainBase.blocks)) - 1, + CurrentBlock: uint64(len(chainB.blocks) - 1), + HighestBlock: uint64(len(chainB.blocks) - 1), + }) + */ } From ee00779a790e7190f89d9f7b13bab548b12c0415 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Mon, 8 Apr 2024 11:58:24 -0700 Subject: [PATCH 7/8] remove verbose logging --- eth/downloader/downloader_test.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7469608bfb2..c810518d56a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1355,7 +1355,6 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { } }() - log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, false))) <-starting progress <- struct{}{} select { @@ -1368,7 +1367,6 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } - fmt.Println("syncing to fork B") // Set the head to a second fork tester.newPeer("fork B", protocol, 
chainB.blocks[1:]) pending.Add(1) @@ -1393,20 +1391,4 @@ func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { case <-time.NewTimer(time.Second * 3).C: t.Fatalf("Failed to sync chain in three seconds") } - /* - checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ - StartingBlock: uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainA.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) - - // Check final progress after successful sync - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - StartingBlock: uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainB.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) - */ } From 9f6de6d68a3cb32f8ed407596dc8900c455d542a Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Fri, 12 Apr 2024 07:54:49 -0700 Subject: [PATCH 8/8] eth/downloader: return nil from cleanStales if the latest filled hash is non-existent or different than the corresponding skeleton header. 
add test case that beacon syncs a chain, beacon syncs to a separate fork MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Péter Szilágyi --- eth/downloader/downloader.go | 3 +- eth/downloader/skeleton.go | 58 ++++++++------------------------- eth/downloader/skeleton_test.go | 6 ++-- 3 files changed, 17 insertions(+), 50 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ecaa0377699..941f575aa89 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -235,7 +235,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), } // Create the post-merge skeleton syncer and start the process - dl.skeleton = newSkeleton(chain, stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) go dl.stateFetcher() return dl @@ -498,7 +498,6 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if number < oldest.Number.Uint64() { count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks headers := d.readHeaderRange(oldest, count) - fmt.Printf("readHeaderRange, oldest=%x, oldest num=%d, count=%d\n", oldest.Hash(), oldest.Number, count) if len(headers) == count { pivot = headers[len(headers)-1] log.Warn("Retrieved pivot header from local", "number", pivot.Number, "hash", pivot.Hash(), "latest", latest.Number, "oldest", oldest.Number) diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 5157cc0f99e..04421a2bf5c 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -141,8 +141,7 @@ type headerRequest struct { cancel chan struct{} // Channel to track sync cancellation stale chan struct{} // Channel to signal the request was dropped - head uint64 // Head number of the requested batch of headers - 
chain BlockChain + head uint64 // Head number of the requested batch of headers } // headerResponse is an already verified remote response to a header request. @@ -202,7 +201,6 @@ type backfiller interface { type skeleton struct { db ethdb.Database // Database backing the skeleton filler backfiller // Chain syncer suspended/resumed by head events - chain BlockChain peers *peerSet // Set of peers we can sync from idles map[string]*peerConnection // Set of idle peers in the current sync cycle @@ -229,7 +227,7 @@ type skeleton struct { // newSkeleton creates a new sync skeleton that tracks a potentially dangling // header chain until it's linked into an existing set of blocks. -func newSkeleton(chain BlockChain, db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { sk := &skeleton{ db: db, filler: filler, @@ -239,7 +237,6 @@ func newSkeleton(chain BlockChain, db ethdb.Database, peers *peerSet, drop peerD headEvents: make(chan *headUpdate), terminate: make(chan chan error), terminated: make(chan struct{}), - chain: chain, } go sk.startup() return sk @@ -349,7 +346,7 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err // sync is the internal version of Sync that executes a single sync cycle, either // until some termination condition is reached, or until the current cycle merges // with a previously aborted run. -func (s *skeleton) sync(head *types.Header) (header *types.Header, err error) { +func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // If we're continuing a previous merge interrupt, just access the existing // old state without initing from disk. 
if head == nil { @@ -390,27 +387,6 @@ func (s *skeleton) sync(head *types.Header) (header *types.Header, err error) { log.Error("Latest filled block is not available") return } - // if the skeleton just linked up and the current snap/full block is within - // the range of the skeleton, the skeleton forked - newlyLinked := - rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && - rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && - rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.progress.Subchains[0].Tail-1) && !linked - if newlyLinked && filled.Number.Uint64() >= s.progress.Subchains[0].Tail { - // TODO: this could also happen if the skeleton was reverted back to a block already in the filled history? - // ^probably/definitely not but need to verify - - // revert the chain to the shared ancestor - ancestor, err := s.findSkeletonAncestor(filled) - if err != nil { - log.Crit("Failed to find skeleton ancestor", "err", err) - } - if err = s.chain.SetHead(ancestor); err != nil { - log.Crit("Failed to rewind chain", "err", err) - } - filled = s.chain.CurrentSnapBlock() - } - // If something was filled, try to delete stale sync helpers. 
If // unsuccessful, warn the user, but not much else we can do (it's // a programming error, just let users report an issue and don't @@ -1156,6 +1132,16 @@ func (s *skeleton) cleanStales(filled *types.Header) error { if number+1 == s.progress.Subchains[0].Tail { return nil } + // If the latest fill was on a different subchain, it means the backfiller + // was interrupted before it got to do any meaningful work, no cleanup + header := rawdb.ReadSkeletonHeader(s.db, filled.Number.Uint64()) + if header == nil { + log.Debug("Filled header outside of skeleton range", "number", number, "head", s.progress.Subchains[0].Head, "tail", s.progress.Subchains[0].Tail) + return nil + } else if header.Hash() != filled.Hash() { + log.Debug("Filled header on different sidechain", "number", number, "filled", filled.Hash(), "skeleton", header.Hash()) + return nil + } var ( start uint64 end uint64 @@ -1270,21 +1256,3 @@ func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *type func (s *skeleton) Header(number uint64) *types.Header { return rawdb.ReadSkeletonHeader(s.db, number) } - -// find the common ancestor header of the skeleton chain and the snap block -func (s *skeleton) findSkeletonAncestor(filledHeader *types.Header) (uint64, error) { - for { - if filledHeader.Hash() == s.Header(filledHeader.Number.Uint64()).Hash() { - return filledHeader.Number.Uint64(), nil - } - if filledHeader.Number.Uint64() == s.progress.Subchains[0].Tail-1 { - if filledHeader.Hash() == s.progress.Subchains[0].Next { - return filledHeader.Number.Uint64(), nil - } - break - } - filledHeader = s.chain.GetHeaderByHash(filledHeader.ParentHash) - } - log.Crit("absolutely should not happen: the chain of the filled header and the skeleton chain must have a common ancestor") - return 0, nil -} diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 6af364a8012..3693ab095ff 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ 
-368,7 +368,7 @@ func TestSkeletonSyncInit(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(nil, db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, nil, true) @@ -482,7 +482,7 @@ func TestSkeletonSyncExtend(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(nil, db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, nil, true) @@ -858,7 +858,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } } // Create a skeleton sync and run a cycle - skeleton := newSkeleton(nil, db, peerset, drop, filler) + skeleton := newSkeleton(db, peerset, drop, filler) skeleton.Sync(tt.head, nil, true) var progress skeletonProgress