Skip to content

Commit ac8164f

Browse files
jonastheisThegaram
andauthored
feat: follower node sync from DA (#1098)
* port changes from #1013 * port changes from #1068 * go.mod tidy * fix compile error * fix goimports * fix log * address review comments * upgrade golang.org/x/net to 0.23.0 * bump version * remove unused flag * update da-codec commit --------- Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
1 parent bdf64cf commit ac8164f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2653
-468
lines changed

cmd/geth/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ var (
171171
utils.CircuitCapacityCheckWorkersFlag,
172172
utils.RollupVerifyEnabledFlag,
173173
utils.ShadowforkPeersFlag,
174+
utils.DASyncEnabledFlag,
175+
utils.DABlockNativeAPIEndpointFlag,
176+
utils.DABlobScanAPIEndpointFlag,
177+
utils.DABeaconNodeAPIEndpointFlag,
174178
}
175179

176180
rpcFlags = []cli.Flag{

cmd/utils/flags.go

+38
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,24 @@ var (
875875
Name: "net.shadowforkpeers",
876876
Usage: "peer ids of shadow fork peers",
877877
}
878+
879+
// DA syncing settings
880+
DASyncEnabledFlag = &cli.BoolFlag{
881+
Name: "da.sync",
882+
Usage: "Enable node syncing from DA",
883+
}
884+
DABlobScanAPIEndpointFlag = &cli.StringFlag{
885+
Name: "da.blob.blobscan",
886+
Usage: "BlobScan blob API endpoint",
887+
}
888+
DABlockNativeAPIEndpointFlag = &cli.StringFlag{
889+
Name: "da.blob.blocknative",
890+
Usage: "BlockNative blob API endpoint",
891+
}
892+
DABeaconNodeAPIEndpointFlag = &cli.StringFlag{
893+
Name: "da.blob.beaconnode",
894+
Usage: "Beacon node API endpoint",
895+
}
878896
)
879897

880898
// MakeDataDir retrieves the currently requested data directory, terminating
@@ -1319,6 +1337,10 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
13191337
setSmartCard(ctx, cfg)
13201338
setL1(ctx, cfg)
13211339

1340+
if ctx.IsSet(DASyncEnabledFlag.Name) {
1341+
cfg.DaSyncingEnabled = ctx.Bool(DASyncEnabledFlag.Name)
1342+
}
1343+
13221344
if ctx.GlobalIsSet(ExternalSignerFlag.Name) {
13231345
cfg.ExternalSigner = ctx.GlobalString(ExternalSignerFlag.Name)
13241346
}
@@ -1604,6 +1626,21 @@ func setEnableRollupVerify(ctx *cli.Context, cfg *ethconfig.Config) {
16041626
}
16051627
}
16061628

