diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 00d22179665d..4824ffa3972c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -90,6 +90,7 @@ var ( utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, + utils.TxPoolAccountPendingLimitFlag, utils.TxPoolLifetimeFlag, utils.SyncModeFlag, utils.ExitWhenSyncedFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 26818ddfd14d..99ab2fb567d1 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, + utils.TxPoolAccountPendingLimitFlag, utils.TxPoolLifetimeFlag, }, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index bccd6017b36e..6b6c35bc1082 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -402,6 +402,11 @@ var ( Usage: "Maximum number of non-executable transaction slots for all accounts", Value: ethconfig.Defaults.TxPool.GlobalQueue, } + TxPoolAccountPendingLimitFlag = cli.Uint64Flag{ + Name: "txpool.accountpendinglimit", + Usage: "Maximum number of executable transactions allowed per account", + Value: ethconfig.Defaults.TxPool.AccountPendingLimit, + } TxPoolLifetimeFlag = cli.DurationFlag{ Name: "txpool.lifetime", Usage: "Maximum amount of time non-executable transaction are queued", @@ -1519,6 +1524,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolGlobalQueueFlag.Name) { cfg.GlobalQueue = ctx.GlobalUint64(TxPoolGlobalQueueFlag.Name) } + if ctx.GlobalIsSet(TxPoolAccountPendingLimitFlag.Name) { + cfg.AccountPendingLimit = ctx.GlobalUint64(TxPoolAccountPendingLimitFlag.Name) + } if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } diff --git a/core/tx_list.go b/core/tx_list.go index f219264260bb..73d646a87095 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -439,7 +439,11 @@ func (l *txList) Ready(start uint64) types.Transactions { // Len returns the length of the transaction list. func (l *txList) Len() int { - return l.txs.Len() + if l == nil { + return 0 + } else { + return l.txs.Len() + } } // Empty returns whether the list of transactions is empty or not. diff --git a/core/tx_pool.go b/core/tx_pool.go index a0a1454d5cc9..812a5ccd2227 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -104,6 +104,7 @@ var ( pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil) pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds + pendingEvictionMeter = metrics.NewRegisteredMeter("txpool/pending/eviction", nil) // Dropped due to lifetime // Metrics for the queued pool queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil) @@ -178,6 +179,8 @@ type TxPoolConfig struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts + AccountPendingLimit uint64 // Number of executable transactions allowed per account + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued } @@ -195,6 +198,8 @@ var DefaultTxPoolConfig = TxPoolConfig{ AccountQueue: 64, GlobalQueue: 1024, + AccountPendingLimit: 1024, + Lifetime: 3 * time.Hour, } @@ -230,6 +235,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue) conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue } + if conf.AccountPendingLimit < 1 { + log.Warn("Sanitizing invalid txpool account pending limit", "provided", conf.AccountPendingLimit, "updated", DefaultTxPoolConfig.AccountPendingLimit) + conf.AccountPendingLimit = DefaultTxPoolConfig.AccountPendingLimit + } if conf.Lifetime < 1 { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime) conf.Lifetime = DefaultTxPoolConfig.Lifetime @@ -422,6 +431,7 @@ func (pool *TxPool) loop() { // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() + // Evict queued transactions for addr := range pool.queue { // Skip local transactions from the eviction mechanism if pool.locals.contains(addr) { @@ -431,12 +441,28 @@ func (pool *TxPool) loop() { if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { - log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) + log.Trace("Evicting queued transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) pool.removeTx(tx.Hash(), true) } queuedEvictionMeter.Mark(int64(len(list))) } } + // Evict pending transactions + for addr := range pool.pending { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed + if time.Since(pool.beats[addr]) > pool.config.Lifetime { + list := pool.pending[addr].Flatten() + for _, tx := range list { + log.Trace("Evicting pending transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) + pool.removeTx(tx.Hash(), true) + } + pendingEvictionMeter.Mark(int64(len(list))) + } + } pool.mu.Unlock() // Handle local transaction journal rotation @@ -957,6 +983,15 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T } list := pool.pending[addr] + // Account pending list is full + if uint64(list.Len()) >= pool.config.AccountPendingLimit { + pool.all.Remove(hash) + pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) + pool.priced.Removed(1) + pendingDiscardMeter.Mark(1) + return false + } + inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead) if !inserted { // An older transaction was better, discard this @@ -1153,6 +1188,12 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { addr, _ := types.Sender(pool.signer, tx) // already validated during insertion + defer func(addr common.Address) { + if pool.queue[addr].Empty() && pool.pending[addr].Empty() { + delete(pool.beats, addr) + } + }(addr) + // Remove it from the list of known transactions pool.all.Remove(hash) pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) @@ -1189,7 +1230,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } if future.Empty() { delete(pool.queue, addr) - delete(pool.beats, addr) } } } @@ -1544,6 +1584,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) + } + if pool.queue[addr].Empty() && pool.pending[addr].Empty() { delete(pool.beats, addr) } } @@ -1574,6 +1616,29 @@ func (pool *TxPool) executableTxFilter(costLimit *big.Int) func(tx *types.Transa // pending limit. The algorithm tries to reduce transaction counts by an approximately // equal number for all for accounts with many pending transactions. func (pool *TxPool) truncatePending() { + // Truncate pending lists to max length + for addr, list := range pool.pending { + if list.Len() > int(pool.config.AccountPendingLimit) { + caps := list.Cap(int(pool.config.AccountPendingLimit)) + for _, tx := range caps { + // Drop the transaction from the global pools too + hash := tx.Hash() + pool.all.Remove(hash) + pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) + + // Update the account nonce to the dropped transaction + // note: this will set pending nonce to the min nonce from the discarded txs + pool.pendingNonces.setIfLower(addr, tx.Nonce()) + log.Trace("Removed pending transaction to comply with hard limit", "hash", hash.Hex()) + } + pool.priced.Removed(len(caps)) + pendingGauge.Dec(int64(len(caps))) + if pool.locals.contains(addr) { + localGauge.Dec(int64(len(caps))) + } + } + } + pending := uint64(0) for _, list := range pool.pending { pending += uint64(list.Len()) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 9c75a16e7f0a..0fdade058cc0 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/rawdb" @@ -958,6 +959,92 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { } } +// Tests that if an account remains idle for a prolonged amount of time, any +// executable transactions queued up are dropped. +func TestPendingTransactionTimeLimiting(t *testing.T) { + // Reduce the eviction interval to a testable amount + defer func(old time.Duration) { evictionInterval = old }(evictionInterval) + evictionInterval = time.Millisecond * 100 + + // Create the pool to test the non-expiration enforcement + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + + config := testTxPoolConfig + config.AccountPendingLimit = 1 + config.Lifetime = time.Second + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + defer pool.Stop() + + // Create two test accounts to ensure remotes expire but locals do not + local, _ := crypto.GenerateKey() + remote, _ := crypto.GenerateKey() + + testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) + + err := pool.AddLocal(pricedTransaction(0, 100000, big.NewInt(1), local)) + require.NoError(t, err, "Failed to insert local transaction") + + err = pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued := pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Allow the eviction interval to run + time.Sleep(2 * evictionInterval) + + // Transactions should not be evicted from the pool yet since lifetime duration has not passed + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains + time.Sleep(2 * config.Lifetime) + + pending, queued = pool.Stats() + require.Equal(t, 1, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Reinsert dropped remote transaction + err = pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Try to insert a 2nd transaction but it is discarded due to account pending limit + time.Sleep(config.Lifetime / 2) + + err = pool.AddRemote(pricedTransaction(1, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Clean up remote transaction, this shows that beat was not bumped after failed insertion + time.Sleep(config.Lifetime) + + pending, queued = pool.Stats() + require.Equal(t, 1, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + +} + // Tests that if an account remains idle for a prolonged amount of time, any // non-executable transactions queued up are dropped to prevent wasting resources // on shuffling them around. @@ -1097,8 +1184,14 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // The whole life time pass after last promotion, kick out stale transactions time.Sleep(2 * config.Lifetime) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if nolocals { + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + } else { + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) + } } if nolocals { if queued != 0 { @@ -1203,6 +1296,102 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } +// Tests that the transaction count belonging to any account cannot go +// above the configured hard threshold. +func TestTransactionPendingPerAccountLimiting(t *testing.T) { + t.Parallel() + + // Create the pool to test the limit enforcement with + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + + limit := 16 + + config := testTxPoolConfig + config.AccountPendingLimit = uint64(limit) + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + defer pool.Stop() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + + // Generate and queue a batch of transactions (limit + 1 per account) + nonces := make(map[common.Address]uint64) + txs := types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + for j := 0; j < limit+1; j++ { + txs = append(txs, transaction(nonces[addr], 100000, key)) + nonces[addr]++ + } + } + + // Import the batch and verify txpool consistency + pool.AddRemotesSync(txs) + err := validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") + + // Check that limits are enforced + for _, list := range pool.pending { + require.LessOrEqual(t, list.Len(), limit, "Pending transactions for account overflow allowance") + } + + pending, queued := pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Save dropped nonce for future use + pendingNonces := make(map[common.Address]uint64) + for addr, nonce := range nonces { + pendingNonces[addr] = nonce - 1 + } + + // Generate and queue a batch of transactions (with nonce gap) + txs = types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + for j := 0; j < limit; j++ { + txs = append(txs, transaction(nonces[addr], 100000, key)) + nonces[addr]++ + } + } + + // Import the batch and verify txpool consistency + pool.AddRemotesSync(txs) + err = validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") + + pending, queued = pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, limit*5, queued, "Unexpected global queued tx count") + + // Generate and queue a batch of transactions (fill the nonce gap) + txs = types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + txs = append(txs, transaction(pendingNonces[addr], 100000, key)) + } + + // Import the batch and verify txpool consistency + pool.AddRemotesSync(txs) + err = validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") + + // Check that limits are enforced + for _, list := range pool.pending { + require.LessOrEqual(t, list.Len(), limit, "Pending transactions for account overflow allowance") + } + + pending, queued = pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") +} + // Test the limit on transaction size is enforced correctly. // This test verifies every transaction having allowed size // is added to the pool, and longer transactions are rejected. diff --git a/params/version.go b/params/version.go index 9bffc4505f0f..fce502c363af 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 1 // Patch version component of the current release + VersionPatch = 2 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string )