From a221edc36ba201f84bfd97d7ad4c1d6dfa82c981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Tue, 28 Jan 2025 16:24:36 +0100 Subject: [PATCH 1/8] feat: txpool pending limit --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 8 ++++++++ core/tx_pool.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 00d22179665d..4824ffa3972c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -90,6 +90,7 @@ var ( utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, + utils.TxPoolAccountPendingLimitFlag, utils.TxPoolLifetimeFlag, utils.SyncModeFlag, utils.ExitWhenSyncedFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 26818ddfd14d..99ab2fb567d1 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, + utils.TxPoolAccountPendingLimitFlag, utils.TxPoolLifetimeFlag, }, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index bccd6017b36e..6b6c35bc1082 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -402,6 +402,11 @@ var ( Usage: "Maximum number of non-executable transaction slots for all accounts", Value: ethconfig.Defaults.TxPool.GlobalQueue, } + TxPoolAccountPendingLimitFlag = cli.Uint64Flag{ + Name: "txpool.accountpendinglimit", + Usage: "Maximum number of executable transactions allowed per account", + Value: ethconfig.Defaults.TxPool.AccountPendingLimit, + } TxPoolLifetimeFlag = cli.DurationFlag{ Name: "txpool.lifetime", Usage: "Maximum amount of time non-executable transaction are queued", @@ -1519,6 +1524,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolGlobalQueueFlag.Name) { cfg.GlobalQueue = ctx.GlobalUint64(TxPoolGlobalQueueFlag.Name) } + if ctx.GlobalIsSet(TxPoolAccountPendingLimitFlag.Name) { + cfg.AccountPendingLimit = ctx.GlobalUint64(TxPoolAccountPendingLimitFlag.Name) + } if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } diff --git a/core/tx_pool.go b/core/tx_pool.go index a0a1454d5cc9..19e050d9d545 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -178,6 +178,8 @@ type TxPoolConfig struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts + AccountPendingLimit uint64 // Number of executable transactions allowed per account + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued } @@ -195,6 +197,8 @@ var DefaultTxPoolConfig = TxPoolConfig{ AccountQueue: 64, GlobalQueue: 1024, + AccountPendingLimit: 1024, + Lifetime: 3 * time.Hour, } @@ -230,6 +234,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue) conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue } + if conf.AccountPendingLimit < 1 { + log.Warn("Sanitizing invalid txpool account pending limit", "provided", conf.AccountPendingLimit, "updated", DefaultTxPoolConfig.AccountPendingLimit) + conf.AccountPendingLimit = DefaultTxPoolConfig.AccountPendingLimit + } if conf.Lifetime < 1 { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime) conf.Lifetime = DefaultTxPoolConfig.Lifetime @@ -436,6 +444,14 @@ func (pool *TxPool) loop() { } queuedEvictionMeter.Mark(int64(len(list))) } + if time.Since(pool.beats[addr]) > pool.config.Lifetime { + list := pool.pending[addr].Flatten() + for _, tx := range list { + log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) + pool.removeTx(tx.Hash(), true) + } + queuedEvictionMeter.Mark(int64(len(list))) + } } pool.mu.Unlock() @@ -957,6 +973,11 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T } list := pool.pending[addr] + // Account pending list is full + if uint64(list.Len()) >= pool.config.AccountPendingLimit { + return false + } + inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead) if !inserted { // An older transaction was better, discard this @@ -1574,6 +1595,29 @@ func (pool *TxPool) executableTxFilter(costLimit *big.Int) func(tx *types.Transa // pending limit. The algorithm tries to reduce transaction counts by an approximately // equal number for all for accounts with many pending transactions. func (pool *TxPool) truncatePending() { + // Truncate pending lists to max length + for addr, list := range pool.pending { + if list.Len() > int(pool.config.AccountPendingLimit) { + caps := list.Cap(int(pool.config.AccountPendingLimit)) + for _, tx := range caps { + // Drop the transaction from the global pools too + hash := tx.Hash() + pool.all.Remove(hash) + pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) + + // Update the account nonce to the dropped transaction + // note: this will set pending nonce to the min nonce from the discarded txs + pool.pendingNonces.setIfLower(addr, tx.Nonce()) + log.Trace("Removed pending transaction to comply with hard limit", "hash", hash.Hex()) + } + pool.priced.Removed(len(caps)) + pendingGauge.Dec(int64(len(caps))) + if pool.locals.contains(addr) { + localGauge.Dec(int64(len(caps))) + } + } + } + pending := uint64(0) for _, list := range pool.pending { pending += uint64(list.Len()) From 30cddb27ddddcc790a6d7493d1a5dc95af07350e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Tue, 28 Jan 2025 16:38:11 +0100 Subject: [PATCH 2/8] fix eviction --- core/tx_pool.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/tx_pool.go b/core/tx_pool.go index 19e050d9d545..6738df05c6dc 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -430,6 +430,7 @@ func (pool *TxPool) loop() { // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() + // Evict queued transactions for addr := range pool.queue { // Skip local transactions from the eviction mechanism if pool.locals.contains(addr) { @@ -444,6 +445,14 @@ func (pool *TxPool) loop() { } queuedEvictionMeter.Mark(int64(len(list))) } + } + // Evict pending transactions + for addr := range pool.pending { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.pending[addr].Flatten() for _, tx := range list { From f4a9348b180d9b4ea90d7bea4fca0be0ee3c5285 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Tue, 28 Jan 2025 16:50:49 +0100 Subject: [PATCH 3/8] fix lifetime eviction tests --- core/tx_pool_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 9c75a16e7f0a..801eb8a7c895 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1097,8 +1097,14 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // The whole life time pass after last promotion, kick out stale transactions time.Sleep(2 * config.Lifetime) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if nolocals { + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + } else { + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) + } } if nolocals { if queued != 0 { @@ -1158,6 +1164,7 @@ func TestTransactionPendingLimiting(t *testing.T) { // Tests that if the transaction count belonging to multiple accounts go above // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. +// TODO func TestTransactionPendingGlobalLimiting(t *testing.T) { t.Parallel() From 778808ce3d462aa4e1a16f9bc4c35ce36896084c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Tue, 28 Jan 2025 16:52:16 +0100 Subject: [PATCH 4/8] update metric --- core/tx_pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 6738df05c6dc..7efee902595a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -104,6 +104,7 @@ var ( pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil) pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds + pendingEvictionMeter = metrics.NewRegisteredMeter("txpool/pending/eviction", nil) // Dropped due to lifetime // Metrics for the queued pool queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil) @@ -459,7 +460,7 @@ func (pool *TxPool) loop() { log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) pool.removeTx(tx.Hash(), true) } - queuedEvictionMeter.Mark(int64(len(list))) + pendingEvictionMeter.Mark(int64(len(list))) } } pool.mu.Unlock() From aae7cf098ca32374732047e6b55d34bc5ef6cfb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Tue, 28 Jan 2025 17:24:17 +0100 Subject: [PATCH 5/8] add unit test --- core/tx_pool.go | 4 ++ core/tx_pool_test.go | 112 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 7efee902595a..cb0667a02270 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -985,6 +985,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Account pending list is full if uint64(list.Len()) >= pool.config.AccountPendingLimit { + pool.all.Remove(hash) + pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) + pool.priced.Removed(1) + pendingDiscardMeter.Mark(1) return false } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 801eb8a7c895..01ba5f3b0ca9 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1164,7 +1164,6 @@ func TestTransactionPendingLimiting(t *testing.T) { // Tests that if the transaction count belonging to multiple accounts go above // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. -// TODO func TestTransactionPendingGlobalLimiting(t *testing.T) { t.Parallel() @@ -1210,6 +1209,117 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } +// Tests that the transaction count belonging to any account cannot go +// above the configured hard threshold. +func TestTransactionPendingPerAccountLimiting(t *testing.T) { + t.Parallel() + + // Create the pool to test the limit enforcement with + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + + config := testTxPoolConfig + config.AccountPendingLimit = 16 + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + defer pool.Stop() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + + // Generate and queue a batch of transactions + nonces := make(map[common.Address]uint64) + txs := types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + // Add limit + 1 transactions per account + for j := 0; j < int(config.AccountPendingLimit)+1; j++ { + txs = append(txs, transaction(nonces[addr], 100000, key)) + nonces[addr]++ + } + } + // Import the batch and verify that limits have been enforced + pool.AddRemotesSync(txs) + + // Check that limits are enforced + for _, list := range pool.pending { + pending := list.Len() + if pending > int(config.AccountPendingLimit) { + t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit) + } + } + globalPending, globalQueued := pool.Stats() + if globalPending != int(config.AccountPendingLimit)*5 { + t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) + } + if globalQueued != 0 { + t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + + // Save dropped nonce for future use + pendingNonces := make(map[common.Address]uint64) + for addr, nonce := range nonces { + pendingNonces[addr] = nonce - 1 + } + + // Generate and queue a batch of transactions (with nonce gap) + txs = types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + for j := 0; j < int(config.AccountPendingLimit); j++ { + txs = append(txs, transaction(nonces[addr], 100000, key)) + nonces[addr]++ + } + } + // Import the batch and verify that limits have been enforced + pool.AddRemotesSync(txs) + + globalPending, globalQueued = pool.Stats() + if globalPending != int(config.AccountPendingLimit)*5 { + t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) + } + if globalQueued != int(config.AccountPendingLimit)*5 { + t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, int(config.AccountPendingLimit)*5) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + + // Generate and queue a batch of transactions (fill the nonce gap) + txs = types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + txs = append(txs, transaction(pendingNonces[addr], 100000, key)) + } + // Import the batch and verify that limits have been enforced + pool.AddRemotesSync(txs) + + // Check that limits are enforced + for _, list := range pool.pending { + pending := list.Len() + if pending > int(config.AccountPendingLimit) { + t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit) + } + } + globalPending, globalQueued = pool.Stats() + if globalPending != int(config.AccountPendingLimit)*5 { + t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) + } + if globalQueued != 0 { + t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Test the limit on transaction size is enforced correctly. // This test verifies every transaction having allowed size // is added to the pool, and longer transactions are rejected. From 70c37ce984bb79016a5d1ada18d45e1bd2b8766e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Wed, 29 Jan 2025 09:11:52 +0100 Subject: [PATCH 6/8] simplify test --- core/tx_pool.go | 4 +-- core/tx_pool_test.go | 78 ++++++++++++++++++-------------------------- 2 files changed, 34 insertions(+), 48 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index cb0667a02270..babea75e6973 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -441,7 +441,7 @@ func (pool *TxPool) loop() { if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { - log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) + log.Trace("Evicting queued transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) pool.removeTx(tx.Hash(), true) } queuedEvictionMeter.Mark(int64(len(list))) @@ -457,7 +457,7 @@ func (pool *TxPool) loop() { if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.pending[addr].Flatten() for _, tx := range list { - log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) + log.Trace("Evicting pending transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds()) pool.removeTx(tx.Hash(), true) } pendingEvictionMeter.Mark(int64(len(list))) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 01ba5f3b0ca9..2d78366ab85e 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/rawdb" @@ -1218,8 +1219,10 @@ func TestTransactionPendingPerAccountLimiting(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + limit := 16 + config := testTxPoolConfig - config.AccountPendingLimit = 16 + config.AccountPendingLimit = uint64(limit) pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() @@ -1231,38 +1234,31 @@ func TestTransactionPendingPerAccountLimiting(t *testing.T) { testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } - // Generate and queue a batch of transactions + // Generate and queue a batch of transactions (limit + 1 per account) nonces := make(map[common.Address]uint64) txs := types.Transactions{} for _, key := range keys { addr := crypto.PubkeyToAddress(key.PublicKey) - // Add limit + 1 transactions per account - for j := 0; j < int(config.AccountPendingLimit)+1; j++ { + for j := 0; j < limit+1; j++ { txs = append(txs, transaction(nonces[addr], 100000, key)) nonces[addr]++ } } - // Import the batch and verify that limits have been enforced + + // Import the batch and verify txpool consistency pool.AddRemotesSync(txs) + err := validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") // Check that limits are enforced for _, list := range pool.pending { - pending := list.Len() - if pending > int(config.AccountPendingLimit) { - t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit) - } - } - globalPending, globalQueued := pool.Stats() - if globalPending != int(config.AccountPendingLimit)*5 { - t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) - } - if globalQueued != 0 { - t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0) - } - if err := validateTxPoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) + require.LessOrEqual(t, list.Len(), limit, "Pending transactions for account overflow allowance") } + pending, queued := pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + // Save dropped nonce for future use pendingNonces := make(map[common.Address]uint64) for addr, nonce := range nonces { @@ -1273,24 +1269,20 @@ func TestTransactionPendingPerAccountLimiting(t *testing.T) { txs = types.Transactions{} for _, key := range keys { addr := crypto.PubkeyToAddress(key.PublicKey) - for j := 0; j < int(config.AccountPendingLimit); j++ { + for j := 0; j < limit; j++ { txs = append(txs, transaction(nonces[addr], 100000, key)) nonces[addr]++ } } - // Import the batch and verify that limits have been enforced + + // Import the batch and verify txpool consistency pool.AddRemotesSync(txs) + err = validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") - globalPending, globalQueued = pool.Stats() - if globalPending != int(config.AccountPendingLimit)*5 { - t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) - } - if globalQueued != int(config.AccountPendingLimit)*5 { - t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, int(config.AccountPendingLimit)*5) - } - if err := validateTxPoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } + pending, queued = pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, limit*5, queued, "Unexpected global queued tx count") // Generate and queue a batch of transactions (fill the nonce gap) txs = types.Transactions{} @@ -1298,26 +1290,20 @@ func TestTransactionPendingPerAccountLimiting(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) txs = append(txs, transaction(pendingNonces[addr], 100000, key)) } - // Import the batch and verify that limits have been enforced + + // Import the batch and verify txpool consistency pool.AddRemotesSync(txs) + err = validateTxPoolInternals(pool) + require.NoError(t, err, "pool internal state corrupted") // Check that limits are enforced for _, list := range pool.pending { - pending := list.Len() - if pending > int(config.AccountPendingLimit) { - t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit) - } - } - globalPending, globalQueued = pool.Stats() - if globalPending != int(config.AccountPendingLimit)*5 { - t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5) - } - if globalQueued != 0 { - t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0) - } - if err := validateTxPoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) + require.LessOrEqual(t, list.Len(), limit, "Pending transactions for account overflow allowance") } + + pending, queued = pool.Stats() + require.Equal(t, limit*5, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") } // Test the limit on transaction size is enforced correctly. From d4f6fe086ffff34ae1c597c6c4089dadca519809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Wed, 29 Jan 2025 10:15:39 +0100 Subject: [PATCH 7/8] fix pool.beats handling --- core/tx_list.go | 6 +++- core/tx_pool.go | 9 ++++- core/tx_pool_test.go | 86 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index f219264260bb..73d646a87095 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -439,7 +439,11 @@ func (l *txList) Ready(start uint64) types.Transactions { // Len returns the length of the transaction list. func (l *txList) Len() int { - return l.txs.Len() + if l == nil { + return 0 + } else { + return l.txs.Len() + } } // Empty returns whether the list of transactions is empty or not. diff --git a/core/tx_pool.go b/core/tx_pool.go index babea75e6973..812a5ccd2227 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1188,6 +1188,12 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { addr, _ := types.Sender(pool.signer, tx) // already validated during insertion + defer func(addr common.Address) { + if pool.queue[addr].Empty() && pool.pending[addr].Empty() { + delete(pool.beats, addr) + } + }(addr) + // Remove it from the list of known transactions pool.all.Remove(hash) pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now()) @@ -1224,7 +1230,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } if future.Empty() { delete(pool.queue, addr) - delete(pool.beats, addr) } } } @@ -1579,6 +1584,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) + } + if pool.queue[addr].Empty() && pool.pending[addr].Empty() { delete(pool.beats, addr) } } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 2d78366ab85e..0fdade058cc0 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -959,6 +959,92 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { } } +// Tests that if an account remains idle for a prolonged amount of time, any +// executable transactions queued up are dropped. +func TestPendingTransactionTimeLimiting(t *testing.T) { + // Reduce the eviction interval to a testable amount + defer func(old time.Duration) { evictionInterval = old }(evictionInterval) + evictionInterval = time.Millisecond * 100 + + // Create the pool to test the non-expiration enforcement + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + + config := testTxPoolConfig + config.AccountPendingLimit = 1 + config.Lifetime = time.Second + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + defer pool.Stop() + + // Create two test accounts to ensure remotes expire but locals do not + local, _ := crypto.GenerateKey() + remote, _ := crypto.GenerateKey() + + testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) + + err := pool.AddLocal(pricedTransaction(0, 100000, big.NewInt(1), local)) + require.NoError(t, err, "Failed to insert local transaction") + + err = pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued := pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Allow the eviction interval to run + time.Sleep(2 * evictionInterval) + + // Transactions should not be evicted from the pool yet since lifetime duration has not passed + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains + time.Sleep(2 * config.Lifetime) + + pending, queued = pool.Stats() + require.Equal(t, 1, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Reinsert dropped remote transaction + err = pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Try to insert a 2nd transaction but it is discarded due to account pending limit + time.Sleep(config.Lifetime / 2) + + err = pool.AddRemote(pricedTransaction(1, 100000, big.NewInt(1), remote)) + require.NoError(t, err, "Failed to insert remote transaction") + + err = validateTxPoolInternals(pool) + require.NoError(t, err, "Pool internal state corrupted") + + pending, queued = pool.Stats() + require.Equal(t, 2, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + + // Clean up remote transaction, this shows that beat was not bumped after failed insertion + time.Sleep(config.Lifetime) + + pending, queued = pool.Stats() + require.Equal(t, 1, pending, "Unexpected global pending tx count") + require.Equal(t, 0, queued, "Unexpected global queued tx count") + +} + // Tests that if an account remains idle for a prolonged amount of time, any // non-executable transactions queued up are dropped to prevent wasting resources // on shuffling them around. From 8d7cc26d42606ce6c52d9270230a7ba841e985c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Garamv=C3=B6lgyi?= Date: Wed, 29 Jan 2025 19:54:32 +0100 Subject: [PATCH 8/8] bump version --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 9bffc4505f0f..fce502c363af 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 1 // Patch version component of the current release + VersionPatch = 2 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string )