Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"math"
"math/big"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -76,15 +77,16 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
ShadowForkPeerIDs []string // List of peer ids that take part in the shadow-fork
}

type handler struct {
Expand Down Expand Up @@ -122,6 +124,8 @@ type handler struct {
chainSync *chainSyncer
wg sync.WaitGroup
peerWG sync.WaitGroup

shadowForkPeerIDs []string
}

// newHandler returns a handler for all Ethereum chain management protocol.
Expand All @@ -131,15 +135,16 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
quitSync: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
quitSync: make(chan struct{}),
shadowForkPeerIDs: config.ShadowForkPeerIDs,
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
Expand Down Expand Up @@ -182,7 +187,14 @@ func newHandler(config *handlerConfig) (*handler, error) {
if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)

dropPeerFunc := h.removePeer
// If we are shadowforking, don't drop peers.
if config.ShadowForkPeerIDs != nil {
dropPeerFunc = func(id string) {}
}

h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, dropPeerFunc)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down Expand Up @@ -217,7 +229,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return n, err
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, dropPeerFunc)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down Expand Up @@ -433,7 +445,7 @@ func (h *handler) Stop() {
// will only announce its availability (depending what's requested).
func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := h.peers.peersWithoutBlock(hash)
peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutBlock(hash))

// If propagation is requested, send to a subset of the peer
if propagate {
Expand Down Expand Up @@ -483,7 +495,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
if tx.IsL1MessageTx() {
continue
}
peers := h.peers.peersWithoutTransaction(tx.Hash())
peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutTransaction(tx.Hash()))
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
for _, peer := range peers[:numDirect] {
Expand Down Expand Up @@ -533,3 +545,16 @@ func (h *handler) txBroadcastLoop() {
}
}
}

// onlyShadowForkPeers filters out peers that are not part of the shadow fork
func onlyShadowForkPeers[peerT interface {
ID() string
}](shadowForkPeerIDs []string, peers []peerT) []peerT {
if shadowForkPeerIDs == nil {
return peers
}

return slices.DeleteFunc(peers, func(peer peerT) bool {
return !slices.Contains(shadowForkPeerIDs, peer.ID())
})
}
82 changes: 82 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"math/big"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/consensus/ethash"
Expand Down Expand Up @@ -168,3 +171,82 @@ func (b *testHandler) close() {
b.handler.Stop()
b.chain.Stop()
}

type testPeer struct {
id string
}

func (p testPeer) ID() string {
return p.id
}

func TestOnlyShadowForkPeers(t *testing.T) {

tests := map[string]struct {
shadowForkPeerIDs []string
peers []testPeer
expectedPeerIDs []string
}{
"nil peers": {
shadowForkPeerIDs: nil,
peers: nil,
expectedPeerIDs: []string{},
},
"empty peers": {
shadowForkPeerIDs: nil,
peers: []testPeer{},
expectedPeerIDs: []string{},
},
"no fork": {
shadowForkPeerIDs: nil,
peers: []testPeer{
{
id: "peer1",
},
{
id: "peer2",
},
},
expectedPeerIDs: []string{
"peer1",
"peer2",
},
},
"some shadow fork peers": {
shadowForkPeerIDs: []string{"peer2"},
peers: []testPeer{
{
id: "peer1",
},
{
id: "peer2",
},
},
expectedPeerIDs: []string{
"peer2",
},
},
"no shadow fork peers": {
shadowForkPeerIDs: []string{"peer2"},
peers: []testPeer{
{
id: "peer1",
},
{
id: "peer3",
},
},
expectedPeerIDs: []string{},
},
}

for desc, test := range tests {
t.Run(desc, func(t *testing.T) {
gotIds := []string{}
for _, peer := range onlyShadowForkPeers(test.shadowForkPeerIDs, test.peers) {
gotIds = append(gotIds, peer.ID())
}
require.Equal(t, gotIds, test.expectedPeerIDs)
})
}
}