Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat Clique block interval as a timeout #692

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (c *Clique) verifyCascadingFields(chain consensus.ChainHeaderReader, header
if parent == nil || parent.Number.Uint64() != number-1 || parent.Hash() != header.ParentHash {
return consensus.ErrUnknownAncestor
}
if parent.Time+c.config.Period > header.Time {
if parent.Time > header.Time {
return errInvalidTimestamp
}
// Verify that the gasUsed is <= gasLimit
Expand Down Expand Up @@ -554,10 +554,6 @@ func (c *Clique) Prepare(chain consensus.ChainHeaderReader, header *types.Header
if parent == nil {
return consensus.ErrUnknownAncestor
}
header.Time = parent.Time + c.config.Period
if header.Time < uint64(time.Now().Unix()) {
header.Time = uint64(time.Now().Unix())
}
return nil
}

Expand Down
191 changes: 42 additions & 149 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
traceEnv *tracing.TraceEnv // env for tracing
accRows *types.RowConsumption // accumulated row consumption for a block
nextL1MsgIndex uint64 // next L1 queue index to be processed
start time.Time // time we started building this block
}

// task contains all information for consensus engine sealing and result submitting.
Expand All @@ -141,7 +142,7 @@
const (
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit

Check failure on line 145 in miner/worker.go

View workflow job for this annotation

GitHub Actions / check

`commitInterruptResubmit` is unused (deadcode)
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -399,7 +400,7 @@
}

// recalcRecommit recalculates the resubmitting interval upon feedback.
func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {

Check failure on line 403 in miner/worker.go

View workflow job for this annotation

GitHub Actions / check

`recalcRecommit` is unused (deadcode)
var (
prevF = float64(prev.Nanoseconds())
next float64
Expand All @@ -424,15 +425,10 @@
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
interrupt *int32
timestamp int64 // timestamp for each round of mining.
)

timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // discard the initial tick

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) {
if interrupt != nil {
Expand All @@ -444,7 +440,6 @@
case <-w.exitCh:
return
}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
atomic.StoreInt32(&w.newL1Msgs, 0)
}
Expand All @@ -465,54 +460,10 @@
clearPending(w.chain.CurrentBlock().NumberU64())
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)

case head := <-w.chainHeadCh:
clearPending(head.Block.NumberU64())
timestamp = time.Now().Unix()
commit(true, commitInterruptNewHead)

case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
// Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 && atomic.LoadInt32(&w.newL1Msgs) == 0 {
timer.Reset(recommit)
continue
}
commit(true, commitInterruptResubmit)
}

case interval := <-w.resubmitIntervalCh:
// Adjust resubmit interval explicitly by user.
if interval < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
interval = minRecommitInterval
}
log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
minRecommit, recommit = interval, interval

if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}

case adjust := <-w.resubmitAdjustCh:
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
target := float64(recommit.Nanoseconds()) / adjust.ratio
recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}

if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}

case <-w.exitCh:
return
}
Expand All @@ -532,58 +483,29 @@
}
}()

sealTimer := time.NewTimer(0)
defer sealTimer.Stop()

for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
// new block created.

case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
continue
if w.chainConfig.Clique != nil {
sealTimer.Reset(time.Duration(w.chainConfig.Clique.Period) * time.Second)
}
if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
continue
}
// Add side block to possible uncle block set depending on the author.
if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
w.localUncles[ev.Block.Hash()] = ev.Block
case <-sealTimer.C:
if w.current != nil && w.current.tcount != 0 {
w.commit(nil, w.fullTaskHook, true)
} else {
w.remoteUncles[ev.Block.Hash()] = ev.Block
sealTimer.Reset(time.Second)
}
// If our mining block contains less than 2 uncle blocks,
// add the new uncle block if valid and regenerate a mining block.
if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
start := time.Now()
if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
var uncles []*types.Header
w.current.uncles.Each(func(item interface{}) bool {
hash, ok := item.(common.Hash)
if !ok {
return false
}
uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist {
return false
}
uncles = append(uncles, uncle.Header())
return false
})
w.commit(uncles, nil, true, start)
}
}

