Skip to content

Commit 336a4d7

Browse files
karalabeobscuren
authored andcommitted
core: fix transaction reorg issues within the tx pool
1 parent 8938768 commit 336a4d7

File tree

2 files changed

+344
-44
lines changed

2 files changed

+344
-44
lines changed

core/transaction_pool.go

Lines changed: 78 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ func (pool *TxPool) resetState() {
138138
}
139139
}
140140
}
141-
142141
// Check the queue and move transactions over to the pending if possible
143142
// or remove those that have become invalid
144143
pool.checkQueue()
@@ -290,17 +289,15 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans
290289
}
291290

292291
// Add queues a single transaction in the pool if it is valid.
293-
func (self *TxPool) Add(tx *types.Transaction) (err error) {
292+
func (self *TxPool) Add(tx *types.Transaction) error {
294293
self.mu.Lock()
295294
defer self.mu.Unlock()
296295

297-
err = self.add(tx)
298-
if err == nil {
299-
// check and validate the queueue
300-
self.checkQueue()
296+
if err := self.add(tx); err != nil {
297+
return err
301298
}
302-
303-
return
299+
self.checkQueue()
300+
return nil
304301
}
305302

306303
// AddTransactions attempts to queue all valid transactions in txs.
@@ -406,51 +403,55 @@ func (pool *TxPool) checkQueue() {
406403
pool.resetState()
407404
}
408405

409-
var addq txQueue
406+
var promote txQueue
410407
for address, txs := range pool.queue {
411-
// guessed nonce is the nonce currently kept by the tx pool (pending state)
412-
guessedNonce := pool.pendingState.GetNonce(address)
413-
// true nonce is the nonce known by the last state
414408
currentState, err := pool.currentState()
415409
if err != nil {
416410
glog.Errorf("could not get current state: %v", err)
417411
return
418412
}
419-
trueNonce := currentState.GetNonce(address)
420-
addq := addq[:0]
413+
balance := currentState.GetBalance(address)
414+
415+
var (
416+
guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
417+
trueNonce = currentState.GetNonce(address) // nonce known by the last state
418+
)
419+
promote = promote[:0]
421420
for hash, tx := range txs {
422-
if tx.Nonce() < trueNonce {
423-
// Drop queued transactions whose nonce is lower than
424-
// the account nonce because they have been processed.
421+
// Drop processed or out of fund transactions
422+
if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
423+
if glog.V(logger.Core) {
424+
glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
425+
}
425426
delete(txs, hash)
426-
} else {
427-
// Collect the remaining transactions for the next pass.
428-
addq = append(addq, txQueueEntry{hash, address, tx})
429-
}
430-
}
431-
// Find the next consecutive nonce range starting at the
432-
// current account nonce.
433-
sort.Sort(addq)
434-
for i, e := range addq {
435-
// start deleting the transactions from the queue if they exceed the limit
436-
if i > maxQueued {
437-
delete(pool.queue[address], e.hash)
438427
continue
439428
}
440-
441-
if e.Nonce() > guessedNonce {
442-
if len(addq)-i > maxQueued {
429+
// Collect the remaining transactions for the next pass.
430+
promote = append(promote, txQueueEntry{hash, address, tx})
431+
}
432+
// Find the next consecutive nonce range starting at the current account nonce,
433+
// pushing the guessed nonce forward if we add consecutive transactions.
434+
sort.Sort(promote)
435+
for i, entry := range promote {
436+
// If we reached a gap in the nonces, enforce transaction limit and stop
437+
if entry.Nonce() > guessedNonce {
438+
if len(promote)-i > maxQueued {
443439
if glog.V(logger.Debug) {
444-
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
440+
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
445441
}
446-
for j := i + maxQueued; j < len(addq); j++ {
447-
delete(txs, addq[j].hash)
442+
for _, drop := range promote[i+maxQueued:] {
443+
delete(txs, drop.hash)
448444
}
449445
}
450446
break
451447
}
452-
delete(txs, e.hash)
453-
pool.addTx(e.hash, address, e.Transaction)
448+
// Otherwise promote the transaction and move the guess nonce if needed
449+
pool.addTx(entry.hash, address, entry.Transaction)
450+
delete(txs, entry.hash)
451+
452+
if entry.Nonce() == guessedNonce {
453+
guessedNonce++
454+
}
454455
}
455456
// Delete the entire queue entry if it became empty.
456457
if len(txs) == 0 {
@@ -460,20 +461,56 @@ func (pool *TxPool) checkQueue() {
460461
}
461462

462463
// validatePool removes invalid and processed transactions from the main pool.
464+
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
465+
// sequent (Still valid) transactions are moved back into the future queue. This
466+
// is important to prevent a drained account from DOSing the network with non
467+
// executable transactions.
463468
func (pool *TxPool) validatePool() {
464469
state, err := pool.currentState()
465470
if err != nil {
466471
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
467472
return
468473
}
474+
balanceCache := make(map[common.Address]*big.Int)
475+
476+
// Clean up the pending pool, accumulating invalid nonces
477+
gaps := make(map[common.Address]uint64)
478+
469479
for hash, tx := range pool.pending {
470-
from, _ := tx.From() // err already checked
471-
// perform light nonce validation
472-
if state.GetNonce(from) > tx.Nonce() {
480+
sender, _ := tx.From() // err already checked
481+
482+
// Perform light nonce and balance validation
483+
balance := balanceCache[sender]
484+
if balance == nil {
485+
balance = state.GetBalance(sender)
486+
balanceCache[sender] = balance
487+
}
488+
if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
489+
// Remove an already past it invalidated transaction
473490
if glog.V(logger.Core) {
474-
glog.Infof("removed tx (%x) from pool: low tx nonce\n", hash[:4])
491+
glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
475492
}
476493
delete(pool.pending, hash)
494+
495+
// Track the smallest invalid nonce to postpone subsequent transactions
496+
if !past {
497+
if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
498+
gaps[sender] = tx.Nonce()
499+
}
500+
}
501+
}
502+
}
503+
// Move all transactions after a gap back to the future queue
504+
if len(gaps) > 0 {
505+
for hash, tx := range pool.pending {
506+
sender, _ := tx.From()
507+
if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
508+
if glog.V(logger.Core) {
509+
glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
510+
}
511+
pool.queueTx(hash, tx)
512+
delete(pool.pending, hash)
513+
}
477514
}
478515
}
479516
}

0 commit comments

Comments
 (0)