From 56a604b2ecf6b1fbe67a7fe6042dcd59ead837c5 Mon Sep 17 00:00:00 2001 From: Sergei Drugalev Date: Mon, 3 Mar 2025 10:36:56 +0100 Subject: [PATCH 1/3] Add EventsByHandle to the client interface --- client.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/client.go b/client.go index 9e07b7d..8aa35b0 100644 --- a/client.go +++ b/client.go @@ -217,6 +217,22 @@ type AptosRpcClient interface { // client.AccountTransactions(AccountOne, 1, 100) // Returns 100 transactions for 0x1 AccountTransactions(address AccountAddress, start *uint64, limit *uint64) (data []*api.CommittedTransaction, err error) + // EventsByHandle retrieves events by event handle and field name for a given account. + // + // Arguments: + // - account - The account address to get events for + // - eventHandle - The event handle struct tag + // - fieldName - The field in the event handle struct + // - start - The starting sequence number. nil for most recent events + // - limit - The number of events to return, 100 by default + EventsByHandle( + account AccountAddress, + eventHandle string, + fieldName string, + start *uint64, + limit *uint64, + ) ([]*api.Event, error) + // SubmitTransaction Submits an already signed transaction to the blockchain // // sender := NewEd25519Account() @@ -649,6 +665,16 @@ func (client *Client) AccountTransactions(address AccountAddress, start *uint64, return client.nodeClient.AccountTransactions(address, start, limit) } +// EventsByHandle Get events by handle and field name for an account. +// Start is a sequence number. Nil for most recent events. +// Limit is a number of events to return, 100 by default. +// +// client.EventsByHandle(AccountOne, "0x2", "transfer", 0, 2) // Returns 2 events +// client.EventsByHandle(AccountOne, "0x2", "transfer", 1, 100) // Returns 100 events +func (client *Client) EventsByHandle(account AccountAddress, eventHandle string, fieldName string, start *uint64, limit *uint64) ([]*api.Event, error) { + return client.nodeClient.EventsByHandle(account, eventHandle, fieldName, start, limit) +} + // SubmitTransaction Submits an already signed transaction to the blockchain // // sender := NewEd25519Account() From 44ace21d12ee8384e1176d31608dfbd5eb70d004 Mon Sep 17 00:00:00 2001 From: Sergei Drugalev Date: Mon, 3 Mar 2025 10:37:20 +0100 Subject: [PATCH 2/3] EventByHandle implementation for the nodeClient --- nodeClient.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/nodeClient.go b/nodeClient.go index 11a76c5..7f67563 100644 --- a/nodeClient.go +++ b/nodeClient.go @@ -407,6 +407,91 @@ func (rc *NodeClient) AccountTransactions(account AccountAddress, start *uint64, }) } +func (rc *NodeClient) EventsByHandle( + account AccountAddress, + eventHandle string, + fieldName string, + start *uint64, + limit *uint64, +) (data []*api.Event, err error) { + basePath := fmt.Sprintf("accounts/%s/events/%s/%s", + account.String(), + eventHandle, + fieldName) + + baseUrl := rc.baseUrl.JoinPath(basePath) + + const eventsPageSize = 100 + var effectiveLimit uint64 + if limit == nil { + effectiveLimit = eventsPageSize + } else { + effectiveLimit = *limit + } + + var effectiveStart uint64 + if start == nil { + effectiveStart = 0 + } else { + effectiveStart = *start + } + + if effectiveLimit <= eventsPageSize { + params := url.Values{} + params.Set("start", strconv.FormatUint(effectiveStart, 10)) + params.Set("limit", strconv.FormatUint(effectiveLimit, 10)) + + requestUrl := *baseUrl + requestUrl.RawQuery = params.Encode() + + data, err = Get[[]*api.Event](rc, requestUrl.String()) + if err != nil { + return nil, fmt.Errorf("get events api err: %w", err) + } + return data, nil + } + + pages := (effectiveLimit + eventsPageSize - 1) / eventsPageSize + channels := make([]chan ConcResponse[[]*api.Event], pages) + + for i := uint64(0); i < pages; i++ { + channels[i] = make(chan ConcResponse[[]*api.Event], 1) + pageStart := effectiveStart + (i * eventsPageSize) + pageLimit := min(eventsPageSize, effectiveLimit-(i*eventsPageSize)) + + go fetch(func() ([]*api.Event, error) { + params := url.Values{} + params.Set("start", strconv.FormatUint(pageStart, 10)) + params.Set("limit", strconv.FormatUint(pageLimit, 10)) + + requestUrl := *baseUrl + requestUrl.RawQuery = params.Encode() + + events, err := Get[[]*api.Event](rc, requestUrl.String()) + if err != nil { + return nil, fmt.Errorf("get events api err: %w", err) + } + return events, nil + }, channels[i]) + } + + events := make([]*api.Event, 0, effectiveLimit) + for i, ch := range channels { + response := <-ch + if response.Err != nil { + return nil, response.Err + } + events = append(events, response.Result...) + close(channels[i]) + } + + sort.Slice(events, func(i, j int) bool { + return events[i].SequenceNumber < events[j].SequenceNumber + }) + + return events, nil +} + // handleTransactions is a helper function for fetching transactions // // It will fetch the transactions from the node in a single request if possible, otherwise it will fetch them concurrently. From 7eea493f8d8f9191df799d56d4dc47d18a9c09cc Mon Sep 17 00:00:00 2001 From: Sergei Drugalev Date: Mon, 3 Mar 2025 10:37:45 +0100 Subject: [PATCH 3/3] EventByHandle test --- nodeClient_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/nodeClient_test.go b/nodeClient_test.go index fa663bd..3e27d3f 100644 --- a/nodeClient_test.go +++ b/nodeClient_test.go @@ -1,9 +1,15 @@ package aptos import ( - "github.com/stretchr/testify/assert" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strconv" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestPollForTransaction(t *testing.T) { @@ -20,3 +26,94 @@ func TestPollForTransaction(t *testing.T) { assert.Less(t, dt, 20*time.Millisecond) assert.Error(t, err) } + +func TestEventsByHandle(t *testing.T) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + // handle initial request from client + w.WriteHeader(http.StatusOK) + return + } + + assert.Equal(t, "/accounts/0x0/events/0x2/transfer", r.URL.Path) + + start := r.URL.Query().Get("start") + limit := r.URL.Query().Get("limit") + + startInt, _ := strconv.ParseUint(start, 10, 64) + limitInt, _ := strconv.ParseUint(limit, 10, 64) + + events := make([]map[string]interface{}, 0, limitInt) + for i := uint64(0); i < limitInt; i++ { + events = append(events, map[string]interface{}{ + "type": "0x1::coin::TransferEvent", + "guid": map[string]interface{}{ + "creation_number": "1", + "account_address": AccountZero.String(), + }, + "sequence_number": strconv.FormatUint(startInt+i, 10), + "data": map[string]interface{}{ + "amount": fmt.Sprintf("%d", (startInt+i)*100), + }, + }) + } + + json.NewEncoder(w).Encode(events) + })) + defer mockServer.Close() + + client, err := NewClient(NetworkConfig{ + Name: "mocknet", + NodeUrl: mockServer.URL, + }) + assert.NoError(t, err) + + t.Run("pagination with concurrent fetching", func(t *testing.T) { + start := uint64(0) + limit := uint64(150) + events, err := client.EventsByHandle( + AccountZero, + "0x2", + "transfer", + &start, + &limit, + ) + + assert.NoError(t, err) + assert.Len(t, events, 150) + }) + + t.Run("default page size when limit not provided", func(t *testing.T) { + events, err := client.EventsByHandle( + AccountZero, + "0x2", + "transfer", + nil, + nil, + ) + + assert.NoError(t, err) + assert.Len(t, events, 100) + assert.Equal(t, uint64(99), events[99].SequenceNumber) + }) + + t.Run("single page fetch", func(t *testing.T) { + start := uint64(50) + limit := uint64(5) + events, err := client.EventsByHandle( + AccountZero, + "0x2", + "transfer", + &start, + &limit, + ) + + jsonBytes, _ := json.MarshalIndent(events, "", " ") + t.Logf("JSON Response: %s", string(jsonBytes)) + + assert.NoError(t, err) + assert.Len(t, events, 5) + assert.Equal(t, uint64(50), events[0].SequenceNumber) + assert.Equal(t, uint64(54), events[4].SequenceNumber) + }) +}