Skip to content

Commit a3b512a

Browse files
omerfirmak0xmountaintop
authored andcommitted
feat: defer txpool reorg until worker fetches txns for the next block (#905)
1 parent c37f493 commit a3b512a

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

core/txpool/legacypool/legacypool.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ type LegacyPool struct {
227227
queueTxEventCh chan *types.Transaction
228228
reorgDoneCh chan chan struct{}
229229
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
230+
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
230231
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
231232
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
232233

@@ -258,6 +259,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
258259
queueTxEventCh: make(chan *types.Transaction),
259260
reorgDoneCh: make(chan chan struct{}),
260261
reorgShutdownCh: make(chan struct{}),
262+
reorgPauseCh: make(chan bool),
261263
initDoneCh: make(chan struct{}),
262264
}
263265
pool.locals = newAccountSet(pool.signer)
@@ -1198,13 +1200,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
11981200
curDone chan struct{} // non-nil while runReorg is active
11991201
nextDone = make(chan struct{})
12001202
launchNextRun bool
1203+
reorgsPaused bool
12011204
reset *txpoolResetRequest
12021205
dirtyAccounts *accountSet
12031206
queuedEvents = make(map[common.Address]*sortedMap)
12041207
)
12051208
for {
12061209
// Launch next background reorg if needed
1207-
if curDone == nil && launchNextRun {
1210+
if curDone == nil && launchNextRun && !reorgsPaused {
12081211
// Run the background reorg and announcements
12091212
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
12101213

@@ -1256,6 +1259,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
12561259
}
12571260
close(nextDone)
12581261
return
1262+
case reorgsPaused = <-pool.reorgPauseCh:
12591263
}
12601264
}
12611265
}
@@ -1705,6 +1709,24 @@ func (pool *LegacyPool) demoteUnexecutables() {
17051709
}
17061710
}
17071711

1712+
// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
1713+
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
1714+
func (pool *TxPool) PauseReorgs() {
1715+
select {
1716+
case pool.reorgPauseCh <- true:
1717+
case <-pool.reorgShutdownCh:
1718+
}
1719+
}
1720+
1721+
// ResumeReorgs allows new reorg jobs to be started.
1722+
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
1723+
func (pool *TxPool) ResumeReorgs() {
1724+
select {
1725+
case pool.reorgPauseCh <- false:
1726+
case <-pool.reorgShutdownCh:
1727+
}
1728+
}
1729+
17081730
// addressByHeartbeat is an account address tagged with its last activity timestamp.
17091731
type addressByHeartbeat struct {
17101732
address common.Address

miner/scroll_worker.go

+7
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
400400
}
401401
collectL2Timer.UpdateSince(tidyPendingStart)
402402

403+
// Allow txpool to be reorged as we build current block
404+
w.eth.TxPool().ResumeReorgs()
405+
403406
var nextL1MsgIndex uint64
404407
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
405408
nextL1MsgIndex = *dbIndex
@@ -668,6 +671,10 @@ func (w *worker) commit(res *pipeline.Result) error {
668671
"accRows", res.Rows,
669672
)
670673

674+
// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
675+
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
676+
w.eth.TxPool().PauseReorgs()
677+
671678
rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
672679
// Commit block and state to database.
673680
_, err = w.chain.WriteBlockAndSetHead(block, res.FinalBlock.Receipts, logs, res.FinalBlock.State, true)

0 commit comments

Comments
 (0)