Skip to content

Commit

Permalink
refactor: Separate matcher logic in EventFilter (#2546)
Browse files Browse the repository at this point in the history
  • Loading branch information
infrmtcs authored Feb 25, 2025
1 parent f9b4aed commit 7239e37
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 137 deletions.
151 changes: 14 additions & 137 deletions blockchain/event_filter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockchain

import (
"encoding/binary"
"errors"
"fmt"
"io"
Expand All @@ -10,7 +9,6 @@ import (
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/bits-and-blooms/bloom/v3"
)

var errChunkSizeReached = errors.New("chunk size reached")
Expand All @@ -26,13 +24,12 @@ type EventFilterer interface {
}

type EventFilter struct {
txn db.Transaction
fromBlock uint64
toBlock uint64
contractAddress *felt.Felt
keys [][]felt.Felt
maxScanned uint // maximum number of scanned blocks in single call.
pendingBlockFn func() *core.Block
txn db.Transaction
fromBlock uint64
toBlock uint64
matcher EventMatcher
maxScanned uint // maximum number of scanned blocks in single call.
pendingBlockFn func() *core.Block
}

type EventFilterRange uint
Expand All @@ -46,13 +43,12 @@ func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]fel
pendingBlockFn func() *core.Block,
) *EventFilter {
return &EventFilter{
txn: txn,
contractAddress: contractAddress,
keys: keys,
fromBlock: fromBlock,
toBlock: toBlock,
maxScanned: math.MaxUint,
pendingBlockFn: pendingBlockFn,
txn: txn,
matcher: NewEventMatcher(contractAddress, keys),
fromBlock: fromBlock,
toBlock: toBlock,
maxScanned: math.MaxUint,
pendingBlockFn: pendingBlockFn,
}
}

Expand Down Expand Up @@ -128,8 +124,6 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}
}

filterKeysMaps := makeKeysMaps(e.keys)

curBlock := e.fromBlock
// skip the blocks that we previously processed for this request
if cToken != nil {
Expand All @@ -151,7 +145,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
header = pending.Header
}

if possibleMatches := e.testBloom(header.EventsBloom, filterKeysMaps); !possibleMatches {
if possibleMatches := e.matcher.TestBloom(header.EventsBloom); !possibleMatches {
// bloom filter says no events match the filter, skip this block entirely if from is not nil
continue
}
Expand All @@ -167,7 +161,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}

var processedEvents uint64
matchedEvents, processedEvents, err = e.appendBlockEvents(matchedEvents, header, receipts, filterKeysMaps, cToken, chunkSize)
matchedEvents, processedEvents, err = e.matcher.AppendBlockEvents(matchedEvents, header, receipts, cToken, chunkSize)
if err != nil {
if errors.Is(err, errChunkSizeReached) {
rToken = &ContinuationToken{fromBlock: curBlock, processedEvents: processedEvents}
Expand All @@ -182,120 +176,3 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}
return matchedEvents, rToken, nil
}

func (e *EventFilter) testBloom(bloomFilter *bloom.BloomFilter, keysMap []map[felt.Felt]struct{}) bool {
possibleMatches := true
if e.contractAddress != nil {
addrBytes := e.contractAddress.Bytes()
possibleMatches = bloomFilter.Test(addrBytes[:])
// bloom filter says no events from this contract
if !possibleMatches {
return possibleMatches
}
}

for index, kMap := range keysMap {
for key := range kMap {
keyBytes := key.Bytes()
keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index))

// check if block possibly contains the event we are looking for
possibleMatches = bloomFilter.Test(keyAndIndexBytes)
// possible match for this index, no need to continue checking the rest of the keys
if possibleMatches {
break
}
}

// no key on this index matches the filter
if !possibleMatches {
break
}
}

return possibleMatches
}

func (e *EventFilter) appendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header,
receipts []*core.TransactionReceipt, keysMap []map[felt.Felt]struct{}, cToken *ContinuationToken, chunkSize uint64,
) ([]*FilteredEvent, uint64, error) {
processedEvents := uint64(0)
for _, receipt := range receipts {
for i, event := range receipt.Events {
var blockNumber *uint64
// if header.Hash == nil it's a pending block
if header.Hash != nil {
blockNumber = &header.Number
}

// if last request was interrupted mid-block, and we are still processing that block, skip events
// that were already processed
if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents {
processedEvents++
continue
}

if e.contractAddress != nil && !event.From.Equal(e.contractAddress) {
processedEvents++
continue
}

if e.matchesEventKeys(event.Keys, keysMap) {
if uint64(len(matchedEventsSofar)) < chunkSize {
matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{
BlockNumber: blockNumber,
BlockHash: header.Hash,
TransactionHash: receipt.TransactionHash,
EventIndex: i,
Event: event,
})
} else {
// we are at the capacity, return what we have accumulated so far and a continuation token
return matchedEventsSofar, processedEvents, errChunkSizeReached
}
}
// count the events we processed for this block to include in the continuation token
processedEvents++
}
}
return matchedEventsSofar, processedEvents, nil
}

