Skip to content

Commit

Permalink
Merge branch 'main' into hudem1/rpc-cleanup-AddTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
hudem1 committed Feb 26, 2025
2 parents 1a7ac25 + 94071e4 commit fab8d2b
Show file tree
Hide file tree
Showing 45 changed files with 38,294 additions and 412 deletions.
11 changes: 5 additions & 6 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ type Blockchain struct {
pendingBlockFn func() *core.Block
}

func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Block) *Blockchain {
func New(database db.DB, network *utils.Network) *Blockchain {
return &Blockchain{
database: database,
network: network,
listener: &SelectiveListener{},
l1HeadFeed: feed.New[*core.L1Head](),
pendingBlockFn: pendingBlockFn,
database: database,
network: network,
listener: &SelectiveListener{},
l1HeadFeed: feed.New[*core.L1Head](),
}
}

Expand Down
39 changes: 20 additions & 19 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestNew(t *testing.T) {
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
t.Run("empty blockchain's head is nil", func(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
assert.Equal(t, &utils.Mainnet, chain.Network())
b, err := chain.Head()
assert.Nil(t, b)
Expand All @@ -38,10 +38,10 @@ func TestNew(t *testing.T) {
require.NoError(t, err)

testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Mainnet, nil)
chain := blockchain.New(testDB, &utils.Mainnet)
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

chain = blockchain.New(testDB, &utils.Mainnet, nil)
chain = blockchain.New(testDB, &utils.Mainnet)
b, err := chain.Head()
require.NoError(t, err)
assert.Equal(t, block0, b)
Expand All @@ -52,7 +52,7 @@ func TestHeight(t *testing.T) {
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
t.Run("return nil if blockchain is empty", func(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia)
_, err := chain.Height()
assert.Error(t, err)
})
Expand All @@ -64,18 +64,18 @@ func TestHeight(t *testing.T) {
require.NoError(t, err)

testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Mainnet, nil)
chain := blockchain.New(testDB, &utils.Mainnet)
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

chain = blockchain.New(testDB, &utils.Mainnet, nil)
chain = blockchain.New(testDB, &utils.Mainnet)
height, err := chain.Height()
require.NoError(t, err)
assert.Equal(t, block0.Number, height)
})
}

func TestBlockByNumberAndHash(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia)
t.Run("same block is returned for both GetBlockByNumber and GetBlockByHash", func(t *testing.T) {
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestVerifyBlock(t *testing.T) {
h1, err := new(felt.Felt).SetRandom()
require.NoError(t, err)

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)

t.Run("error if chain is empty and incoming block number is not 0", func(t *testing.T) {
block := &core.Block{Header: &core.Header{Number: 10}}
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestSanityCheckNewHeight(t *testing.T) {
h1, err := new(felt.Felt).SetRandom()
require.NoError(t, err)

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)

client := feeder.NewTestClient(t, &utils.Mainnet)

Expand Down Expand Up @@ -233,7 +233,7 @@ func TestStore(t *testing.T) {
require.NoError(t, err)

t.Run("add block to empty blockchain", func(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
require.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

headBlock, err := chain.Head()
Expand All @@ -260,7 +260,7 @@ func TestStore(t *testing.T) {
stateUpdate1, err := gw.StateUpdate(context.Background(), 1)
require.NoError(t, err)

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
require.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))
require.NoError(t, chain.Store(block1, &emptyCommitments, stateUpdate1, nil))

Expand All @@ -285,7 +285,7 @@ func TestStore(t *testing.T) {
func TestStoreL1HandlerTxnHash(t *testing.T) {
client := feeder.NewTestClient(t, &utils.Sepolia)
gw := adaptfeeder.New(client)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Sepolia)
var stateUpdate *core.StateUpdate
for i := range uint64(7) {
block, err := gw.BlockByNumber(context.Background(), i)
Expand All @@ -301,7 +301,7 @@ func TestStoreL1HandlerTxnHash(t *testing.T) {
}

func TestBlockCommitments(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)

Expand All @@ -326,7 +326,7 @@ func TestBlockCommitments(t *testing.T) {
}

func TestTransactionAndReceipt(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)

client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestTransactionAndReceipt(t *testing.T) {

func TestState(t *testing.T) {
testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Mainnet, nil)
chain := blockchain.New(testDB, &utils.Mainnet)

client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
Expand Down Expand Up @@ -482,7 +482,8 @@ func TestEvents(t *testing.T) {
}

testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Goerli2, pendingBlockFn)
chain := blockchain.New(testDB, &utils.Goerli2)
chain = chain.WithPendingBlockFn(pendingBlockFn)

client := feeder.NewTestClient(t, &utils.Goerli2)
gw := adaptfeeder.New(client)
Expand Down Expand Up @@ -598,7 +599,7 @@ func TestEvents(t *testing.T) {

func TestRevert(t *testing.T) {
testdb := pebble.NewMemTest(t)
chain := blockchain.New(testdb, &utils.Mainnet, nil)
chain := blockchain.New(testdb, &utils.Mainnet)

client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
Expand Down Expand Up @@ -682,7 +683,7 @@ func TestL1Update(t *testing.T) {

for _, head := range heads {
t.Run(fmt.Sprintf("update L1 head to block %d", head.BlockNumber), func(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
require.NoError(t, chain.SetL1Head(head))
got, err := chain.L1Head()
require.NoError(t, err)
Expand All @@ -697,7 +698,7 @@ func TestSubscribeL1Head(t *testing.T) {
StateRoot: new(felt.Felt).SetUint64(2),
}

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
sub := chain.SubscribeL1Head()
t.Cleanup(sub.Unsubscribe)

Expand Down
151 changes: 14 additions & 137 deletions blockchain/event_filter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockchain

import (
"encoding/binary"
"errors"
"fmt"
"io"
Expand All @@ -10,7 +9,6 @@ import (
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/bits-and-blooms/bloom/v3"
)

var errChunkSizeReached = errors.New("chunk size reached")
Expand All @@ -26,13 +24,12 @@ type EventFilterer interface {
}

type EventFilter struct {
txn db.Transaction
fromBlock uint64
toBlock uint64
contractAddress *felt.Felt
keys [][]felt.Felt
maxScanned uint // maximum number of scanned blocks in single call.
pendingBlockFn func() *core.Block
txn db.Transaction
fromBlock uint64
toBlock uint64
matcher EventMatcher
maxScanned uint // maximum number of scanned blocks in single call.
pendingBlockFn func() *core.Block
}

type EventFilterRange uint
Expand All @@ -46,13 +43,12 @@ func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]fel
pendingBlockFn func() *core.Block,
) *EventFilter {
return &EventFilter{
txn: txn,
contractAddress: contractAddress,
keys: keys,
fromBlock: fromBlock,
toBlock: toBlock,
maxScanned: math.MaxUint,
pendingBlockFn: pendingBlockFn,
txn: txn,
matcher: NewEventMatcher(contractAddress, keys),
fromBlock: fromBlock,
toBlock: toBlock,
maxScanned: math.MaxUint,
pendingBlockFn: pendingBlockFn,
}
}

Expand Down Expand Up @@ -128,8 +124,6 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}
}

filterKeysMaps := makeKeysMaps(e.keys)

curBlock := e.fromBlock
// skip the blocks that we previously processed for this request
if cToken != nil {
Expand All @@ -151,7 +145,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
header = pending.Header
}

if possibleMatches := e.testBloom(header.EventsBloom, filterKeysMaps); !possibleMatches {
if possibleMatches := e.matcher.TestBloom(header.EventsBloom); !possibleMatches {
// bloom filter says no events match the filter, skip this block entirely if from is not nil
continue
}
Expand All @@ -167,7 +161,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}

var processedEvents uint64
matchedEvents, processedEvents, err = e.appendBlockEvents(matchedEvents, header, receipts, filterKeysMaps, cToken, chunkSize)
matchedEvents, processedEvents, err = e.matcher.AppendBlockEvents(matchedEvents, header, receipts, cToken, chunkSize)
if err != nil {
if errors.Is(err, errChunkSizeReached) {
rToken = &ContinuationToken{fromBlock: curBlock, processedEvents: processedEvents}
Expand All @@ -182,120 +176,3 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
}
return matchedEvents, rToken, nil
}

func (e *EventFilter) testBloom(bloomFilter *bloom.BloomFilter, keysMap []map[felt.Felt]struct{}) bool {
possibleMatches := true
if e.contractAddress != nil {
addrBytes := e.contractAddress.Bytes()
possibleMatches = bloomFilter.Test(addrBytes[:])
// bloom filter says no events from this contract
if !possibleMatches {
return possibleMatches
}
}

for index, kMap := range keysMap {
for key := range kMap {
keyBytes := key.Bytes()
keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index))

// check if block possibly contains the event we are looking for
possibleMatches = bloomFilter.Test(keyAndIndexBytes)
// possible match for this index, no need to continue checking the rest of the keys
if possibleMatches {
break
}
}

// no key on this index matches the filter
if !possibleMatches {
break
}
}

return possibleMatches
}

func (e *EventFilter) appendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header,
receipts []*core.TransactionReceipt, keysMap []map[felt.Felt]struct{}, cToken *ContinuationToken, chunkSize uint64,
) ([]*FilteredEvent, uint64, error) {
processedEvents := uint64(0)
for _, receipt := range receipts {
for i, event := range receipt.Events {
var blockNumber *uint64
// if header.Hash == nil it's a pending block
if header.Hash != nil {
blockNumber = &header.Number
}

// if last request was interrupted mid-block, and we are still processing that block, skip events
// that were already processed
if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents {
processedEvents++
continue
}

if e.contractAddress != nil && !event.From.Equal(e.contractAddress) {
processedEvents++
continue
}

if e.matchesEventKeys(event.Keys, keysMap) {
if uint64(len(matchedEventsSofar)) < chunkSize {
matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{
BlockNumber: blockNumber,
BlockHash: header.Hash,
TransactionHash: receipt.TransactionHash,
EventIndex: i,
Event: event,
})
} else {
// we are at the capacity, return what we have accumulated so far and a continuation token
return matchedEventsSofar, processedEvents, errChunkSizeReached
}
}
// count the events we processed for this block to include in the continuation token
processedEvents++
}
}
return matchedEventsSofar, processedEvents, nil
}

func (e *EventFilter) matchesEventKeys(eventKeys []*felt.Felt, keysMap []map[felt.Felt]struct{}) bool {
// short circuit if event doest have enough keys
for i := len(eventKeys); i < len(keysMap); i++ {
if len(keysMap[i]) > 0 {
return false
}
}

/// e.keys = [["V1", "V2"], [], ["V3"]] means:
/// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")).
//
// Essentially
// for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold
for index, eventKey := range eventKeys {
// empty filter keys means match all
if index >= len(keysMap) || len(keysMap[index]) == 0 {
break
}
if _, found := keysMap[index][*eventKey]; !found {
return false
}
}

return true
}

func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} {
filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys))
for index, keys := range filterKeys {
kMap := make(map[felt.Felt]struct{}, len(keys))
for _, key := range keys {
kMap[key] = struct{}{}
}
filterKeysMaps[index] = kMap
}

return filterKeysMaps
}
Loading

0 comments on commit fab8d2b

Please sign in to comment.