1629+
func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
1630+
if ctx.IsSet(DASyncEnabledFlag.Name) {
1631+
cfg.EnableDASyncing = ctx.Bool(DASyncEnabledFlag.Name)
1632+
if ctx.IsSet(DABlobScanAPIEndpointFlag.Name) {
1633+
cfg.DA.BlobScanAPIEndpoint = ctx.String(DABlobScanAPIEndpointFlag.Name)
1634+
}
1635+
if ctx.IsSet(DABlockNativeAPIEndpointFlag.Name) {
1636+
cfg.DA.BlockNativeAPIEndpoint = ctx.String(DABlockNativeAPIEndpointFlag.Name)
1637+
}
1638+
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
1639+
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
1640+
}
1641+
}
1642+
}
1643+
16071644
func setMaxBlockRange(ctx *cli.Context, cfg *ethconfig.Config) {
16081645
if ctx.GlobalIsSet(MaxBlockRangeFlag.Name) {
16091646
cfg.MaxBlockRange = ctx.GlobalInt64(MaxBlockRangeFlag.Name)
@@ -1679,6 +1716,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
16791716
setLes(ctx, cfg)
16801717
setCircuitCapacityCheck(ctx, cfg)
16811718
setEnableRollupVerify(ctx, cfg)
1719+
setDA(ctx, cfg)
16821720
setMaxBlockRange(ctx, cfg)
16831721
if ctx.GlobalIsSet(ShadowforkPeersFlag.Name) {
16841722
cfg.ShadowForkPeerIDs = ctx.GlobalStringSlice(ShadowforkPeersFlag.Name)

common/backoff/exponential.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package backoff
2+
3+
import (
4+
"math"
5+
"math/rand"
6+
"time"
7+
)
8+
9+
// Exponential is a backoff strategy that increases the delay between retries exponentially.
10+
type Exponential struct {
11+
attempt int
12+
13+
maxJitter time.Duration
14+
15+
min time.Duration
16+
max time.Duration
17+
}
18+
19+
func NewExponential(minimum, maximum, maxJitter time.Duration) *Exponential {
20+
return &Exponential{
21+
min: minimum,
22+
max: maximum,
23+
maxJitter: maxJitter,
24+
}
25+
}
26+
27+
func (e *Exponential) NextDuration() time.Duration {
28+
var jitter time.Duration
29+
if e.maxJitter > 0 {
30+
jitter = time.Duration(rand.Int63n(e.maxJitter.Nanoseconds()))
31+
}
32+
33+
minFloat := float64(e.min)
34+
duration := math.Pow(2, float64(e.attempt)) * minFloat
35+
36+
// limit at configured maximum
37+
if duration > float64(e.max) {
38+
duration = float64(e.max)
39+
}
40+
41+
e.attempt++
42+
return time.Duration(duration) + jitter
43+
}
44+
45+
func (e *Exponential) Reset() {
46+
e.attempt = 0
47+
}
48+
49+
func (e *Exponential) Attempt() int {
50+
return e.attempt
51+
}

common/backoff/exponential_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package backoff
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestExponentialBackoff(t *testing.T) {
11+
t.Run("Multiple attempts", func(t *testing.T) {
12+
e := NewExponential(100*time.Millisecond, 10*time.Second, 0)
13+
expectedDurations := []time.Duration{
14+
100 * time.Millisecond,
15+
200 * time.Millisecond,
16+
400 * time.Millisecond,
17+
800 * time.Millisecond,
18+
1600 * time.Millisecond,
19+
3200 * time.Millisecond,
20+
6400 * time.Millisecond,
21+
10 * time.Second, // capped at max
22+
}
23+
for i, expected := range expectedDurations {
24+
require.Equal(t, expected, e.NextDuration(), "attempt %d", i)
25+
}
26+
})
27+
28+
t.Run("Jitter added", func(t *testing.T) {
29+
e := NewExponential(1*time.Second, 10*time.Second, 1*time.Second)
30+
duration := e.NextDuration()
31+
require.GreaterOrEqual(t, duration, 1*time.Second)
32+
require.Less(t, duration, 2*time.Second)
33+
})
34+
35+
t.Run("Edge case: min > max", func(t *testing.T) {
36+
e := NewExponential(10*time.Second, 5*time.Second, 0)
37+
require.Equal(t, 5*time.Second, e.NextDuration())
38+
})
39+
}

common/heap.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package common
2+
3+
import (
4+
"container/heap"
5+
)
6+
7+
// Heap is a generic min-heap (or max-heap, depending on Comparable behavior) implementation.
8+
type Heap[T Comparable[T]] struct {
9+
heap innerHeap[T]
10+
}
11+
12+
func NewHeap[T Comparable[T]]() *Heap[T] {
13+
return &Heap[T]{
14+
heap: make(innerHeap[T], 0),
15+
}
16+
}
17+
18+
func (h *Heap[T]) Len() int {
19+
return len(h.heap)
20+
}
21+
22+
func (h *Heap[T]) Push(element T) *HeapElement[T] {
23+
heapElement := NewHeapElement(element)
24+
heap.Push(&h.heap, heapElement)
25+
26+
return heapElement
27+
}
28+
29+
func (h *Heap[T]) Pop() *HeapElement[T] {
30+
return heap.Pop(&h.heap).(*HeapElement[T])
31+
}
32+
33+
func (h *Heap[T]) Peek() *HeapElement[T] {
34+
if h.Len() == 0 {
35+
return nil
36+
}
37+
38+
return h.heap[0]
39+
}
40+
41+
func (h *Heap[T]) Remove(element *HeapElement[T]) {
42+
heap.Remove(&h.heap, element.index)
43+
}
44+
45+
func (h *Heap[T]) Clear() {
46+
h.heap = make(innerHeap[T], 0)
47+
}
48+
49+
type innerHeap[T Comparable[T]] []*HeapElement[T]
50+
51+
func (h innerHeap[T]) Len() int {
52+
return len(h)
53+
}
54+
55+
func (h innerHeap[T]) Less(i, j int) bool {
56+
return h[i].Value().CompareTo(h[j].Value()) < 0
57+
}
58+
59+
func (h innerHeap[T]) Swap(i, j int) {
60+
h[i], h[j] = h[j], h[i]
61+
h[i].index, h[j].index = i, j
62+
}
63+
64+
func (h *innerHeap[T]) Push(x interface{}) {
65+
data := x.(*HeapElement[T])
66+
*h = append(*h, data)
67+
data.index = len(*h) - 1
68+
}
69+
70+
func (h *innerHeap[T]) Pop() interface{} {
71+
n := len(*h)
72+
element := (*h)[n-1]
73+
(*h)[n-1] = nil // avoid memory leak
74+
*h = (*h)[:n-1]
75+
element.index = -1
76+
77+
return element
78+
}
79+
80+
// Comparable is an interface for types that can be compared.
81+
type Comparable[T any] interface {
82+
// CompareTo compares x with other.
83+
// To create a min heap, return:
84+
// -1 if x < other
85+
// 0 if x == other
86+
// +1 if x > other
87+
// To create a max heap, return the opposite.
88+
CompareTo(other T) int
89+
}
90+
91+
// HeapElement is a wrapper around the value stored in the heap.
92+
type HeapElement[T Comparable[T]] struct {
93+
value T
94+
index int
95+
}
96+
97+
func NewHeapElement[T Comparable[T]](value T) *HeapElement[T] {
98+
return &HeapElement[T]{
99+
value: value,
100+
}
101+
}
102+
103+
func (h *HeapElement[T]) Value() T {
104+
return h.value
105+
}
106+
107+
func (h *HeapElement[T]) Index() int {
108+
return h.index
109+
}

common/heap_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package common
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
type Int int
10+
11+
func (i Int) CompareTo(other Int) int {
12+
if i < other {
13+
return -1
14+
} else if i > other {
15+
return 1
16+
} else {
17+
return 0
18+
}
19+
}
20+
21+
func TestHeap(t *testing.T) {
22+
h := NewHeap[Int]()
23+
24+
require.Equal(t, 0, h.Len(), "Heap should be empty initially")
25+
26+
h.Push(Int(3))
27+
h.Push(Int(1))
28+
h.Push(Int(2))
29+
30+
require.Equal(t, 3, h.Len(), "Heap should have three elements after pushing")
31+
32+
require.EqualValues(t, 1, h.Pop(), "Pop should return the smallest element")
33+
require.Equal(t, 2, h.Len(), "Heap should have two elements after popping")
34+
35+
require.EqualValues(t, 2, h.Pop(), "Pop should return the next smallest element")
36+
require.Equal(t, 1, h.Len(), "Heap should have one element after popping")
37+
38+
require.EqualValues(t, 3, h.Pop(), "Pop should return the last element")
39+
require.Equal(t, 0, h.Len(), "Heap should be empty after popping all elements")
40+
}

common/shrinkingmap.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package common
2+
3+
// ShrinkingMap is a map that shrinks itself (by allocating a new map) after a certain number of deletions have been performed.
4+
// If shrinkAfterDeletionsCount is set to <=0, the map will never shrink.
5+
// This is useful to prevent memory leaks in long-running processes that delete a lot of keys from a map.
6+
// See here for more details: https://github.com/golang/go/issues/20135
7+
type ShrinkingMap[K comparable, V any] struct {
8+
m map[K]V
9+
deletedKeys int
10+
11+
shrinkAfterDeletionsCount int
12+
}
13+
14+
func NewShrinkingMap[K comparable, V any](shrinkAfterDeletionsCount int) *ShrinkingMap[K, V] {
15+
return &ShrinkingMap[K, V]{
16+
m: make(map[K]V),
17+
shrinkAfterDeletionsCount: shrinkAfterDeletionsCount,
18+
}
19+
}
20+
21+
func (s *ShrinkingMap[K, V]) Set(key K, value V) {
22+
s.m[key] = value
23+
}
24+
25+
func (s *ShrinkingMap[K, V]) Get(key K) (value V, exists bool) {
26+
value, exists = s.m[key]
27+
return value, exists
28+
}
29+
30+
func (s *ShrinkingMap[K, V]) Has(key K) bool {
31+
_, exists := s.m[key]
32+
return exists
33+
}
34+
35+
func (s *ShrinkingMap[K, V]) Delete(key K) (deleted bool) {
36+
if _, exists := s.m[key]; !exists {
37+
return false
38+
}
39+
40+
delete(s.m, key)
41+
s.deletedKeys++
42+
43+
if s.shouldShrink() {
44+
s.shrink()
45+
}
46+
47+
return true
48+
}
49+
50+
func (s *ShrinkingMap[K, V]) Size() (size int) {
51+
return len(s.m)
52+
}
53+
54+
func (s *ShrinkingMap[K, V]) Clear() {
55+
s.m = make(map[K]V)
56+
s.deletedKeys = 0
57+
}
58+
59+
func (s *ShrinkingMap[K, V]) shouldShrink() bool {
60+
return s.shrinkAfterDeletionsCount > 0 && s.deletedKeys >= s.shrinkAfterDeletionsCount
61+
}
62+
63+
func (s *ShrinkingMap[K, V]) shrink() {
64+
newMap := make(map[K]V, len(s.m))
65+
for k, v := range s.m {
66+
newMap[k] = v
67+
}
68+
69+
s.m = newMap
70+
s.deletedKeys = 0
71+
}

0 commit comments

Comments
 (0)