func (e *EventFilter) matchesEventKeys(eventKeys []*felt.Felt, keysMap []map[felt.Felt]struct{}) bool {
// short circuit if event doest have enough keys
for i := len(eventKeys); i < len(keysMap); i++ {
if len(keysMap[i]) > 0 {
return false
}
}

/// e.keys = [["V1", "V2"], [], ["V3"]] means:
/// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")).
//
// Essentially
// for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold
for index, eventKey := range eventKeys {
// empty filter keys means match all
if index >= len(keysMap) || len(keysMap[index]) == 0 {
break
}
if _, found := keysMap[index][*eventKey]; !found {
return false
}
}

return true
}

func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} {
filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys))
for index, keys := range filterKeys {
kMap := make(map[felt.Felt]struct{}, len(keys))
for _, key := range keys {
kMap[key] = struct{}{}
}
filterKeysMaps[index] = kMap
}

return filterKeysMaps
}
141 changes: 141 additions & 0 deletions blockchain/event_matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package blockchain

import (
"encoding/binary"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/bits-and-blooms/bloom/v3"
)

type EventMatcher struct {
contractAddress *felt.Felt
keysMap []map[felt.Felt]struct{}
}

func NewEventMatcher(contractAddress *felt.Felt, keys [][]felt.Felt) EventMatcher {
return EventMatcher{
contractAddress: contractAddress,
keysMap: makeKeysMaps(keys),
}
}

func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} {
filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys))
for index, keys := range filterKeys {
kMap := make(map[felt.Felt]struct{}, len(keys))
for _, key := range keys {
kMap[key] = struct{}{}
}
filterKeysMaps[index] = kMap
}

return filterKeysMaps
}

func (e *EventMatcher) matchesEventKeys(eventKeys []*felt.Felt) bool {
// short circuit if event doest have enough keys
for i := len(eventKeys); i < len(e.keysMap); i++ {
if len(e.keysMap[i]) > 0 {
return false
}
}

/// e.keys = [["V1", "V2"], [], ["V3"]] means:
/// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")).
//
// Essentially
// for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold
for index, eventKey := range eventKeys {
// empty filter keys means match all
if index >= len(e.keysMap) || len(e.keysMap[index]) == 0 {
break
}
if _, found := e.keysMap[index][*eventKey]; !found {
return false
}
}

return true
}

func (e *EventMatcher) TestBloom(bloomFilter *bloom.BloomFilter) bool {
possibleMatches := true
if e.contractAddress != nil {
addrBytes := e.contractAddress.Bytes()
possibleMatches = bloomFilter.Test(addrBytes[:])
// bloom filter says no events from this contract
if !possibleMatches {
return possibleMatches
}
}

for index, kMap := range e.keysMap {
for key := range kMap {
keyBytes := key.Bytes()
keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index))

// check if block possibly contains the event we are looking for
possibleMatches = bloomFilter.Test(keyAndIndexBytes)
// possible match for this index, no need to continue checking the rest of the keys
if possibleMatches {
break
}
}

// no key on this index matches the filter
if !possibleMatches {
break
}
}

return possibleMatches
}

func (e *EventMatcher) AppendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header, receipts []*core.TransactionReceipt,
cToken *ContinuationToken, chunkSize uint64,
) ([]*FilteredEvent, uint64, error) {
processedEvents := uint64(0)
for _, receipt := range receipts {
for i, event := range receipt.Events {
var blockNumber *uint64
// if header.Hash == nil it's a pending block
if header.Hash != nil {
blockNumber = &header.Number
}

// if last request was interrupted mid-block, and we are still processing that block, skip events
// that were already processed
if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents {
processedEvents++
continue
}

if e.contractAddress != nil && !event.From.Equal(e.contractAddress) {
processedEvents++
continue
}

if !e.matchesEventKeys(event.Keys) {
processedEvents++
continue
}

if uint64(len(matchedEventsSofar)) < chunkSize {
matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{
BlockNumber: blockNumber,
BlockHash: header.Hash,
TransactionHash: receipt.TransactionHash,
EventIndex: i,
Event: event,
})
} else {
// we are at the capacity, return what we have accumulated so far and a continuation token
return matchedEventsSofar, processedEvents, errChunkSizeReached
}
// count the events we processed for this block to include in the continuation token
processedEvents++
}
}
return matchedEventsSofar, processedEvents, nil
}

0 comments on commit 7239e37

Please sign in to comment.