feat: bridge_getClaims endpoint (#210)
Co-authored-by: Stefan Negovanović <stefan@ethernal.tech>
rachit77 and Stefan-Ethernal committed Feb 24, 2025
1 parent 9da627c commit 223ee49
Showing 14 changed files with 545 additions and 50 deletions.
10 changes: 10 additions & 0 deletions bridgesync/bridgesync.go
@@ -215,6 +215,16 @@ func newBridgeSync(
}, nil
}

func (s *BridgeSync) GetClaimsPaged(
ctx context.Context,
page, pageSize uint32,
) ([]*ClaimResponse, int, error) {
if s.processor.isHalted() {
return nil, 0, sync.ErrInconsistentState
}
return s.processor.GetClaimsPaged(ctx, page, pageSize)
}

// Start starts the synchronization process
func (s *BridgeSync) Start(ctx context.Context) {
s.driver.Sync(ctx)
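As a usage sketch (not part of this commit), a caller holding an initialized *BridgeSync could page through claims like so; printFirstClaimsPage is a hypothetical helper, page numbers are 1-based, and the usual context/fmt imports are assumed:

// Fetch the first page of claims (25 per page) through the syncer that
// backs the new bridge_getClaims endpoint.
func printFirstClaimsPage(ctx context.Context, s *BridgeSync) error {
	claims, total, err := s.GetClaimsPaged(ctx, 1, 25)
	if err != nil {
		// sync.ErrInconsistentState is returned while the processor is halted.
		return err
	}
	fmt.Printf("got %d of %d claims\n", len(claims), total)
	return nil
}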
6 changes: 6 additions & 0 deletions bridgesync/bridgesync_test.go
@@ -289,3 +289,9 @@ func TestGetBridgePaged(t *testing.T) {
_, _, err := s.GetBridgesPaged(context.Background(), 0, 0, nil)
require.ErrorIs(t, err, sync.ErrInconsistentState)
}

func TestGetClaimPaged(t *testing.T) {
s := BridgeSync{processor: &processor{halted: true}}
_, _, err := s.GetClaimsPaged(context.Background(), 0, 0)
require.ErrorIs(t, err, sync.ErrInconsistentState)
}
3 changes: 3 additions & 0 deletions bridgesync/downloader.go
@@ -98,6 +98,9 @@ func buildAppender(client EthClienter, bridge common.Address, syncFullClaims boo
OriginAddress: claimEvent.OriginAddress,
DestinationAddress: claimEvent.DestinationAddress,
Amount: claimEvent.Amount,
BlockTimestamp: b.Timestamp,
TxHash: l.TxHash,
FromAddress: l.Address,
}
if syncFullClaims {
if err := setClaimCalldata(client, bridge, l.TxHash, claim); err != nil {
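For orientation, a sketch (under assumptions, not code from this commit) of where the three new Claim fields come from: l is go-ethereum's types.Log, whose TxHash is the transaction that emitted the event and whose Address is the emitting contract, while b is assumed to be the syncer's block wrapper exposing the block timestamp.

// EVMBlock is a stand-in name for the syncer's block type (assumption);
// types.Log and the common types are go-ethereum's.
type EVMBlock struct{ Timestamp uint64 }

func claimEventMetadata(b EVMBlock, l types.Log) (blockTimestamp uint64, txHash common.Hash, emitter common.Address) {
	return b.Timestamp, l.TxHash, l.Address
}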
2 changes: 2 additions & 0 deletions bridgesync/migrations/bridgesync0002.sql
@@ -5,6 +5,7 @@ ALTER TABLE claim DROP COLUMN tx_hash;
ALTER TABLE bridge DROP COLUMN block_timestamp;
ALTER TABLE claim DROP COLUMN block_timestamp;
ALTER TABLE bridge DROP COLUMN from_address;
ALTER TABLE claim DROP COLUMN from_address;

-- +migrate Up
CREATE TABLE
@@ -24,3 +25,4 @@ ALTER TABLE claim ADD COLUMN tx_hash VARCHAR;
ALTER TABLE bridge ADD COLUMN block_timestamp INTEGER;
ALTER TABLE claim ADD COLUMN block_timestamp INTEGER;
ALTER TABLE bridge ADD COLUMN from_address VARCHAR;
ALTER TABLE claim ADD COLUMN from_address VARCHAR;
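A minimal smoke test for the migration (a sketch assuming SQLite and an open *sql.DB on the migrated database; claimHasFromAddress is a hypothetical helper):

// pragma_table_info is SQLite's table-introspection function; the query
// counts the claim-table columns named from_address.
func claimHasFromAddress(db *sql.DB) (bool, error) {
	var n int
	err := db.QueryRow(
		`SELECT COUNT(*) FROM pragma_table_info('claim') WHERE name = 'from_address'`,
	).Scan(&n)
	return n == 1, err
}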
13 changes: 9 additions & 4 deletions bridgesync/migrations/migrations_test.go
@@ -103,8 +103,9 @@ func TestMigration0002(t *testing.T) {
metadata,
deposit_count,
block_timestamp,
tx_hash
) VALUES (1, 0, 0, 0, '0x3', 0, '0x0000', 0, NULL, 0, 1739270804, '0xabcd');
tx_hash,
from_address
) VALUES (1, 0, 0, 0, '0x3', 0, '0x0000', 0, NULL, 0, 1739270804, '0xabcd', '0x123');
INSERT INTO claim (
block_num,
@@ -118,8 +119,9 @@ func TestMigration0002(t *testing.T) {
metadata,
is_message,
block_timestamp,
tx_hash
) VALUES (1, 0, 0, 0, '0x3', '0x0000', 0, 0, NULL, FALSE, 1739270804, '0xabcd');
tx_hash,
from_address
) VALUES (1, 0, 0, 0, '0x3', '0x0000', 0, 0, NULL, FALSE, 1739270804, '0xabcd', '0x123');
`)
require.NoError(t, err)
err = tx.Commit()
@@ -160,6 +162,7 @@ func TestMigration0002(t *testing.T) {
DepositCount uint32 `meddler:"deposit_count"`
BlockTimestamp uint64 `meddler:"block_timestamp"`
TxHash string `meddler:"tx_hash"`
FromAddress string `meddler:"from_address"`
}

err = meddler.QueryRow(db, &bridge,
@@ -181,11 +184,13 @@ func TestMigration0002(t *testing.T) {
IsMessage bool `meddler:"is_message"`
BlockTimestamp uint64 `meddler:"block_timestamp"`
TxHash string `meddler:"tx_hash"`
FromAddress string `meddler:"from_address"`
}

err = meddler.QueryRow(db, &claim,
`SELECT * FROM claim`)
require.NoError(t, err)
require.NotNil(t, claim)
require.Equal(t, uint64(1739270804), claim.BlockTimestamp)
require.Equal(t, "0x123", claim.FromAddress)
}
123 changes: 99 additions & 24 deletions bridgesync/processor.go
@@ -42,25 +42,25 @@ var (

// Bridge is the representation of a bridge event
type Bridge struct {
BlockNum uint64 `meddler:"block_num"`
BlockPos uint64 `meddler:"block_pos"`
LeafType uint8 `meddler:"leaf_type"`
OriginNetwork uint32 `meddler:"origin_network"`
OriginAddress common.Address `meddler:"origin_address"`
DestinationNetwork uint32 `meddler:"destination_network"`
DestinationAddress common.Address `meddler:"destination_address"`
Amount *big.Int `meddler:"amount,bigint"`
Metadata []byte `meddler:"metadata"`
DepositCount uint32 `meddler:"deposit_count"`
BlockTimestamp uint64 `meddler:"block_timestamp"`
TxHash common.Hash `meddler:"tx_hash,hash"`
FromAddress common.Address `meddler:"from_address,address"`
BlockNum uint64 `meddler:"block_num" json:"block_num"`
BlockPos uint64 `meddler:"block_pos" json:"block_pos"`
BlockTimestamp uint64 `meddler:"block_timestamp" json:"block_timestamp"`
LeafType uint8 `meddler:"leaf_type" json:"leaf_type"`
OriginNetwork uint32 `meddler:"origin_network" json:"origin_network"`
OriginAddress common.Address `meddler:"origin_address" json:"origin_address"`
DestinationNetwork uint32 `meddler:"destination_network" json:"destination_network"`
DestinationAddress common.Address `meddler:"destination_address" json:"destination_address"`
Amount *big.Int `meddler:"amount,bigint" json:"amount"`
Metadata []byte `meddler:"metadata" json:"metadata"`
DepositCount uint32 `meddler:"deposit_count" json:"deposit_count"`
TxHash common.Hash `meddler:"tx_hash,hash" json:"tx_hash"`
FromAddress common.Address `meddler:"from_address,address" json:"from_address"`
}

// BridgeResponse is the representation of a bridge event with additional fields
type BridgeResponse struct {
Bridge
BridgeHash common.Hash
BridgeHash common.Hash `json:"bridge_hash"`
}

// Can't change the Hash() here after adding BlockTimestamp and TxHash; it might affect previous versions
@@ -111,18 +111,33 @@ type Claim struct {
IsMessage bool `meddler:"is_message"`
BlockTimestamp uint64 `meddler:"block_timestamp"`
TxHash common.Hash `meddler:"tx_hash,hash"`
FromAddress common.Address `meddler:"from_address,address"`
}

// ClaimResponse is the representation of a claim event with trimmed fields
type ClaimResponse struct {
BlockNum uint64 `json:"block_num"`
BlockTimestamp uint64 `json:"block_timestamp"`
TxHash common.Hash `json:"tx_hash"`
GlobalIndex *big.Int `json:"global_index"`
OriginAddress common.Address `json:"origin_address"`
OriginNetwork uint32 `json:"origin_network"`
DestinationAddress common.Address `json:"destination_address"`
DestinationNetwork uint32 `json:"destination_network"`
Amount *big.Int `json:"amount"`
FromAddress common.Address `json:"from_address"`
}
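To illustrate the wire format the json tags imply (a sketch with made-up values, not captured endpoint output):

// common.Hash and common.Address marshal as 0x-prefixed hex strings,
// *big.Int as a JSON number; keys follow the snake_case tags above.
resp := ClaimResponse{
	BlockNum:       5,
	BlockTimestamp: 1739270804,
	GlobalIndex:    big.NewInt(1),
	Amount:         big.NewInt(1),
}
out, _ := json.Marshal(resp)
// out resembles: {"block_num":5,"block_timestamp":1739270804,"tx_hash":"0x00…00","global_index":1,…}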

// TokenMapping representation of a NewWrappedToken event, that is emitted by the bridge contract
type TokenMapping struct {
BlockNum uint64 `meddler:"block_num"`
BlockPos uint64 `meddler:"block_pos"`
BlockTimestamp uint64 `meddler:"block_timestamp"`
TxHash common.Hash `meddler:"tx_hash,hash"`
OriginNetwork uint32 `meddler:"origin_network"`
OriginTokenAddress common.Address `meddler:"origin_token_address,address"`
WrappedTokenAddress common.Address `meddler:"wrapped_token_address,address"`
Metadata []byte `meddler:"metadata"`
BlockNum uint64 `meddler:"block_num" json:"block_num"`
BlockPos uint64 `meddler:"block_pos" json:"block_pos"`
BlockTimestamp uint64 `meddler:"block_timestamp" json:"block_timestamp"`
TxHash common.Hash `meddler:"tx_hash,hash" json:"tx_hash"`
OriginNetwork uint32 `meddler:"origin_network" json:"origin_network"`
OriginTokenAddress common.Address `meddler:"origin_token_address,address" json:"origin_token_address"`
WrappedTokenAddress common.Address `meddler:"wrapped_token_address,address" json:"wrapped_token_address"`
Metadata []byte `meddler:"metadata" json:"metadata"`
}

// Event combination of bridge, claim and token mapping events
@@ -249,7 +264,7 @@ func (p *processor) GetBridgesPaged(
pageSize = 1
}
offset := (pageNumber - 1) * pageSize
if int(offset) >= count {
if offset >= uint32(count) {
p.log.Debugf("offset is larger than total bridges (page number=%d, page size=%d, total bridges=%d)",
pageNumber, pageSize, count)
return nil, 0, db.ErrNotFound
@@ -280,6 +295,66 @@ func (p *processor) GetBridgesPaged(
return bridgeResponsePtrs, count, nil
}

func (p *processor) GetClaimsPaged(
ctx context.Context, pageNumber, pageSize uint32,
) ([]*ClaimResponse, int, error) {
tx, err := p.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, 0, err
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Warnf("error rolling back tx: %v", err)
}
}()
count, err := p.GetTotalNumberOfRecords(claimTableName)
if err != nil {
return nil, 0, err
}

offset := (pageNumber - 1) * pageSize
if offset >= uint32(count) {
p.log.Debugf("offset is larger than total claims (page number=%d, page size=%d, total claims=%d)",
pageNumber, pageSize, count)
return nil, 0, db.ErrNotFound
}

orderBy := "global_index + 0"
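// "+ 0" presumably coerces the text-encoded global_index into a numeric
// value so rows sort numerically rather than lexicographically (assumption
// based on SQLite's type-coercion rules).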
order := "DESC"
whereClause := ""
rows, err := p.queryPaged(tx, offset, pageSize, claimTableName, orderBy, order, whereClause)
if err != nil {
return nil, 0, err
}
defer func() {
if err := rows.Close(); err != nil {
p.log.Warnf("error closing rows: %v", err)
}
}()
claimPtrs := []*Claim{}
if err = meddler.ScanAll(rows, &claimPtrs); err != nil {
return nil, 0, err
}

claimResponsePtrs := make([]*ClaimResponse, len(claimPtrs))
for i, claimPtr := range claimPtrs {
claimResponsePtrs[i] = &ClaimResponse{
BlockNum:           claimPtr.BlockNum,
BlockTimestamp:     claimPtr.BlockTimestamp,
TxHash:             claimPtr.TxHash,
GlobalIndex:        claimPtr.GlobalIndex,
OriginAddress:      claimPtr.OriginAddress,
OriginNetwork:      claimPtr.OriginNetwork,
DestinationAddress: claimPtr.DestinationAddress,
DestinationNetwork: claimPtr.DestinationNetwork,
Amount:             claimPtr.Amount,
FromAddress:        claimPtr.FromAddress,
}
}

return claimResponsePtrs, count, nil
}
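queryPaged itself is not part of this diff; a plausible shape, assuming LIMIT/OFFSET pagination over SQLite (a sketch, not the actual helper):

// With pageNumber=2 and pageSize=3, the caller computes offset=(2-1)*3=3:
// skip the first three rows, return at most three more.
func queryPagedSketch(tx db.Querier, offset, limit uint32, table, orderBy, order, where string) (*sql.Rows, error) {
	query := fmt.Sprintf(`SELECT * FROM %s %s ORDER BY %s %s LIMIT $1 OFFSET $2`,
		table, where, orderBy, order)
	return tx.Query(query, limit, offset)
}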

func (p *processor) queryBlockRange(tx db.Querier, fromBlock, toBlock uint64, table string) (*sql.Rows, error) {
if err := p.isBlockProcessed(tx, toBlock); err != nil {
return nil, err
@@ -475,7 +550,7 @@ func (p *processor) GetTokenMappings(ctx context.Context, pageNumber, pageSize u
}

offset := (pageNumber - 1) * pageSize
if int(offset) >= totalTokenMappings {
if offset >= uint32(totalTokenMappings) {
p.log.Debugf("offset is larger than total token mappings (page number=%d, page size=%d, total token mappings=%d)",
pageNumber, pageSize, totalTokenMappings)
return nil, 0, db.ErrNotFound
114 changes: 114 additions & 0 deletions bridgesync/processor_test.go
@@ -1065,6 +1065,120 @@ func TestGetBridgesPaged(t *testing.T) {
}
}

func TestGetClaimsPaged(t *testing.T) {
t.Parallel()
fromBlock := uint64(1)
toBlock := uint64(10)

// Compute uint256 max: 2^256 - 1
uint256Max := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1))
// Compute uint64 max: 2^64 - 1
uint64Max := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(64), nil), big.NewInt(1))
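// The two max values exercise numeric ordering of global_index beyond native
// integer ranges (cf. the "global_index + 0" ORDER BY in the processor).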

claims := []Claim{
{BlockNum: 1, GlobalIndex: big.NewInt(1), Amount: big.NewInt(1)},
{BlockNum: 2, GlobalIndex: big.NewInt(2), Amount: big.NewInt(1)},
{BlockNum: 3, GlobalIndex: big.NewInt(3), Amount: big.NewInt(1)},
{BlockNum: 4, GlobalIndex: big.NewInt(4), Amount: big.NewInt(1)},
{BlockNum: 5, GlobalIndex: uint64Max, Amount: big.NewInt(1)},
{BlockNum: 6, GlobalIndex: uint256Max, Amount: big.NewInt(1)},
}

path := path.Join(t.TempDir(), "bridgesyncGetClaimsPaged.sqlite")
require.NoError(t, migrationsBridge.RunMigrations(path))
logger := log.WithFields("module", "bridge-syncer")
p, err := newProcessor(path, logger)
require.NoError(t, err)

tx, err := p.db.BeginTx(context.Background(), nil)
require.NoError(t, err)

for i := fromBlock; i <= toBlock; i++ {
_, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, i)
require.NoError(t, err)
}

for _, claim := range claims {
require.NoError(t, meddler.Insert(tx, "claim", &claim))
}
require.NoError(t, tx.Commit())

testCases := []struct {
name string
pageSize uint32
page uint32
expectedCount int
expectedClaims []*ClaimResponse
expectedError error
}{
{
name: "t1",
pageSize: 1,
page: 2,
expectedCount: 6,
expectedClaims: []*ClaimResponse{
{BlockNum: 5, GlobalIndex: uint64Max, Amount: big.NewInt(1)},
},
expectedError: nil,
},
{
name: "t2",
pageSize: 20,
page: 1,
expectedCount: 6,
expectedClaims: []*ClaimResponse{
{BlockNum: 6, GlobalIndex: uint256Max, Amount: big.NewInt(1)},
{BlockNum: 5, GlobalIndex: uint64Max, Amount: big.NewInt(1)},
{BlockNum: 4, GlobalIndex: big.NewInt(4), Amount: big.NewInt(1)},
{BlockNum: 3, GlobalIndex: big.NewInt(3), Amount: big.NewInt(1)},
{BlockNum: 2, GlobalIndex: big.NewInt(2), Amount: big.NewInt(1)},
{BlockNum: 1, GlobalIndex: big.NewInt(1), Amount: big.NewInt(1)},
},
expectedError: nil,
},
{
name: "t3",
pageSize: 3,
page: 2,
expectedCount: 6,
expectedClaims: []*ClaimResponse{
{BlockNum: 3, GlobalIndex: big.NewInt(3), Amount: big.NewInt(1)},
{BlockNum: 2, GlobalIndex: big.NewInt(2), Amount: big.NewInt(1)},
{BlockNum: 1, GlobalIndex: big.NewInt(1), Amount: big.NewInt(1)},
},
expectedError: nil,
},
{
name: "t4: offset is larger than total claims",
pageSize: 3,
page: 4,
expectedCount: 0,
expectedClaims: []*ClaimResponse{},
expectedError: db.ErrNotFound,
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
claims, count, err := p.GetClaimsPaged(ctx, tc.page, tc.pageSize)

if tc.expectedError != nil {
require.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedClaims, claims)
require.Equal(t, tc.expectedCount, count)
}
})
}
}

func TestGetTokenMapping(t *testing.T) {
t.Parallel()
