Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 51 additions & 24 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats

dumpReorgTxHashThreshold = 100 // Number of transaction hashse to dump when runReorg
dumpReorgTxHashThreshold = 100 // Number of transaction hashes to dump when runReorg
)

var (
Expand Down Expand Up @@ -467,7 +467,7 @@ func (pool *TxPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
log.Trace("Evicting queued transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
log.Debug("Evicting queued transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
pool.removeTx(tx.Hash(), true)
}
queuedEvictionMeter.Mark(int64(len(list)))
Expand All @@ -483,7 +483,7 @@ func (pool *TxPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.pending[addr].Flatten()
for _, tx := range list {
log.Trace("Evicting pending transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
log.Debug("Evicting pending transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
pool.removeTx(tx.Hash(), true)
}
pendingEvictionMeter.Mark(int64(len(list)))
Expand Down Expand Up @@ -546,6 +546,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(price)
for _, tx := range drop {
log.Debug("Removing transaction below price limit", "hash", tx.Hash().Hex(), "price", tx.GasFeeCap().String(), "limit", price.String())
pool.removeTx(tx.Hash(), false)
}
pool.priced.Removed(len(drop))
Expand Down Expand Up @@ -852,13 +853,13 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash.Hex())
log.Debug("Discarding already known transaction when adding the transaction", "hash", hash.Hex())
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
}

if pool.IsMiner() && rawdb.IsSkippedTransaction(pool.chain.Database(), hash) {
log.Trace("Discarding already known skipped transaction", "hash", hash.Hex())
log.Debug("Discarding already known skipped transaction when adding the transaction", "hash", hash.Hex())
knownSkippedTxMeter.Mark(1)
return false, ErrAlreadyKnown
}
Expand All @@ -869,15 +870,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e

// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash.Hex(), "err", err)
log.Debug("Discarding invalid transaction when adding the transaction", "hash", hash.Hex(), "err", err)
invalidTxMeter.Mark(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Debug("Discarding underpriced transaction when transaction pool is full", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
Expand All @@ -897,15 +898,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e

// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
log.Trace("Discarding overflown transaction", "hash", hash.Hex())
log.Debug("Discarding overflown transaction when transaction pool is full", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Bump the counter of rejections-since-reorg
pool.changesSinceReorg += len(drop)
// Kick out the underpriced remote transactions.
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Debug("Discarding underpriced transaction when transaction pool is full", "hash", tx.Hash().Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
Expand All @@ -916,11 +917,13 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead)
if !inserted {
log.Debug("Discarding underpriced pending transaction", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
pendingDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
log.Debug("Replacing pending transaction", "hash", tx.Hash().Hex(), "old", old.Hash().Hex())
pool.all.Remove(old.Hash())
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
pool.priced.Removed(1)
Expand All @@ -930,7 +933,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash.Hex(), "from", from.Hex(), "to", tx.To())
log.Debug("Pooled new executable transaction", "hash", hash.Hex(), "from", from.Hex(), "to", tx.To(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())

// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
Expand All @@ -952,7 +955,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
}
pool.journalTx(from, tx)

log.Trace("Pooled new future transaction", "hash", hash.Hex(), "from", from.Hex(), "to", tx.To())
log.Debug("Pooled new future transaction", "hash", hash.Hex(), "from", from.Hex(), "to", tx.To(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
return replaced, nil
}

Expand All @@ -969,11 +972,13 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
inserted, old := pool.queue[from].Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead)
if !inserted {
// An older transaction was better, discard this
log.Debug("Discarding underpriced queued transaction", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
queuedDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
log.Debug("Replacing queued transaction", "hash", tx.Hash().Hex(), "old", old.Hash().Hex())
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
Expand Down Expand Up @@ -1023,6 +1028,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T

// Account pending list is full
if uint64(list.Len()) >= pool.config.AccountPendingLimit {
log.Debug("Discarding pending transaction when account pending limit is reached", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
pool.priced.Removed(1)
Expand All @@ -1033,6 +1039,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead)
if !inserted {
// An older transaction was better, discard this
log.Debug("Discarding underpriced pending transaction", "hash", hash.Hex(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
pool.priced.Removed(1)
Expand All @@ -1041,6 +1048,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
}
// Otherwise discard any previous transaction and mark this
if old != nil {
log.Debug("Replacing pending transaction", "hash", tx.Hash().Hex(), "old", old.Hash().Hex())
pool.all.Remove(old.Hash())
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
pool.priced.Removed(1)
Expand All @@ -1049,7 +1057,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Nothing was replaced, bump the pending counter
pendingGauge.Inc(1)
}
log.Trace("Transaction promoted from future queue to pending", "hash", hash.Hex(), "from", addr.Hex(), "to", tx.To())
log.Debug("Transaction promoted from future queue to pending", "hash", hash.Hex(), "from", addr.Hex(), "to", tx.To())
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.pendingNonces.set(addr, tx.Nonce()+1)

Expand Down Expand Up @@ -1222,7 +1230,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
return
}

log.Trace("Removing transaction from pool", "hash", hash.Hex())
log.Debug("Removing transaction from pool", "hash", hash.Hex())

addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

Expand Down Expand Up @@ -1452,10 +1460,10 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
}
pool.txFeed.Send(NewTxsEvent{txs})

log.Debug("runReorg", "len(txs)", len(txs))
log.Trace("runReorg", "len(txs)", len(txs))
if len(txs) > dumpReorgTxHashThreshold {
for _, txs := range txs {
log.Debug("dumping runReorg tx hashes", "txHash", txs.Hash().Hex())
log.Trace("dumping runReorg tx hashes", "txHash", txs.Hash().Hex())
}
}
}
Expand Down Expand Up @@ -1524,6 +1532,15 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
}
}
reinject = types.TxDifference(discarded, included)

for _, tx := range discarded {
log.Debug("TXPOOL_REORG: TX removed from old chain", "hash", tx.Hash().Hex())
}

for _, tx := range included {
log.Debug("TXPOOL_REORG: TX added in the chain", "hash", tx.Hash().Hex())
}

}
}
}
Expand All @@ -1541,7 +1558,9 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.currentMaxGas = newHead.GasLimit

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
if len(reinject) > 0 {
log.Debug("Reinjecting stale transactions", "count", len(reinject))
}
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

Expand Down Expand Up @@ -1577,6 +1596,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
// This is a special case where the transaction was already included in a block,
// thus no need to mark it as removed, setting a trace log level to avoid confusion.
log.Trace("Removed queued transaction with low nonce", "hash", hash.Hex())
}
log.Trace("Removed old queued transactions", "count", len(forwards))

Expand All @@ -1587,6 +1609,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Debug("Removed unpayable queued transaction", "hash", hash.Hex())
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
Expand All @@ -1610,7 +1633,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Trace("Removed cap-exceeding queued transaction", "hash", hash.Hex())
log.Debug("Removed cap-exceeding queued transaction", "hash", hash.Hex())
}
queuedRateLimitMeter.Mark(int64(len(caps)))
}
Expand Down Expand Up @@ -1668,7 +1691,7 @@ func (pool *TxPool) truncatePending() {
// Update the account nonce to the dropped transaction
// note: this will set pending nonce to the min nonce from the discarded txs
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed pending transaction to comply with hard limit", "hash", hash.Hex())
log.Debug("Removed pending transaction to comply with hard limit", "hash", hash.Hex())
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
Expand Down Expand Up @@ -1721,7 +1744,7 @@ func (pool *TxPool) truncatePending() {

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash.Hex())
log.Debug("Removed fairness-exceeding pending transaction", "hash", hash.Hex())
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
Expand Down Expand Up @@ -1749,7 +1772,7 @@ func (pool *TxPool) truncatePending() {

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
log.Debug("Removed fairness-exceeding pending transaction", "hash", hash.Hex())
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
Expand Down Expand Up @@ -1792,6 +1815,7 @@ func (pool *TxPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
log.Debug("Removed fairness-exceeding queued transaction", "hash", tx.Hash().Hex())
pool.removeTx(tx.Hash(), true)
}
drop -= size
Expand All @@ -1801,6 +1825,7 @@ func (pool *TxPool) truncateQueue() {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
log.Debug("Removed fairness-exceeding queued transaction", "hash", txs[i].Hash().Hex())
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitMeter.Mark(1)
Expand All @@ -1826,22 +1851,24 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Trace("Removed old pending transaction", "hash", hash.Hex())
// This is a special case where the transaction was already included in a block,
// thus no need to mark it as removed, setting a trace log level to avoid confusion.
log.Trace("Removed pending transaction with low nonce", "hash", hash.Hex())
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
costLimit := pool.currentState.GetBalance(addr)
drops, invalids := list.FilterF(costLimit, pool.currentMaxGas, pool.executableTxFilter(costLimit))
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash.Hex())
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Debug("Removed unpayable pending transaction", "hash", hash.Hex())
}
pendingNofundsMeter.Mark(int64(len(drops)))

for _, tx := range invalids {
hash := tx.Hash()
log.Trace("Demoting pending transaction", "hash", hash.Hex())
log.Debug("Demoting pending transaction", "hash", hash.Hex())

// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
Expand All @@ -1855,7 +1882,7 @@ func (pool *TxPool) demoteUnexecutables() {
gapped := list.Cap(0)
for _, tx := range gapped {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
log.Error("Demoting invalidated transaction", "hash", hash.Hex())

// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release
VersionPatch = 36 // Patch version component of the current release
VersionPatch = 37 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down