From 8648b659fadd953b058db07c0ae32d5260dbd00b Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 3 Sep 2022 20:45:05 +0200 Subject: [PATCH 1/2] trie, eth/protocols/snap: write batches in more meaningful sizes when trie-healing --- eth/protocols/snap/sync.go | 33 +++++++++++++++++---------------- trie/sync.go | 8 +++++++- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index e2369378972..7df53666fb6 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -651,6 +651,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { trienodeHealResps = make(chan *trienodeHealResponse) bytecodeHealResps = make(chan *bytecodeHealResponse) ) + defer s.commit(true) for { // Remove all completed tasks and terminate sync if everything's done s.cleanStorageTasks() @@ -2154,14 +2155,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Error("Invalid trienode processed", "hash", hash, "err", err) } } - batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { - log.Error("Failed to commit healing data", "err", err) - } - if err := batch.Write(); err != nil { - log.Crit("Failed to persist healing data", "err", err) - } - log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) + s.commit(false) // Calculate the processing rate of one filled trie node rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) @@ -2208,6 +2202,20 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { } } +func (s *Syncer) commit(force bool) { + if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize { + return + } + batch := s.db.NewBatch() + if err := s.healer.scheduler.Commit(batch); err != nil { + log.Error("Failed to commit healing data", "err", err) + } + if err := batch.Write(); err != nil { + log.Crit("Failed to persist healing data", "err", err) + } + log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) +} + // processBytecodeHealResponse integrates an already validated bytecode response // into the healer tasks. func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { @@ -2234,14 +2242,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { log.Error("Invalid bytecode processed", "hash", hash, "err", err) } } - batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { - log.Error("Failed to commit healing data", "err", err) - } - if err := batch.Write(); err != nil { - log.Crit("Failed to persist healing data", "err", err) - } - log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize())) + s.commit(false) } // forwardAccountTask takes a filled account task and persists anything available diff --git a/trie/sync.go b/trie/sync.go index 862ce7e16e6..ae11b58d492 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -111,6 +111,7 @@ type syncMemBatch struct { nodes map[string][]byte // In-memory membatch of recently completed nodes hashes map[string]common.Hash // Hashes of recently completed nodes codes map[common.Hash][]byte // In-memory membatch of recently completed codes + size uint64 } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. @@ -338,6 +339,10 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { return nil } +func (s *Sync) MemSize() uint64 { + return s.membatch.size +} + // Pending returns the number of state entries currently pending for download. func (s *Sync) Pending() int { return len(s.nodeReqs) + len(s.codeReqs) @@ -479,7 +484,7 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch s.membatch.nodes[string(req.path)] = req.data s.membatch.hashes[string(req.path)] = req.hash - + s.membatch.size += 2*uint64(len(req.path)) + 32 + uint64(len(req.data)) delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- @@ -501,6 +506,7 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { func (s *Sync) commitCodeRequest(req *codeRequest) error { // Write the node content to the membatch s.membatch.codes[req.hash] = req.data + s.membatch.size += uint64(32 + len(req.data)) delete(s.codeReqs, req.hash) s.fetches[len(req.path)]-- From 37800eeb1cdcdff7dc5ec835c8217849b6c59f77 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 6 Sep 2022 12:34:21 +0200 Subject: [PATCH 2/2] eth/protocols/snap, trie: address review concerns --- eth/protocols/snap/sync.go | 9 +++++---- trie/sync.go | 10 +++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 7df53666fb6..f262824f9ad 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -615,6 +615,8 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } }() defer s.report(true) + // commit any trie- and bytecode-healing data. + defer s.commitHealer(true) // Whether sync completed or not, disregard any future packets defer func() { @@ -651,7 +653,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { trienodeHealResps = make(chan *trienodeHealResponse) bytecodeHealResps = make(chan *bytecodeHealResponse) ) - defer s.commit(true) for { // Remove all completed tasks and terminate sync if everything's done s.cleanStorageTasks() @@ -2155,7 +2156,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Error("Invalid trienode processed", "hash", hash, "err", err) } } - s.commit(false) + s.commitHealer(false) // Calculate the processing rate of one filled trie node rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) @@ -2202,7 +2203,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { } } -func (s *Syncer) commit(force bool) { +func (s *Syncer) commitHealer(force bool) { if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize { return } @@ -2242,7 +2243,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { log.Error("Invalid bytecode processed", "hash", hash, "err", err) } } - s.commit(false) + s.commitHealer(false) } // forwardAccountTask takes a filled account task and persists anything available diff --git a/trie/sync.go b/trie/sync.go index ae11b58d492..31d3cbe91b9 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -111,7 +111,7 @@ type syncMemBatch struct { nodes map[string][]byte // In-memory membatch of recently completed nodes hashes map[string]common.Hash // Hashes of recently completed nodes codes map[common.Hash][]byte // In-memory membatch of recently completed codes - size uint64 + size uint64 // Estimated batch-size of in-memory data. } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. @@ -339,6 +339,7 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { return nil } +// MemSize returns an estimated size (in bytes) of the data held in the membatch. func (s *Sync) MemSize() uint64 { return s.membatch.size } @@ -484,7 +485,10 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch s.membatch.nodes[string(req.path)] = req.data s.membatch.hashes[string(req.path)] = req.hash - s.membatch.size += 2*uint64(len(req.path)) + 32 + uint64(len(req.data)) + // The size tracking refers to the db-batch, not the in-memory data. + // Therefore, we ignore the req.path, and account only for the hash+data + // which eventually is written to db. + s.membatch.size += common.HashLength + uint64(len(req.data)) delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- @@ -506,7 +510,7 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { func (s *Sync) commitCodeRequest(req *codeRequest) error { // Write the node content to the membatch s.membatch.codes[req.hash] = req.data - s.membatch.size += uint64(32 + len(req.data)) + s.membatch.size += common.HashLength + uint64(len(req.data)) delete(s.codeReqs, req.hash) s.fetches[len(req.path)]--