Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventsByHandle to the nodeClient #133

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,22 @@ type AptosRpcClient interface {
// client.AccountTransactions(AccountOne, 1, 100) // Returns 100 transactions for 0x1
AccountTransactions(address AccountAddress, start *uint64, limit *uint64) (data []*api.CommittedTransaction, err error)

// EventsByHandle retrieves events by event handle and field name for a given account.
//
// Arguments:
// - account - The account address to get events for
// - eventHandle - The event handle struct tag
// - fieldName - The field in the event handle struct
// - start - The starting sequence number. nil for most recent events
// - limit - The number of events to return, 100 by default
EventsByHandle(
account AccountAddress,
eventHandle string,
fieldName string,
start *uint64,
limit *uint64,
) ([]*api.Event, error)

// SubmitTransaction Submits an already signed transaction to the blockchain
//
// sender := NewEd25519Account()
Expand Down Expand Up @@ -649,6 +665,16 @@ func (client *Client) AccountTransactions(address AccountAddress, start *uint64,
return client.nodeClient.AccountTransactions(address, start, limit)
}

// EventsByHandle Get events by handle and field name for an account.
// Start is a sequence number. Nil for most recent events.
// Limit is a number of events to return, 100 by default.
//
// client.EventsByHandle(AccountOne, "0x2", "transfer", 0, 2) // Returns 2 events
// client.EventsByHandle(AccountOne, "0x2", "transfer", 1, 100) // Returns 100 events
func (client *Client) EventsByHandle(account AccountAddress, eventHandle string, fieldName string, start *uint64, limit *uint64) ([]*api.Event, error) {
return client.nodeClient.EventsByHandle(account, eventHandle, fieldName, start, limit)
}

// SubmitTransaction Submits an already signed transaction to the blockchain
//
// sender := NewEd25519Account()
Expand Down
85 changes: 85 additions & 0 deletions nodeClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,91 @@ func (rc *NodeClient) AccountTransactions(account AccountAddress, start *uint64,
})
}

func (rc *NodeClient) EventsByHandle(
account AccountAddress,
eventHandle string,
fieldName string,
start *uint64,
limit *uint64,
) (data []*api.Event, err error) {
basePath := fmt.Sprintf("accounts/%s/events/%s/%s",
account.String(),
eventHandle,
fieldName)

baseUrl := rc.baseUrl.JoinPath(basePath)

const eventsPageSize = 100
var effectiveLimit uint64
if limit == nil {
effectiveLimit = eventsPageSize
} else {
effectiveLimit = *limit
}

var effectiveStart uint64
if start == nil {
effectiveStart = 0
} else {
effectiveStart = *start
}

if effectiveLimit <= eventsPageSize {
params := url.Values{}
params.Set("start", strconv.FormatUint(effectiveStart, 10))
params.Set("limit", strconv.FormatUint(effectiveLimit, 10))

requestUrl := *baseUrl
requestUrl.RawQuery = params.Encode()

data, err = Get[[]*api.Event](rc, requestUrl.String())
if err != nil {
return nil, fmt.Errorf("get events api err: %w", err)
}
return data, nil
}

pages := (effectiveLimit + eventsPageSize - 1) / eventsPageSize
channels := make([]chan ConcResponse[[]*api.Event], pages)

for i := uint64(0); i < pages; i++ {
channels[i] = make(chan ConcResponse[[]*api.Event], 1)
pageStart := effectiveStart + (i * eventsPageSize)
pageLimit := min(eventsPageSize, effectiveLimit-(i*eventsPageSize))

go fetch(func() ([]*api.Event, error) {
params := url.Values{}
params.Set("start", strconv.FormatUint(pageStart, 10))
params.Set("limit", strconv.FormatUint(pageLimit, 10))

requestUrl := *baseUrl
requestUrl.RawQuery = params.Encode()

events, err := Get[[]*api.Event](rc, requestUrl.String())
if err != nil {
return nil, fmt.Errorf("get events api err: %w", err)
}
return events, nil
}, channels[i])
}

events := make([]*api.Event, 0, effectiveLimit)
for i, ch := range channels {
response := <-ch
if response.Err != nil {
return nil, response.Err
}
events = append(events, response.Result...)
close(channels[i])
}

sort.Slice(events, func(i, j int) bool {
return events[i].SequenceNumber < events[j].SequenceNumber
})

return events, nil
}

// handleTransactions is a helper function for fetching transactions
//
// It will fetch the transactions from the node in a single request if possible, otherwise it will fetch them concurrently.
Expand Down
99 changes: 98 additions & 1 deletion nodeClient_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package aptos

import (
"github.com/stretchr/testify/assert"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPollForTransaction(t *testing.T) {
Expand All @@ -20,3 +26,94 @@ func TestPollForTransaction(t *testing.T) {
assert.Less(t, dt, 20*time.Millisecond)
assert.Error(t, err)
}

func TestEventsByHandle(t *testing.T) {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
// handle initial request from client
w.WriteHeader(http.StatusOK)
return
}

assert.Equal(t, "/accounts/0x0/events/0x2/transfer", r.URL.Path)

start := r.URL.Query().Get("start")
limit := r.URL.Query().Get("limit")

startInt, _ := strconv.ParseUint(start, 10, 64)
limitInt, _ := strconv.ParseUint(limit, 10, 64)

events := make([]map[string]interface{}, 0, limitInt)
for i := uint64(0); i < limitInt; i++ {
events = append(events, map[string]interface{}{
"type": "0x1::coin::TransferEvent",
"guid": map[string]interface{}{
"creation_number": "1",
"account_address": AccountZero.String(),
},
"sequence_number": strconv.FormatUint(startInt+i, 10),
"data": map[string]interface{}{
"amount": fmt.Sprintf("%d", (startInt+i)*100),
},
})
}

json.NewEncoder(w).Encode(events)
}))
defer mockServer.Close()

client, err := NewClient(NetworkConfig{
Name: "mocknet",
NodeUrl: mockServer.URL,
})
assert.NoError(t, err)

t.Run("pagination with concurrent fetching", func(t *testing.T) {
start := uint64(0)
limit := uint64(150)
events, err := client.EventsByHandle(
AccountZero,
"0x2",
"transfer",
&start,
&limit,
)

assert.NoError(t, err)
assert.Len(t, events, 150)
})

t.Run("default page size when limit not provided", func(t *testing.T) {
events, err := client.EventsByHandle(
AccountZero,
"0x2",
"transfer",
nil,
nil,
)

assert.NoError(t, err)
assert.Len(t, events, 100)
assert.Equal(t, uint64(99), events[99].SequenceNumber)
})

t.Run("single page fetch", func(t *testing.T) {
start := uint64(50)
limit := uint64(5)
events, err := client.EventsByHandle(
AccountZero,
"0x2",
"transfer",
&start,
&limit,
)

jsonBytes, _ := json.MarshalIndent(events, "", " ")
t.Logf("JSON Response: %s", string(jsonBytes))

assert.NoError(t, err)
assert.Len(t, events, 5)
assert.Equal(t, uint64(50), events[0].SequenceNumber)
assert.Equal(t, uint64(54), events[4].SequenceNumber)
})
}