case ev := <-w.txsCh:
// Apply transactions to the pending state if we're not mining.
// Apply transactions to the pending state
//
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
if !w.isRunning() && w.current != nil {
if w.current != nil {
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
Expand All @@ -599,10 +521,11 @@
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(txset, coinbase, nil)
// Only update the snapshot if any new transactons were added
// to the pending block
if tcount != w.current.tcount {
if seal := w.commitTransactions(txset, coinbase, nil); seal {
w.commit(nil, w.fullTaskHook, true)
} else if tcount != w.current.tcount {
// Only update the snapshot if any new transactons were added
// to the pending block
w.updateSnapshot()
}
} else {
Expand Down Expand Up @@ -799,7 +722,7 @@
}

// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
func (w *worker) makeCurrent(parent *types.Block, header *types.Header, start time.Time) error {
// Retrieve the parent state to execute on top and start a prefetcher for
// the miner to speed block sealing up a bit
state, err := w.chain.StateAt(parent.Root())
Expand Down Expand Up @@ -829,6 +752,7 @@
header: header,
traceEnv: traceEnv,
accRows: nil,
start: start,
}
// when 08 is processed ancestors contain 07 (quick block)
for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
Expand Down Expand Up @@ -995,7 +919,7 @@
return receipt.Logs, traces, nil
}

func (w *worker) commitTransactions(txs types.OrderedTransactionSet, coinbase common.Address, interrupt *int32) (bool, bool) {
func (w *worker) commitTransactions(txs types.OrderedTransactionSet, coinbase common.Address, interrupt *int32) bool {
defer func(t0 time.Time) {
l2CommitTxsTimer.Update(time.Since(t0))
}(time.Now())
Expand All @@ -1004,7 +928,7 @@

// Short circuit if current is nil
if w.current == nil {
return true, circuitCapacityReached
return false
}

gasLimit := w.current.header.GasLimit
Expand All @@ -1023,18 +947,7 @@
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead, circuitCapacityReached
return false
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -1274,12 +1187,7 @@
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false, circuitCapacityReached
return circuitCapacityReached
}

func (w *worker) checkCurrentTxNumWithCCC(expected int) {
Expand Down Expand Up @@ -1312,9 +1220,6 @@
w.circuitCapacityChecker.Reset()
log.Trace("Worker reset ccc", "id", w.circuitCapacityChecker.ID)

if parent.Time() >= uint64(timestamp) {
timestamp = int64(parent.Time() + 1)
}
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Expand Down Expand Up @@ -1359,7 +1264,7 @@
}
}
// Could potentially happen if starting to mine in an odd state.
err := w.makeCurrent(parent, header)
err := w.makeCurrent(parent, header, tstart)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
Expand Down Expand Up @@ -1394,11 +1299,6 @@
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)

// Create an empty block based on temporary copied state for
// sealing in advance without waiting block execution finished.
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(uncles, nil, false, tstart)
}
// fetch l1Txs
var l1Messages []types.L1MessageTx
if w.chainConfig.Scroll.ShouldIncludeL1Messages() {
Expand All @@ -1423,60 +1323,52 @@
localTxs[account] = txs
}
}
var skipCommit, circuitCapacityReached bool

if w.chainConfig.Scroll.ShouldIncludeL1Messages() && len(l1Messages) > 0 {
log.Trace("Processing L1 messages for inclusion", "count", len(l1Messages))
txs, err := types.NewL1MessagesByQueueIndex(l1Messages)
if err != nil {
log.Error("Failed to create L1 message set", "l1Messages", l1Messages, "err", err)
return
}
skipCommit, circuitCapacityReached = w.commitTransactions(txs, w.coinbase, interrupt)
if skipCommit {

if seal := w.commitTransactions(txs, w.coinbase, interrupt); seal {
w.commit(uncles, w.fullTaskHook, true)
return
}
}
if w.prioritizedTx != nil && w.current.header.Number.Uint64() > w.prioritizedTx.blockNumber {
w.prioritizedTx = nil
}
if !circuitCapacityReached && w.prioritizedTx != nil && w.current.header.Number.Uint64() == w.prioritizedTx.blockNumber {
if w.prioritizedTx != nil && w.current.header.Number.Uint64() == w.prioritizedTx.blockNumber {
tx := w.prioritizedTx.tx
from, _ := types.Sender(w.current.signer, tx) // error already checked before
txList := map[common.Address]types.Transactions{from: []*types.Transaction{tx}}
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, txList, header.BaseFee)
skipCommit, circuitCapacityReached = w.commitTransactions(txs, w.coinbase, interrupt)
if skipCommit {
if seal := w.commitTransactions(txs, w.coinbase, interrupt); seal {
w.commit(uncles, w.fullTaskHook, true)
return
}
}
if len(localTxs) > 0 && !circuitCapacityReached {
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee)
skipCommit, circuitCapacityReached = w.commitTransactions(txs, w.coinbase, interrupt)
if skipCommit {
if seal := w.commitTransactions(txs, w.coinbase, interrupt); seal {
w.commit(uncles, w.fullTaskHook, true)
return
}
}
if len(remoteTxs) > 0 && !circuitCapacityReached {
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee)
// don't need to get `circuitCapacityReached` here because we don't have further `commitTransactions`
// after this one, and if we assign it won't take effect (`ineffassign`)
skipCommit, _ = w.commitTransactions(txs, w.coinbase, interrupt)
if skipCommit {
if seal := w.commitTransactions(txs, w.coinbase, interrupt); seal {
w.commit(uncles, w.fullTaskHook, true)
return
}
}

// do not produce empty blocks
if w.current.tcount == 0 {
return
}

w.commit(uncles, w.fullTaskHook, true, tstart)
}

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
func (w *worker) commit(uncles []*types.Header, interval func(), update bool) error {
defer func(t0 time.Time) {
l2CommitTimer.Update(time.Since(t0))
}(time.Now())
Expand Down Expand Up @@ -1536,7 +1428,7 @@
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
"elapsed", common.PrettyDuration(time.Since(w.current.start)))

case <-w.exitCh:
log.Info("Worker has exited")
Expand All @@ -1545,6 +1437,7 @@
if update {
w.updateSnapshot()
}
w.current = nil
return nil
}

Expand Down
Loading
Loading