Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()

for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
Expand All @@ -604,14 +607,15 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return

case <-time.After(wait):
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
Expand Down
6 changes: 5 additions & 1 deletion eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
}
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()

for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
Expand Down Expand Up @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh:
return errCanceled
}
Expand Down
29 changes: 21 additions & 8 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,11 +1015,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e

// Start pulling the header chain skeleton until all is done
var (
skeleton = true // Skeleton assembly phase or finishing up
pivoting = false // Whether the next request is pivot verification
ancestor = from
mode = d.getMode()
skeleton = true // Skeleton assembly phase or finishing up
pivoting = false // Whether the next request is pivot verification
ancestor = from
mode = d.getMode()
fsHeaderContCheckTimer = time.NewTimer(fsHeaderContCheck)
)
defer fsHeaderContCheckTimer.Stop()

for {
// Pull the next batch of headers, it either:
// - Pivot check to see if the chain moved too far
Expand Down Expand Up @@ -1124,8 +1127,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// Don't abort header fetches while the pivot is downloading
if !d.committed.Load() && pivot <= from {
p.log.Debug("No headers, waiting for pivot commit")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
continue
case <-d.cancelCh:
return errCanceled
Expand Down Expand Up @@ -1194,9 +1198,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// sleep a bit and retry. Take care with headers already consumed during
// skeleton filling
if len(headers) == 0 && !progressed {
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
p.log.Trace("All headers delayed, waiting")
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
continue
case <-d.cancelCh:
return errCanceled
Expand Down Expand Up @@ -1276,7 +1281,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
var (
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
select {
case <-d.cancelCh:
Expand Down Expand Up @@ -1397,10 +1405,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-time.After(time.Second):
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
Expand Down Expand Up @@ -1567,7 +1576,10 @@ func (d *Downloader) processSnapSyncContent() error {
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
// Wait for the next batch of downloaded data to be available. If we have
// not yet reached the pivot point, wait blockingly as there's no need to
Expand Down Expand Up @@ -1650,6 +1662,7 @@ func (d *Downloader) processSnapSyncContent() error {
oldPivot = P
}
// Wait for completion, occasionally checking for pivot staleness
timer.Reset(time.Second)
select {
case <-sync.done:
if sync.err != nil {
Expand All @@ -1660,7 +1673,7 @@ func (d *Downloader) processSnapSyncContent() error {
}
oldPivot = nil

case <-time.After(time.Second):
case <-timer.C:
oldTail = afterP
continue
}
Expand Down
5 changes: 4 additions & 1 deletion ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
return err
}
// Wait for the pong request to arrive back
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
case <-timer.C:
// Ping timeout, abort
return errors.New("ping timed out")
}
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
go func() {
waitErr <- n.Cmd.Wait()
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case err := <-waitErr:
return err
case <-time.After(5 * time.Second):
case <-timer.C:
return n.Cmd.Process.Kill()
}
}
Expand Down
10 changes: 8 additions & 2 deletions p2p/simulations/mocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
if err != nil {
panic("Could not startup node network for mocker")
}
tick := time.NewTicker(10 * time.Second)
var (
tick = time.NewTicker(10 * time.Second)
timer = time.NewTimer(3 * time.Second)
)
defer tick.Stop()
defer timer.Stop()

for {
select {
case <-quit:
Expand All @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
return
}

timer.Reset(3 * time.Second)
select {
case <-quit:
log.Info("Terminating simulation loop")
return
case <-time.After(3 * time.Second):
case <-timer.C:
}

log.Debug("starting node", "id", id)
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error {
}
}

timeout := time.NewTimer(snapshotLoadTimeout)
defer timeout.Stop()

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout):
case <-timeout.C:
return errors.New("snapshot connections not established")
}
return nil
Expand Down