Skip to content

Commit d21d3b7

Browse files
authored
feat: dispersing blobs to eigenda (#19)
* feat: dispersing blobs to eigenda * feat: return blob ID on confirmation instead of finalization * fix: review changes * feat: include RequestID in blobID - add explorer link for eigenDA * fix: cast ID to eigen value not pointer - rename var
1 parent 5d3ec22 commit d21d3b7

File tree

10 files changed

+343
-2201
lines changed

10 files changed

+343
-2201
lines changed

cmd/blob-server/constants.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package main
22

33
// Constants
44
const (
5-
EigenDaRpcUrl = "disperser-goerli.eigenda.xyz:443"
5+
EigenDaRpcUrl = "disperser-holesky.eigenda.xyz:443"
66
celestiaRpcUrl = "https://celestia-mocha-rpc.publicnode.com:443"
77
availLightClientRpcUrl = "http://localhost:26658"
88
)

cmd/blob-server/main.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strconv"
88
"sync"
99

10+
"github.com/Layr-Labs/eigenda/api/clients"
1011
"github.com/joho/godotenv"
1112

1213
"github.com/cenkalti/backoff/v4"
@@ -47,13 +48,14 @@ func main() {
4748
router := gin.Default()
4849
ctx := context.Background()
4950

50-
envFile, err := godotenv.Read("../../.env") // read from root
51+
envFile, err := godotenv.Read(".env")
5152
if err != nil {
5253
fmt.Println("Error reading .env file")
5354

5455
return
5556
}
5657
authToken := envFile["CELESTIA_AUTH_TOKEN"]
58+
eigenPrivateKey := envFile["EIGENDA_AUTH_PRIVATE_KEY"]
5759

5860
server := NewBlobServer()
5961
// Initialise all DA clients
@@ -64,6 +66,10 @@ func main() {
6466
authToken,
6567
availLightClientRpcUrl,
6668
celestiaRpcUrl,
69+
clients.EigenDAClientConfig{
70+
RPC: EigenDaRpcUrl,
71+
SignerPrivateKeyHex: eigenPrivateKey,
72+
},
6773
)
6874
if err != nil {
6975
fmt.Printf("failed to build DA clients: %v", err)

da/interface.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type Client interface {
2323
// Submit submits the Blobs to Data Availability layer.
2424
//
2525
// This method is synchronous. Upon successful submission to Data Availability layer, it returns ID identifying blob
26-
// in DA and Proof of inclusion.
26+
// in DA.
2727
// If options is nil, default options are used.
2828
Submit(ctx context.Context, blob Blob, gasPrice float64) (ID, error)
2929

daash.go

+27-6
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"fmt"
77
"log"
88
"strings"
9-
"time"
109

10+
"github.com/Layr-Labs/eigenda/api/clients"
1111
"github.com/cenkalti/backoff"
1212
"github.com/stackrlabs/go-daash/avail"
1313
"github.com/stackrlabs/go-daash/celestia"
@@ -52,6 +52,7 @@ func (d *ClientBuilder) InitClients(
5252
celestiaAuthToken string,
5353
celestiaLightClientUrl string,
5454
celestiaNodeUrl string,
55+
eigenConfig clients.EigenDAClientConfig,
5556
) (*ClientBuilder, error) {
5657
if len(layers) == 0 {
5758
return nil, fmt.Errorf("no da layers provided")
@@ -88,7 +89,7 @@ func (d *ClientBuilder) InitClients(
8889
d.Clients[Celestia] = celestia
8990

9091
case Eigen:
91-
eigen, err := eigen.NewClient("disperser-goerli.eigenda.xyz:443", time.Second*90, time.Second*5)
92+
eigen, err := eigen.NewClient(eigenConfig)
9293
if err != nil {
9394
return nil, err
9495
}
@@ -115,7 +116,7 @@ func GetHumanReadableID(id da.ID, daLayer DALayer) any {
115116
}
116117
return availID
117118
case Celestia:
118-
id, ok := id.(celestia.ID)
119+
celestiaID, ok := id.(celestia.ID)
119120
if !ok {
120121
return ""
121122
}
@@ -124,9 +125,23 @@ func GetHumanReadableID(id da.ID, daLayer DALayer) any {
124125
TxHash string `json:"txHash"`
125126
Commitment da.Commitment `json:"commitment"`
126127
}{
127-
BlockHeight: id.Height,
128-
TxHash: id.TxHash,
129-
Commitment: id.ShareCommitment,
128+
BlockHeight: celestiaID.Height,
129+
TxHash: celestiaID.TxHash,
130+
Commitment: celestiaID.ShareCommitment,
131+
}
132+
case Eigen:
133+
eigenID, ok := id.(eigen.ID)
134+
if !ok {
135+
return ""
136+
}
137+
return struct {
138+
BatchHeaderHash []byte
139+
BlobIndex uint32
140+
RequestID string
141+
}{
142+
BatchHeaderHash: eigenID.BlobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash,
143+
BlobIndex: eigenID.BlobInfo.BlobVerificationProof.BlobIndex,
144+
RequestID: eigenID.RequestID,
130145
}
131146
default:
132147
return ""
@@ -154,6 +169,12 @@ func GetExplorerLink(client da.Client, id da.ID) (string, error) {
154169
extString := strings.Trim(string(extBytes), "\"")
155170
fmt.Println(extString)
156171
return fmt.Sprintf("https://goldberg.avail.tools/#/extrinsics/decode/%s", extString), nil
172+
case *eigen.Client:
173+
eigenID, ok := id.(eigen.ID)
174+
if !ok {
175+
return "", fmt.Errorf("invalid ID")
176+
}
177+
return fmt.Sprintf("https://blobs-holesky.eigenda.xyz/blobs/%s", eigenID.RequestID), nil
157178
default:
158179
return "", nil
159180
}

eigen/client.go

+108-99
Original file line numberDiff line numberDiff line change
@@ -5,85 +5,62 @@ import (
55
"encoding/base64"
66
"encoding/hex"
77
"fmt"
8-
"log"
8+
"log/slog"
9+
"os"
910
"time"
1011

12+
"github.com/Layr-Labs/eigenda/api/clients"
13+
grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
14+
"github.com/Layr-Labs/eigenda/disperser"
15+
"github.com/ethereum/go-ethereum/log"
1116
"github.com/stackrlabs/go-daash/da"
12-
"google.golang.org/grpc"
13-
"google.golang.org/grpc/credentials"
1417
)
1518

1619
type Client struct {
17-
// DaRpc is the HTTP provider URL for the Data Availability node.
18-
DARpc string
19-
20-
// DisperserClient is the gRPC client for the Disperser service.
21-
disperserClient DisperserClient
22-
23-
// Quorum IDs and SecurityParams to use when dispersing and retrieving blobs
24-
DADisperserSecurityParams []*SecurityParams
25-
26-
// The total amount of time that the batcher will spend waiting for EigenDA to confirm a blob
27-
DAStatusQueryTimeout time.Duration
28-
29-
// The amount of time to wait between status queries of a newly dispersed blob
30-
DAStatusQueryRetryInterval time.Duration
20+
// internalClient is used to interact with the EigenDA API
21+
internalClient clients.EigenDAClient
3122
}
3223

3324
// NewClient returns a new instance of the EigenDA client.
34-
func NewClient(daRpc string, daStatusQueryTimeout time.Duration, daStatusQueryRetryInterval time.Duration) (*Client, error) {
35-
conn, err := grpc.Dial(daRpc, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
25+
func NewClient(config clients.EigenDAClientConfig) (*Client, error) {
26+
logger := log.NewLogger(slog.NewTextHandler(os.Stdout, nil))
27+
client, err := clients.NewEigenDAClient(logger, config)
3628
if err != nil {
37-
fmt.Println("Unable to connect to EigenDA, aborting", "err", err)
38-
return nil, err
29+
return nil, fmt.Errorf("failed to create EigenDA client: %v", err)
3930
}
40-
daClient := NewDisperserClient(conn)
41-
42-
disperserSecurityParams := []*SecurityParams{}
43-
disperserSecurityParams = append(disperserSecurityParams, &SecurityParams{
44-
QuorumId: 0,
45-
AdversaryThreshold: 25,
46-
QuorumThreshold: 50,
47-
})
48-
log.Println("🟢 EigenDA client initalised")
31+
4932
return &Client{
50-
DARpc: daRpc,
51-
disperserClient: daClient,
52-
DADisperserSecurityParams: disperserSecurityParams,
53-
DAStatusQueryTimeout: daStatusQueryTimeout,
54-
DAStatusQueryRetryInterval: daStatusQueryRetryInterval,
33+
internalClient: *client,
5534
}, nil
5635
}
5736

5837
func (e *Client) MaxBlobSize(ctx context.Context) (uint64, error) {
59-
return 512 * 1024, nil // Currently set at 512KB
38+
return 2 * 1024 * 1024, nil // Currently set at 2MB
6039
}
6140

62-
func (e *Client) Submit(ctx context.Context, daBlob da.Blob, gasPrice float64) (da.ID, error) {
63-
blobInfo, err := e.disperseBlob(ctx, daBlob)
41+
func (c *Client) Submit(ctx context.Context, daBlob da.Blob, gasPrice float64) (da.ID, error) {
42+
start := time.Now()
43+
blobID, err := c.PutBlob(ctx, daBlob)
6444
if err != nil {
6545
return nil, fmt.Errorf("failed to disperse blob: %v", err)
6646
}
67-
blobID := ID{
68-
BlobIndex: blobInfo.BlobVerificationProof.BlobIndex,
69-
BatchHeaderHash: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash,
70-
}
47+
end := time.Now()
48+
fmt.Println("Time taken to disperse blob:", end.Sub(start))
49+
7150
return blobID, nil
7251
}
7352

74-
func (e *Client) Get(ctx context.Context, id da.ID) (da.Blob, error) {
75-
blobID, ok := id.(ID)
53+
func (c *Client) Get(ctx context.Context, id da.ID) (da.Blob, error) {
54+
blobID, ok := id.(*ID)
7655
if !ok {
7756
return nil, fmt.Errorf("invalid ID type")
7857
}
79-
resp, err := e.disperserClient.RetrieveBlob(ctx, &RetrieveBlobRequest{
80-
BlobIndex: blobID.BlobIndex,
81-
BatchHeaderHash: blobID.BatchHeaderHash,
82-
})
58+
blob, err := c.internalClient.GetBlob(ctx, blobID.BlobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash, blobID.BlobInfo.BlobVerificationProof.BlobIndex)
8359
if err != nil {
8460
return nil, fmt.Errorf("failed to retrieve blob: %v", err)
8561
}
86-
return resp.Data, nil
62+
63+
return blob, nil
8764
}
8865

8966
func (e *Client) Commit(ctx context.Context, daBlob da.Blob) (da.Commitment, error) {
@@ -98,66 +75,98 @@ func (e *Client) GetProof(ctx context.Context, id da.ID) (da.Proof, error) {
9875
return nil, nil
9976
}
10077

101-
type ID struct {
102-
BlobIndex uint32
103-
BatchHeaderHash []byte
78+
// PutBlob encodes and writes a blob to EigenDA, waiting for it to be confirmed
79+
// before returning. This function is resiliant to transient failures and
80+
// timeouts.
81+
func (c *Client) PutBlob(ctx context.Context, data []byte) (ID, error) {
82+
resultChan, errorChan := c.PutBlobAsync(ctx, data)
83+
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
84+
case result := <-resultChan:
85+
return result, nil
86+
case err := <-errorChan:
87+
return ID{}, err
88+
}
10489
}
10590

106-
func (e *Client) disperseBlob(ctx context.Context, txData []byte) (*BlobInfo, error) {
107-
fmt.Println("Attempting to disperse blob to EigenDA")
91+
func (c *Client) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan ID, errChan chan error) {
92+
resultChan = make(chan ID, 1)
93+
errChan = make(chan error, 1)
94+
go c.putBlob(ctx, data, resultChan, errChan)
95+
return
96+
}
10897

109-
disperseReq := &DisperseBlobRequest{
110-
Data: txData,
111-
SecurityParams: e.DADisperserSecurityParams,
98+
func (c *Client) putBlob(ctx context.Context, rawData []byte, resultChan chan ID, errChan chan error) {
99+
// encode blob
100+
if c.internalClient.Codec == nil {
101+
errChan <- fmt.Errorf("codec cannot be nil")
102+
return
112103
}
113-
daClient := e.disperserClient
114-
disperseRes, err := daClient.DisperseBlob(ctx, disperseReq)
115-
fmt.Println("DisperseBlob response", "disperseRes", disperseRes, "err", err)
116104

105+
data, err := c.internalClient.Codec.EncodeBlob(rawData)
117106
if err != nil {
118-
fmt.Printf("Unable to disperse blob to EigenDA, aborting", "err", err)
119-
return nil, err
107+
errChan <- fmt.Errorf("error encoding blob: %w", err)
108+
return
120109
}
121110

122-
if disperseRes.Result == BlobStatus_UNKNOWN ||
123-
disperseRes.Result == BlobStatus_FAILED {
124-
fmt.Printf("Unable to disperse blob to EigenDA, aborting", "err", err)
125-
return nil, fmt.Errorf("reply status is %d", disperseRes.Result)
111+
customQuorumNumbers := make([]uint8, len(c.internalClient.Config.CustomQuorumIDs))
112+
for i, id := range c.internalClient.Config.CustomQuorumIDs {
113+
customQuorumNumbers[i] = uint8(id)
114+
}
115+
// disperse blob
116+
blobStatus, requestID, err := c.internalClient.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
117+
if err != nil {
118+
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
119+
return
126120
}
127121

128-
base64RequestID := base64.StdEncoding.EncodeToString(disperseRes.RequestId)
129-
130-
fmt.Println("Blob disepersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)
131-
132-
var statusRes *BlobStatusReply
133-
timeoutTime := time.Now().Add(e.DAStatusQueryTimeout)
134-
// Wait before first status check
135-
time.Sleep(e.DAStatusQueryRetryInterval)
136-
for time.Now().Before(timeoutTime) {
137-
statusRes, err = daClient.GetBlobStatus(ctx, &BlobStatusRequest{
138-
RequestId: disperseRes.RequestId,
139-
})
140-
if err != nil {
141-
fmt.Printf("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
142-
} else if statusRes.Status == BlobStatus_CONFIRMED {
143-
// TODO(eigenlayer): As long as fault proofs are disabled, we can move on once a blob is confirmed
144-
// but not yet finalized, without further logic. Once fault proofs are enabled, we will need to update
145-
// the proposer to wait until the blob associated with an L2 block has been finalized, i.e. the EigenDA
146-
// contracts on Ethereum have confirmed the full availability of the blob on EigenDA.
147-
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
148-
fmt.Println("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
149-
return statusRes.Info, nil
150-
} else if statusRes.Status == BlobStatus_UNKNOWN ||
151-
statusRes.Status == BlobStatus_FAILED {
152-
fmt.Println("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
153-
return nil, fmt.Errorf("eigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
154-
} else {
155-
fmt.Println("Still waiting for confirmation from EigenDA", "requestID", base64RequestID)
156-
}
157-
158-
// Wait before first status check
159-
time.Sleep(e.DAStatusQueryRetryInterval)
122+
// process response
123+
if *blobStatus == disperser.Failed {
124+
errChan <- fmt.Errorf("reply status is %d", blobStatus)
125+
return
160126
}
161127

162-
return nil, fmt.Errorf("timed out getting EigenDA status for dispersed blob key: %s", base64RequestID)
128+
base64RequestID := base64.StdEncoding.EncodeToString(requestID)
129+
fmt.Println("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)
130+
131+
ticker := time.NewTicker(c.internalClient.Config.StatusQueryRetryInterval)
132+
defer ticker.Stop()
133+
134+
var cancel context.CancelFunc
135+
ctx, cancel = context.WithTimeout(ctx, c.internalClient.Config.StatusQueryTimeout)
136+
defer cancel()
137+
138+
for {
139+
select {
140+
case <-ctx.Done():
141+
errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err())
142+
return
143+
case <-ticker.C:
144+
statusRes, err := c.internalClient.Client.GetBlobStatus(ctx, requestID)
145+
if err != nil {
146+
c.internalClient.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
147+
continue
148+
}
149+
150+
switch statusRes.Status {
151+
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
152+
fmt.Println("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
153+
case grpcdisperser.BlobStatus_FAILED:
154+
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
155+
return
156+
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
157+
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
158+
return
159+
case grpcdisperser.BlobStatus_CONFIRMED:
160+
fmt.Println("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
161+
resultChan <- ID{BlobInfo: statusRes.Info, RequestID: string(requestID)}
162+
case grpcdisperser.BlobStatus_FINALIZED:
163+
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
164+
fmt.Println("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
165+
return
166+
default:
167+
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
168+
return
169+
}
170+
}
171+
}
163172
}

0 commit comments

Comments
 (0)