Skip to content

Commit

Permalink
Use flattened format for subscribe responses (#2443)
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored Feb 10, 2025
1 parent 959166e commit 69786c2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
4 changes: 1 addition & 3 deletions rpc/v8/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ type EventsArg struct {
ResultPageRequest
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}
type SubscriptionID uint64

type EventFilter struct {
FromBlock *BlockID `json:"from_block"`
Expand Down
38 changes: 19 additions & 19 deletions rpc/v8/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ func (b *SubscriptionBlockID) UnmarshalJSON(data []byte) error {
// SubscribeEvents creates a WebSocket stream which will fire events for new Starknet events with applied filters
func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt,
blockID *SubscriptionBlockID,
) (*SubscriptionID, *jsonrpc.Error) {
) (SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

lenKeys := len(keys)
for _, k := range keys {
lenKeys += len(k)
}
if lenKeys > rpccore.MaxEventFilterKeys {
return nil, rpccore.ErrTooManyKeysInFilter
return 0, rpccore.ErrTooManyKeysInFilter
}

requestedHeader, headHeader, rpcErr := h.resolveBlockRange(blockID)
if rpcErr != nil {
return nil, rpcErr
return 0, rpcErr
}

id := h.idgen()
Expand Down Expand Up @@ -188,20 +188,20 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
wg.Wait()
})

return &SubscriptionID{ID: id}, nil
return SubscriptionID(id), nil
}

// SubscribeTransactionStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added.
// Later updates are sent only when the transaction status changes.
// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
//
//nolint:gocyclo,funlen
func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Felt) (*SubscriptionID,
func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Felt) (SubscriptionID,
*jsonrpc.Error,
) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

// If the error is transaction not found that means the transaction has not been submitted to the feeder gateway,
Expand All @@ -211,7 +211,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
curStatus, rpcErr := h.TransactionStatus(ctx, txHash)
if rpcErr != nil {
if rpcErr != rpccore.ErrTxnHashNotFound {
return nil, rpcErr
return 0, rpcErr
}

timeout := time.NewTimer(subscribeTxStatusTimeout)
Expand All @@ -222,12 +222,12 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
select {
case <-timeout.C:
ticker.Stop()
return nil, rpcErr
return 0, rpcErr
case <-ticker.C:
curStatus, rpcErr = h.TransactionStatus(ctx, txHash)
if rpcErr != nil {
if rpcErr != rpccore.ErrTxnHashNotFound {
return nil, rpcErr
return 0, rpcErr
}
continue
}
Expand Down Expand Up @@ -335,7 +335,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
wg.Wait()
})

return &SubscriptionID{ID: id}, nil
return SubscriptionID(id), nil
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt,
Expand Down Expand Up @@ -429,15 +429,15 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter
}

// SubscribeNewHeads creates a WebSocket stream which will fire events when a new block header is added.
func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *SubscriptionBlockID) (*SubscriptionID, *jsonrpc.Error) {
func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *SubscriptionBlockID) (SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

startHeader, latestHeader, rpcErr := h.resolveBlockRange(blockID)
if rpcErr != nil {
return nil, rpcErr
return 0, rpcErr
}

id := h.idgen()
Expand Down Expand Up @@ -477,20 +477,20 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *SubscriptionBl
wg.Wait()
})

return &SubscriptionID{ID: id}, nil
return SubscriptionID(id), nil
}

// SubscribePendingTxs creates a WebSocket stream which will fire events when a new pending transaction is added.
// The getDetails flag controls if the response will contain the transaction details or just the transaction hashes.
// The senderAddr flag is used to filter the transactions by sender address.
func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, senderAddr []felt.Felt) (*SubscriptionID, *jsonrpc.Error) {
func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, senderAddr []felt.Felt) (SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

if len(senderAddr) > rpccore.MaxEventFilterKeys {
return nil, rpccore.ErrTooManyAddressesInFilter
return 0, rpccore.ErrTooManyAddressesInFilter
}

id := h.idgen()
Expand All @@ -511,7 +511,7 @@ func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, sen
h.processPendingTxs(subscriptionCtx, getDetails != nil && *getDetails, senderAddr, pendingSub, w, id)
})

return &SubscriptionID{ID: id}, nil
return SubscriptionID(id), nil
}

