Skip to content

Commit b92d0ea

Browse files
authored
miner: use atomic type (#27013)
Use the new typed atomics in the miner package
1 parent d0fbb10 commit b92d0ea

File tree

2 files changed

+26
-26
lines changed

2 files changed

+26
-26
lines changed

miner/worker.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ const (
164164

165165
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
166166
type newWorkReq struct {
167-
interrupt *int32
167+
interrupt *atomic.Int32
168168
noempty bool
169169
timestamp int64
170170
}
@@ -239,15 +239,15 @@ type worker struct {
239239
snapshotState *state.StateDB
240240

241241
// atomic status counters
242-
running int32 // The indicator whether the consensus engine is running or not.
243-
newTxs int32 // New arrival transaction count since last sealing work submitting.
242+
running atomic.Bool // The indicator whether the consensus engine is running or not.
243+
newTxs atomic.Int32 // New arrival transaction count since last sealing work submitting.
244244

245245
// noempty is the flag used to control whether the feature of pre-seal empty
246246
// block is enabled. The default value is false(pre-seal is enabled by default).
247247
// But in some special scenario the consensus engine will seal blocks instantaneously,
248248
// in this case this feature will add all empty blocks into canonical chain
249249
// non-stop and no real transaction will be included.
250-
noempty uint32
250+
noempty atomic.Bool
251251

252252
// newpayloadTimeout is the maximum timeout allowance for creating payload.
253253
// The default value is 2 seconds but node operator can set it to arbitrary
@@ -372,12 +372,12 @@ func (w *worker) setRecommitInterval(interval time.Duration) {
372372

373373
// disablePreseal disables pre-sealing feature
374374
func (w *worker) disablePreseal() {
375-
atomic.StoreUint32(&w.noempty, 1)
375+
w.noempty.Store(true)
376376
}
377377

378378
// enablePreseal enables pre-sealing feature
379379
func (w *worker) enablePreseal() {
380-
atomic.StoreUint32(&w.noempty, 0)
380+
w.noempty.Store(false)
381381
}
382382

383383
// pending returns the pending state and corresponding block.
@@ -409,24 +409,24 @@ func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) {
409409

410410
// start sets the running status as 1 and triggers new work submitting.
411411
func (w *worker) start() {
412-
atomic.StoreInt32(&w.running, 1)
412+
w.running.Store(true)
413413
w.startCh <- struct{}{}
414414
}
415415

416416
// stop sets the running status as 0.
417417
func (w *worker) stop() {
418-
atomic.StoreInt32(&w.running, 0)
418+
w.running.Store(false)
419419
}
420420

421421
// isRunning returns an indicator whether worker is running or not.
422422
func (w *worker) isRunning() bool {
423-
return atomic.LoadInt32(&w.running) == 1
423+
return w.running.Load()
424424
}
425425

426426
// close terminates all background threads maintained by the worker.
427427
// Note the worker does not support being closed multiple times.
428428
func (w *worker) close() {
429-
atomic.StoreInt32(&w.running, 0)
429+
w.running.Store(false)
430430
close(w.exitCh)
431431
w.wg.Wait()
432432
}
@@ -457,7 +457,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
457457
func (w *worker) newWorkLoop(recommit time.Duration) {
458458
defer w.wg.Done()
459459
var (
460-
interrupt *int32
460+
interrupt *atomic.Int32
461461
minRecommit = recommit // minimal resubmit interval specified by user.
462462
timestamp int64 // timestamp for each round of sealing.
463463
)
@@ -469,16 +469,16 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
469469
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
470470
commit := func(noempty bool, s int32) {
471471
if interrupt != nil {
472-
atomic.StoreInt32(interrupt, s)
472+
interrupt.Store(s)
473473
}
474-
interrupt = new(int32)
474+
interrupt = new(atomic.Int32)
475475
select {
476476
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
477477
case <-w.exitCh:
478478
return
479479
}
480480
timer.Reset(recommit)
481-
atomic.StoreInt32(&w.newTxs, 0)
481+
w.newTxs.Store(0)
482482
}
483483
// clearPending cleans the stale pending tasks.
484484
clearPending := func(number uint64) {
@@ -508,7 +508,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
508508
// higher priced transactions. Disable this overhead for pending blocks.
509509
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
510510
// Short circuit if no new transaction arrives.
511-
if atomic.LoadInt32(&w.newTxs) == 0 {
511+
if w.newTxs.Load() == 0 {
512512
timer.Reset(recommit)
513513
continue
514514
}
@@ -650,7 +650,7 @@ func (w *worker) mainLoop() {
650650
w.commitWork(nil, true, time.Now().Unix())
651651
}
652652
}
653-
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
653+
w.newTxs.Add(int32(len(ev.Txs)))
654654

655655
// System stopped
656656
case <-w.exitCh:
@@ -877,7 +877,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
877877
return receipt.Logs, nil
878878
}
879879

880-
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error {
880+
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error {
881881
gasLimit := env.header.GasLimit
882882
if env.gasPool == nil {
883883
env.gasPool = new(core.GasPool).AddGas(gasLimit)
@@ -887,7 +887,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
887887
for {
888888
// Check interruption signal and abort building if it's fired.
889889
if interrupt != nil {
890-
if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
890+
if signal := interrupt.Load(); signal != commitInterruptNone {
891891
return signalToErr(signal)
892892
}
893893
}
@@ -1067,7 +1067,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
10671067
// fillTransactions retrieves the pending transactions from the txpool and fills them
10681068
// into the given sealing block. The transaction selection and ordering strategy can
10691069
// be customized with the plugin in the future.
1070-
func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
1070+
func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error {
10711071
// Split the pending transactions into locals and remotes
10721072
// Fill the block with all available pending transactions.
10731073
pending := w.eth.TxPool().Pending(true)
@@ -1102,9 +1102,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e
11021102
defer work.discard()
11031103

11041104
if !params.noTxs {
1105-
interrupt := new(int32)
1105+
interrupt := new(atomic.Int32)
11061106
timer := time.AfterFunc(w.newpayloadTimeout, func() {
1107-
atomic.StoreInt32(interrupt, commitInterruptTimeout)
1107+
interrupt.Store(commitInterruptTimeout)
11081108
})
11091109
defer timer.Stop()
11101110

@@ -1122,7 +1122,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e
11221122

11231123
// commitWork generates several new sealing tasks based on the parent block
11241124
// and submit them to the sealer.
1125-
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
1125+
func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int64) {
11261126
start := time.Now()
11271127

11281128
// Set the coinbase if the worker is running or it's required
@@ -1143,7 +1143,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
11431143
}
11441144
// Create an empty block based on temporary copied state for
11451145
// sealing in advance without waiting block execution finished.
1146-
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
1146+
if !noempty && !w.noempty.Load() {
11471147
w.commit(work.copy(), nil, false, start)
11481148
}
11491149
// Fill pending transactions from the txpool into the block.

miner/worker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -454,11 +454,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
454454
progress = make(chan struct{}, 10)
455455
result = make([]float64, 0, 10)
456456
index = 0
457-
start uint32
457+
start atomic.Bool
458458
)
459459
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
460460
// Short circuit if interval checking hasn't started.
461-
if atomic.LoadUint32(&start) == 0 {
461+
if !start.Load() {
462462
return
463463
}
464464
var wantMinInterval, wantRecommitInterval time.Duration
@@ -493,7 +493,7 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
493493
w.start()
494494

495495
time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt
496-
atomic.StoreUint32(&start, 1)
496+
start.Store(true)
497497

498498
w.setRecommitInterval(3 * time.Second)
499499
select {

0 commit comments

Comments
 (0)