Skip to content

Commit

Permalink
Ensure order of delivery for event subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
infrmtcs committed Feb 26, 2025
1 parent 804c548 commit 5892b03
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 59 deletions.
86 changes: 30 additions & 56 deletions rpc/v8/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
}
h.subscriptions.Store(id, sub)

newHeadsSub := h.newHeads.Subscribe()
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the events subscription
pendingSub := h.pendingBlock.Subscribe()
newHeadsSub := h.newHeads.SubscribeKeepLast()
reorgSub := h.reorgs.SubscribeKeepLast() // as per the spec, reorgs are also sent in the events subscription
pendingSub := h.pendingBlock.SubscribeKeepLast()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
Expand All @@ -141,62 +141,29 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
pendingSub.Unsubscribe()
}()

// The specification doesn't enforce ordering of events, therefore, events from new blocks can be sent before
// old blocks.
var wg conc.WaitGroup
wg.Go(func() {
// Stores the transaction hash -> number of events
eventsPreviouslySent := make(map[SentEvent]struct{})

for {
select {
case <-subscriptionCtx.Done():
return
case head := <-newHeadsSub.Recv():
// During syncing the events from the new head still need to be sent as there is no pending block.
// However, it is not easy to tell when the node is syncing.
// To solve this issue, we can send the events regardless, and if the node is done syncing, then the
// latest header events would have been sent when the pending block was updated. Hence,
// trying to resend the event should be of no consequences and the map can be safely emptied.
h.processEvents(subscriptionCtx, w, id, head.Number, head.Number, fromAddr, keys, eventsPreviouslySent)

block, err := h.bcReader.BlockByNumber(head.Number)
if err != nil {
h.log.Warnw("Error retrieving block", "block number", head.Number, "err", err)
return
}
// We still need to run this separately outside of the loop to capture the latest block before subscription.
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys, nil)

for _, r := range block.Receipts {
for i := range r.Events {
sentEvent := SentEvent{
TransactionHash: *r.TransactionHash,
EventIndex: i,
}
delete(eventsPreviouslySent, sentEvent)
}
}
case <-pendingSub.Recv():
height, err := h.bcReader.Height()
if err != nil {
h.log.Warnw("Error retrieving block height", "err", err)
continue
}
nextBlock := headHeader.Number + 1
eventsPreviouslySent := make(map[SentEvent]struct{})

// TODO: This is a hack to only query pending block events
h.processEvents(subscriptionCtx, w, id, height+1, height+1, fromAddr, keys, eventsPreviouslySent)
for {
select {
case <-subscriptionCtx.Done():
return
case reorg := <-reorgSub.Recv():
if err := h.sendReorg(w, reorg, id); err != nil {
h.log.Warnw("Error sending reorg", "err", err)
return

Check warning on line 157 in rpc/v8/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/v8/subscriptions.go#L156-L157

Added lines #L156 - L157 were not covered by tests
}
nextBlock = reorg.StartBlockNum
case head := <-newHeadsSub.Recv():
h.processEvents(subscriptionCtx, w, id, nextBlock, head.Number, fromAddr, keys, eventsPreviouslySent)
nextBlock = head.Number + 1
case <-pendingSub.Recv():
h.processEvents(subscriptionCtx, w, id, nextBlock, nextBlock, fromAddr, keys, eventsPreviouslySent)
}
})

wg.Go(func() {
h.processReorgs(subscriptionCtx, reorgSub, w, id)
})

wg.Go(func() {
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys, nil)
})

wg.Wait()
}
})

return SubscriptionID(id), nil
Expand Down Expand Up @@ -408,7 +375,14 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter
if _, ok := eventsPreviouslySent[sentEvent]; ok {
continue
}
eventsPreviouslySent[sentEvent] = struct{}{}
// This describe the lifecycle of SentEvent.
// It's added when the event is received from a pending block.
// It's deleted when the event is received from a head block.
if isPending := event.BlockHash == nil; isPending {
eventsPreviouslySent[sentEvent] = struct{}{}
} else {
delete(eventsPreviouslySent, sentEvent)
}

Check warning on line 385 in rpc/v8/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/v8/subscriptions.go#L384-L385

Added lines #L384 - L385 were not covered by tests
}

emittedEvent := &EmittedEvent{
Expand Down
3 changes: 0 additions & 3 deletions rpc/v8/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,14 @@ func TestSubscribeEvents(t *testing.T) {

assertNextMessages(t, clientConn, id, b1Emitted)

mockChain.EXPECT().Height().Return(b1.Number, nil)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(pending1Filtered, nil, nil)
handler.pendingBlock.Send(pending1)
assertNextMessages(t, clientConn, id, pending1Emitted)

mockChain.EXPECT().Height().Return(b1.Number, nil)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(pending2Filtered, nil, nil)
handler.pendingBlock.Send(pending2)
assertNextMessages(t, clientConn, id, pending2Emitted[len(pending1Emitted):])

mockChain.EXPECT().BlockByNumber(b2.Number).Return(b2, nil)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(b2Filtered, nil, nil)
handler.newHeads.Send(b2)
assertNextMessages(t, clientConn, id, b2Emitted[len(pending2Emitted):])
Expand Down

0 comments on commit 5892b03

Please sign in to comment.