Skip to content

Commit 2ad2a4a

Browse files
omerfirmaklwedge99
authored andcommitted
feat: defer txpool reorg until worker fetches txns for the next block (scroll-tech#905)
1 parent f8d46a8 commit 2ad2a4a

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

core/tx_pool.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ type TxPool struct {
268268
queueTxEventCh chan *types.Transaction
269269
reorgDoneCh chan chan struct{}
270270
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
271+
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
271272
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
272273
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
273274

@@ -300,6 +301,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
300301
queueTxEventCh: make(chan *types.Transaction),
301302
reorgDoneCh: make(chan chan struct{}),
302303
reorgShutdownCh: make(chan struct{}),
304+
reorgPauseCh: make(chan bool),
303305
initDoneCh: make(chan struct{}),
304306
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
305307
}
@@ -1160,13 +1162,14 @@ func (pool *TxPool) scheduleReorgLoop() {
11601162
curDone chan struct{} // non-nil while runReorg is active
11611163
nextDone = make(chan struct{})
11621164
launchNextRun bool
1165+
reorgsPaused bool
11631166
reset *txpoolResetRequest
11641167
dirtyAccounts *accountSet
11651168
queuedEvents = make(map[common.Address]*txSortedMap)
11661169
)
11671170
for {
11681171
// Launch next background reorg if needed
1169-
if curDone == nil && launchNextRun {
1172+
if curDone == nil && launchNextRun && !reorgsPaused {
11701173
// Run the background reorg and announcements
11711174
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
11721175

@@ -1218,6 +1221,7 @@ func (pool *TxPool) scheduleReorgLoop() {
12181221
}
12191222
close(nextDone)
12201223
return
1224+
case reorgsPaused = <-pool.reorgPauseCh:
12211225
}
12221226
}
12231227
}
@@ -1677,6 +1681,24 @@ func (pool *TxPool) demoteUnexecutables() {
16771681
}
16781682
}
16791683

1684+
// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
1685+
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
1686+
func (pool *TxPool) PauseReorgs() {
1687+
select {
1688+
case pool.reorgPauseCh <- true:
1689+
case <-pool.reorgShutdownCh:
1690+
}
1691+
}
1692+
1693+
// ResumeReorgs allows new reorg jobs to be started.
1694+
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
1695+
func (pool *TxPool) ResumeReorgs() {
1696+
select {
1697+
case pool.reorgPauseCh <- false:
1698+
case <-pool.reorgShutdownCh:
1699+
}
1700+
}
1701+
16801702
// addressByHeartbeat is an account address tagged with its last activity timestamp.
16811703
type addressByHeartbeat struct {
16821704
address common.Address

miner/scroll_worker.go

+7
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
453453
}
454454
collectL2Timer.UpdateSince(tidyPendingStart)
455455

456+
// Allow txpool to be reorged as we build current block
457+
w.eth.TxPool().ResumeReorgs()
458+
456459
var nextL1MsgIndex uint64
457460
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
458461
nextL1MsgIndex = *dbIndex
@@ -719,6 +722,10 @@ func (w *worker) commit(res *pipeline.Result) error {
719722
"accRows", res.Rows,
720723
)
721724

725+
// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
726+
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
727+
w.eth.TxPool().PauseReorgs()
728+
722729
rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
723730
// Commit block and state to database.
724731
_, err = w.chain.WriteBlockWithState(block, res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.State, true)

params/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
const (
2525
VersionMajor = 5 // Major version component of the current release
2626
VersionMinor = 5 // Minor version component of the current release
27-
VersionPatch = 17 // Patch version component of the current release
27+
VersionPatch = 18 // Patch version component of the current release
2828
VersionMeta = "mainnet" // Version metadata to append to the version string
2929
)
3030

0 commit comments

Comments
 (0)