diff --git a/blockchain/event_filter.go b/blockchain/event_filter.go index 0d3c8a8e86..4e10109c2f 100644 --- a/blockchain/event_filter.go +++ b/blockchain/event_filter.go @@ -1,7 +1,6 @@ package blockchain import ( - "encoding/binary" "errors" "fmt" "io" @@ -10,7 +9,6 @@ import ( "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" - "github.com/bits-and-blooms/bloom/v3" ) var errChunkSizeReached = errors.New("chunk size reached") @@ -26,13 +24,12 @@ type EventFilterer interface { } type EventFilter struct { - txn db.Transaction - fromBlock uint64 - toBlock uint64 - contractAddress *felt.Felt - keys [][]felt.Felt - maxScanned uint // maximum number of scanned blocks in single call. - pendingBlockFn func() *core.Block + txn db.Transaction + fromBlock uint64 + toBlock uint64 + matcher EventMatcher + maxScanned uint // maximum number of scanned blocks in single call. + pendingBlockFn func() *core.Block } type EventFilterRange uint @@ -46,13 +43,12 @@ func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]fel pendingBlockFn func() *core.Block, ) *EventFilter { return &EventFilter{ - txn: txn, - contractAddress: contractAddress, - keys: keys, - fromBlock: fromBlock, - toBlock: toBlock, - maxScanned: math.MaxUint, - pendingBlockFn: pendingBlockFn, + txn: txn, + matcher: NewEventMatcher(contractAddress, keys), + fromBlock: fromBlock, + toBlock: toBlock, + maxScanned: math.MaxUint, + pendingBlockFn: pendingBlockFn, } } @@ -128,8 +124,6 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi } } - filterKeysMaps := makeKeysMaps(e.keys) - curBlock := e.fromBlock // skip the blocks that we previously processed for this request if cToken != nil { @@ -151,7 +145,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi header = pending.Header } - if possibleMatches := e.testBloom(header.EventsBloom, filterKeysMaps); !possibleMatches { + if possibleMatches := e.matcher.TestBloom(header.EventsBloom); !possibleMatches { // bloom filter says no events match the filter, skip this block entirely if from is not nil continue } @@ -167,7 +161,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi } var processedEvents uint64 - matchedEvents, processedEvents, err = e.appendBlockEvents(matchedEvents, header, receipts, filterKeysMaps, cToken, chunkSize) + matchedEvents, processedEvents, err = e.matcher.AppendBlockEvents(matchedEvents, header, receipts, cToken, chunkSize) if err != nil { if errors.Is(err, errChunkSizeReached) { rToken = &ContinuationToken{fromBlock: curBlock, processedEvents: processedEvents} @@ -182,120 +176,3 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi } return matchedEvents, rToken, nil } - -func (e *EventFilter) testBloom(bloomFilter *bloom.BloomFilter, keysMap []map[felt.Felt]struct{}) bool { - possibleMatches := true - if e.contractAddress != nil { - addrBytes := e.contractAddress.Bytes() - possibleMatches = bloomFilter.Test(addrBytes[:]) - // bloom filter says no events from this contract - if !possibleMatches { - return possibleMatches - } - } - - for index, kMap := range keysMap { - for key := range kMap { - keyBytes := key.Bytes() - keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index)) - - // check if block possibly contains the event we are looking for - possibleMatches = bloomFilter.Test(keyAndIndexBytes) - // possible match for this index, no need to continue checking the rest of the keys - if possibleMatches { - break - } - } - - // no key on this index matches the filter - if !possibleMatches { - break - } - } - - return possibleMatches -} - -func (e *EventFilter) appendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header, - receipts []*core.TransactionReceipt, keysMap []map[felt.Felt]struct{}, cToken *ContinuationToken, chunkSize uint64, -) ([]*FilteredEvent, uint64, error) { - processedEvents := uint64(0) - for _, receipt := range receipts { - for i, event := range receipt.Events { - var blockNumber *uint64 - // if header.Hash == nil it's a pending block - if header.Hash != nil { - blockNumber = &header.Number - } - - // if last request was interrupted mid-block, and we are still processing that block, skip events - // that were already processed - if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents { - processedEvents++ - continue - } - - if e.contractAddress != nil && !event.From.Equal(e.contractAddress) { - processedEvents++ - continue - } - - if e.matchesEventKeys(event.Keys, keysMap) { - if uint64(len(matchedEventsSofar)) < chunkSize { - matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{ - BlockNumber: blockNumber, - BlockHash: header.Hash, - TransactionHash: receipt.TransactionHash, - EventIndex: i, - Event: event, - }) - } else { - // we are at the capacity, return what we have accumulated so far and a continuation token - return matchedEventsSofar, processedEvents, errChunkSizeReached - } - } - // count the events we processed for this block to include in the continuation token - processedEvents++ - } - } - return matchedEventsSofar, processedEvents, nil -} - -func (e *EventFilter) matchesEventKeys(eventKeys []*felt.Felt, keysMap []map[felt.Felt]struct{}) bool { - // short circuit if event doest have enough keys - for i := len(eventKeys); i < len(keysMap); i++ { - if len(keysMap[i]) > 0 { - return false - } - } - - /// e.keys = [["V1", "V2"], [], ["V3"]] means: - /// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")). - // - // Essentially - // for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold - for index, eventKey := range eventKeys { - // empty filter keys means match all - if index >= len(keysMap) || len(keysMap[index]) == 0 { - break - } - if _, found := keysMap[index][*eventKey]; !found { - return false - } - } - - return true -} - -func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} { - filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys)) - for index, keys := range filterKeys { - kMap := make(map[felt.Felt]struct{}, len(keys)) - for _, key := range keys { - kMap[key] = struct{}{} - } - filterKeysMaps[index] = kMap - } - - return filterKeysMaps -} diff --git a/blockchain/event_matcher.go b/blockchain/event_matcher.go new file mode 100644 index 0000000000..969c7e4862 --- /dev/null +++ b/blockchain/event_matcher.go @@ -0,0 +1,141 @@ +package blockchain + +import ( + "encoding/binary" + + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/bits-and-blooms/bloom/v3" +) + +type EventMatcher struct { + contractAddress *felt.Felt + keysMap []map[felt.Felt]struct{} +} + +func NewEventMatcher(contractAddress *felt.Felt, keys [][]felt.Felt) EventMatcher { + return EventMatcher{ + contractAddress: contractAddress, + keysMap: makeKeysMaps(keys), + } +} + +func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} { + filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys)) + for index, keys := range filterKeys { + kMap := make(map[felt.Felt]struct{}, len(keys)) + for _, key := range keys { + kMap[key] = struct{}{} + } + filterKeysMaps[index] = kMap + } + + return filterKeysMaps +} + +func (e *EventMatcher) matchesEventKeys(eventKeys []*felt.Felt) bool { + // short circuit if event doest have enough keys + for i := len(eventKeys); i < len(e.keysMap); i++ { + if len(e.keysMap[i]) > 0 { + return false + } + } + + /// e.keys = [["V1", "V2"], [], ["V3"]] means: + /// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")). + // + // Essentially + // for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold + for index, eventKey := range eventKeys { + // empty filter keys means match all + if index >= len(e.keysMap) || len(e.keysMap[index]) == 0 { + break + } + if _, found := e.keysMap[index][*eventKey]; !found { + return false + } + } + + return true +} + +func (e *EventMatcher) TestBloom(bloomFilter *bloom.BloomFilter) bool { + possibleMatches := true + if e.contractAddress != nil { + addrBytes := e.contractAddress.Bytes() + possibleMatches = bloomFilter.Test(addrBytes[:]) + // bloom filter says no events from this contract + if !possibleMatches { + return possibleMatches + } + } + + for index, kMap := range e.keysMap { + for key := range kMap { + keyBytes := key.Bytes() + keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index)) + + // check if block possibly contains the event we are looking for + possibleMatches = bloomFilter.Test(keyAndIndexBytes) + // possible match for this index, no need to continue checking the rest of the keys + if possibleMatches { + break + } + } + + // no key on this index matches the filter + if !possibleMatches { + break + } + } + + return possibleMatches +} + +func (e *EventMatcher) AppendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header, receipts []*core.TransactionReceipt, + cToken *ContinuationToken, chunkSize uint64, +) ([]*FilteredEvent, uint64, error) { + processedEvents := uint64(0) + for _, receipt := range receipts { + for i, event := range receipt.Events { + var blockNumber *uint64 + // if header.Hash == nil it's a pending block + if header.Hash != nil { + blockNumber = &header.Number + } + + // if last request was interrupted mid-block, and we are still processing that block, skip events + // that were already processed + if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents { + processedEvents++ + continue + } + + if e.contractAddress != nil && !event.From.Equal(e.contractAddress) { + processedEvents++ + continue + } + + if !e.matchesEventKeys(event.Keys) { + processedEvents++ + continue + } + + if uint64(len(matchedEventsSofar)) < chunkSize { + matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{ + BlockNumber: blockNumber, + BlockHash: header.Hash, + TransactionHash: receipt.TransactionHash, + EventIndex: i, + Event: event, + }) + } else { + // we are at the capacity, return what we have accumulated so far and a continuation token + return matchedEventsSofar, processedEvents, errChunkSizeReached + } + // count the events we processed for this block to include in the continuation token + processedEvents++ + } + } + return matchedEventsSofar, processedEvents, nil +}