@@ -262,54 +262,72 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
262
262
// direct request replies. The differentiation is important so the fetcher can
263
263
// re-schedule missing transactions as soon as possible.
264
264
func (f * TxFetcher ) Enqueue (peer string , txs []* types.Transaction , direct bool ) error {
265
- // Keep track of all the propagated transactions
266
- if direct {
267
- txReplyInMeter .Mark (int64 (len (txs )))
268
- } else {
269
- txBroadcastInMeter .Mark (int64 (len (txs )))
265
+ var (
266
+ inMeter = txReplyInMeter
267
+ knownMeter = txReplyKnownMeter
268
+ underpricedMeter = txReplyUnderpricedMeter
269
+ otherRejectMeter = txReplyOtherRejectMeter
270
+ )
271
+ if ! direct {
272
+ inMeter = txBroadcastInMeter
273
+ knownMeter = txBroadcastKnownMeter
274
+ underpricedMeter = txBroadcastUnderpricedMeter
275
+ otherRejectMeter = txBroadcastOtherRejectMeter
270
276
}
277
+ // Keep track of all the propagated transactions
278
+ inMeter .Mark (int64 (len (txs )))
279
+
271
280
// Push all the transactions into the pool, tracking underpriced ones to avoid
272
281
// re-requesting them and dropping the peer in case of malicious transfers.
273
282
var (
274
- added = make ([]common.Hash , 0 , len (txs ))
275
- duplicate int64
276
- underpriced int64
277
- otherreject int64
283
+ added = make ([]common.Hash , 0 , len (txs ))
278
284
)
279
- errs := f .addTxs (txs )
280
- for i , err := range errs {
281
- // Track the transaction hash if the price is too low for us.
282
- // Avoid re-request this transaction when we receive another
283
- // announcement.
284
- if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
285
- for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
286
- f .underpriced .Pop ()
287
- }
288
- f .underpriced .Add (txs [i ].Hash ())
285
+ // proceed in batches
286
+ for i := 0 ; i < len (txs ); i += 128 {
287
+ end := i + 128
288
+ if end > len (txs ) {
289
+ end = len (txs )
289
290
}
290
- // Track a few interesting failure types
291
- switch {
292
- case err == nil : // Noop, but need to handle to not count these
291
+ var (
292
+ duplicate int64
293
+ underpriced int64
294
+ otherreject int64
295
+ )
296
+ batch := txs [i :end ]
297
+ for j , err := range f .addTxs (batch ) {
298
+ // Track the transaction hash if the price is too low for us.
299
+ // Avoid re-request this transaction when we receive another
300
+ // announcement.
301
+ if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
302
+ for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
303
+ f .underpriced .Pop ()
304
+ }
305
+ f .underpriced .Add (batch [j ].Hash ())
306
+ }
307
+ // Track a few interesting failure types
308
+ switch {
309
+ case err == nil : // Noop, but need to handle to not count these
293
310
294
- case errors .Is (err , core .ErrAlreadyKnown ):
295
- duplicate ++
311
+ case errors .Is (err , core .ErrAlreadyKnown ):
312
+ duplicate ++
296
313
297
- case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
298
- underpriced ++
314
+ case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
315
+ underpriced ++
299
316
300
- default :
301
- otherreject ++
317
+ default :
318
+ otherreject ++
319
+ }
320
+ added = append (added , batch [j ].Hash ())
321
+ }
322
+ knownMeter .Mark (duplicate )
323
+ underpricedMeter .Mark (underpriced )
324
+ otherRejectMeter .Mark (otherreject )
325
+
326
+ // If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
327
+ if otherreject > 128 / 4 {
328
+ time .Sleep (200 * time .Millisecond )
329
+ log .Warn ("Peer delivering stale transactions" , "peer" , peer , "rejected" , otherreject )
302
330
}
303
- added = append (added , txs [i ].Hash ())
304
- }
305
- if direct {
306
- txReplyKnownMeter .Mark (duplicate )
307
- txReplyUnderpricedMeter .Mark (underpriced )
308
- txReplyOtherRejectMeter .Mark (otherreject )
309
- } else {
310
- txBroadcastKnownMeter .Mark (duplicate )
311
- txBroadcastUnderpricedMeter .Mark (underpriced )
312
- txBroadcastOtherRejectMeter .Mark (otherreject )
313
331
}
314
332
select {
315
333
case f .cleanup <- & txDelivery {origin : peer , hashes : added , direct : direct }:
0 commit comments