func (h *Handler) processPendingTxs(ctx context.Context, getDetails bool, senderAddr []felt.Felt,
Expand Down
28 changes: 14 additions & 14 deletions rpc/v8/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestSubscribeEvents(t *testing.T) {

newHeadFeed.Send(&core.Header{Number: b1.Number})

resp, err := marshalSubEventsResp(emittedEvents[0], id.ID)
resp, err := marshalSubEventsResp(emittedEvents[0], id)
require.NoError(t, err)

got := make([]byte, len(resp))
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestSubscribeEvents(t *testing.T) {

var marshalledResponses [][]byte
for _, e := range emittedEvents {
resp, err := marshalSubEventsResp(e, id.ID)
resp, err := marshalSubEventsResp(e, id)
require.NoError(t, err)
marshalledResponses = append(marshalledResponses, resp)
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestSubscribeEvents(t *testing.T) {

var marshalledResponses [][]byte
for _, e := range emittedEvents {
resp, err := marshalSubEventsResp(e, id.ID)
resp, err := marshalSubEventsResp(e, id)
require.NoError(t, err)
marshalledResponses = append(marshalledResponses, resp)
}
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestSubscribeEvents(t *testing.T) {
id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil)
require.Nil(t, rpcErr)

resp, err := marshalSubEventsResp(emittedEvents[0], id.ID)
resp, err := marshalSubEventsResp(emittedEvents[0], id)
require.NoError(t, err)

got := make([]byte, len(resp))
Expand All @@ -335,7 +335,7 @@ func TestSubscribeEvents(t *testing.T) {

pendingFeed.Send(&core.Block{Header: &core.Header{Number: b1.Number + 1}})

resp, err = marshalSubEventsResp(emittedEvents[1], id.ID)
resp, err = marshalSubEventsResp(emittedEvents[1], id)
require.NoError(t, err)

got = make([]byte, len(resp))
Expand All @@ -351,7 +351,7 @@ func TestSubscribeEvents(t *testing.T) {

pendingFeed.Send(&core.Block{Header: &core.Header{Number: b1.Number + 1}})

resp, err = marshalSubEventsResp(emittedEvents[0], id.ID)
resp, err = marshalSubEventsResp(emittedEvents[0], id)
require.NoError(t, err)

got = make([]byte, len(resp))
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn})

id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash)
assert.Nil(t, id)
assert.Equal(t, SubscriptionID(0), id)
assert.Equal(t, rpccore.ErrTxnHashNotFound, rpcErr)
})

Expand Down Expand Up @@ -425,7 +425,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
b, err := TxnStatusRejected.MarshalText()
require.NoError(t, err)

resp := fmt.Sprintf(respStr, txHash, b, id.ID)
resp := fmt.Sprintf(respStr, txHash, b, id)
got := make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
b, err := TxnStatusAcceptedOnL1.MarshalText()
require.NoError(t, err)

resp := fmt.Sprintf(respStr, txHash, b, id.ID)
resp := fmt.Sprintf(respStr, txHash, b, id)
got := make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
b, err := TxnStatusReceived.MarshalText()
require.NoError(t, err)

resp := fmt.Sprintf(receivedRespStr, txHash, b, id.ID)
resp := fmt.Sprintf(receivedRespStr, txHash, b, id)
got := make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand All @@ -519,7 +519,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
require.NoError(t, err)

l1AndL2RespStr := `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%v","status":{"finality_status":"%s","execution_status":"SUCCEEDED"}},"subscription_id":%v}}`
resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id.ID)
resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id)
got = make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand All @@ -535,7 +535,7 @@ func TestSubscribeTxnStatus(t *testing.T) {
b, err = TxnStatusAcceptedOnL1.MarshalText()
require.NoError(t, err)

resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id.ID)
resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id)
got = make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func createWsConn(t *testing.T, ctx context.Context, server *jsonrpc.Server) *we
}

func subResp(id uint64) string {
return fmt.Sprintf(`{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}`, id)
return fmt.Sprintf(`{"jsonrpc":"2.0","result":%d,"id":1}`, id)
}

func subMsg(method string) string {
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func sendWsMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, mess
return string(response)
}

func marshalSubEventsResp(e *EmittedEvent, id uint64) ([]byte, error) {
func marshalSubEventsResp(e *EmittedEvent, id SubscriptionID) ([]byte, error) {
return json.Marshal(SubscriptionResponse{
Version: "2.0",
Method: "starknet_subscriptionEvents",
Expand Down

0 comments on commit 69786c2

Please sign in to comment.