Skip to content

Commit 777d608

Browse files
authored
Merge branch 'main' into worker-pool-tx
2 parents c8b06f8 + affcbec commit 777d608

File tree

3 files changed

+209
-1
lines changed

3 files changed

+209
-1
lines changed

client.go

+26
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,22 @@ type AptosRpcClient interface {
217217
// client.AccountTransactions(AccountOne, 1, 100) // Returns 100 transactions for 0x1
218218
AccountTransactions(address AccountAddress, start *uint64, limit *uint64) (data []*api.CommittedTransaction, err error)
219219

220+
// EventsByHandle retrieves events by event handle and field name for a given account.
221+
//
222+
// Arguments:
223+
// - account - The account address to get events for
224+
// - eventHandle - The event handle struct tag
225+
// - fieldName - The field in the event handle struct
226+
// - start - The starting sequence number. nil for most recent events
227+
// - limit - The number of events to return, 100 by default
228+
EventsByHandle(
229+
account AccountAddress,
230+
eventHandle string,
231+
fieldName string,
232+
start *uint64,
233+
limit *uint64,
234+
) ([]*api.Event, error)
235+
220236
// SubmitTransaction Submits an already signed transaction to the blockchain
221237
//
222238
// sender := NewEd25519Account()
@@ -649,6 +665,16 @@ func (client *Client) AccountTransactions(address AccountAddress, start *uint64,
649665
return client.nodeClient.AccountTransactions(address, start, limit)
650666
}
651667

668+
// EventsByHandle Get events by handle and field name for an account.
669+
// Start is a sequence number. Nil for most recent events.
670+
// Limit is a number of events to return, 100 by default.
671+
//
672+
// client.EventsByHandle(AccountOne, "0x2", "transfer", 0, 2) // Returns 2 events
673+
// client.EventsByHandle(AccountOne, "0x2", "transfer", 1, 100) // Returns 100 events
674+
func (client *Client) EventsByHandle(account AccountAddress, eventHandle string, fieldName string, start *uint64, limit *uint64) ([]*api.Event, error) {
675+
return client.nodeClient.EventsByHandle(account, eventHandle, fieldName, start, limit)
676+
}
677+
652678
// SubmitTransaction Submits an already signed transaction to the blockchain
653679
//
654680
// sender := NewEd25519Account()

nodeClient.go

+85
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,91 @@ func (rc *NodeClient) AccountTransactions(account AccountAddress, start *uint64,
407407
})
408408
}
409409

410+
func (rc *NodeClient) EventsByHandle(
411+
account AccountAddress,
412+
eventHandle string,
413+
fieldName string,
414+
start *uint64,
415+
limit *uint64,
416+
) (data []*api.Event, err error) {
417+
basePath := fmt.Sprintf("accounts/%s/events/%s/%s",
418+
account.String(),
419+
eventHandle,
420+
fieldName)
421+
422+
baseUrl := rc.baseUrl.JoinPath(basePath)
423+
424+
const eventsPageSize = 100
425+
var effectiveLimit uint64
426+
if limit == nil {
427+
effectiveLimit = eventsPageSize
428+
} else {
429+
effectiveLimit = *limit
430+
}
431+
432+
var effectiveStart uint64
433+
if start == nil {
434+
effectiveStart = 0
435+
} else {
436+
effectiveStart = *start
437+
}
438+
439+
if effectiveLimit <= eventsPageSize {
440+
params := url.Values{}
441+
params.Set("start", strconv.FormatUint(effectiveStart, 10))
442+
params.Set("limit", strconv.FormatUint(effectiveLimit, 10))
443+
444+
requestUrl := *baseUrl
445+
requestUrl.RawQuery = params.Encode()
446+
447+
data, err = Get[[]*api.Event](rc, requestUrl.String())
448+
if err != nil {
449+
return nil, fmt.Errorf("get events api err: %w", err)
450+
}
451+
return data, nil
452+
}
453+
454+
pages := (effectiveLimit + eventsPageSize - 1) / eventsPageSize
455+
channels := make([]chan ConcResponse[[]*api.Event], pages)
456+
457+
for i := uint64(0); i < pages; i++ {
458+
channels[i] = make(chan ConcResponse[[]*api.Event], 1)
459+
pageStart := effectiveStart + (i * eventsPageSize)
460+
pageLimit := min(eventsPageSize, effectiveLimit-(i*eventsPageSize))
461+
462+
go fetch(func() ([]*api.Event, error) {
463+
params := url.Values{}
464+
params.Set("start", strconv.FormatUint(pageStart, 10))
465+
params.Set("limit", strconv.FormatUint(pageLimit, 10))
466+
467+
requestUrl := *baseUrl
468+
requestUrl.RawQuery = params.Encode()
469+
470+
events, err := Get[[]*api.Event](rc, requestUrl.String())
471+
if err != nil {
472+
return nil, fmt.Errorf("get events api err: %w", err)
473+
}
474+
return events, nil
475+
}, channels[i])
476+
}
477+
478+
events := make([]*api.Event, 0, effectiveLimit)
479+
for i, ch := range channels {
480+
response := <-ch
481+
if response.Err != nil {
482+
return nil, response.Err
483+
}
484+
events = append(events, response.Result...)
485+
close(channels[i])
486+
}
487+
488+
sort.Slice(events, func(i, j int) bool {
489+
return events[i].SequenceNumber < events[j].SequenceNumber
490+
})
491+
492+
return events, nil
493+
}
494+
410495
// handleTransactions is a helper function for fetching transactions
411496
//
412497
// It will fetch the transactions from the node in a single request if possible, otherwise it will fetch them concurrently.

nodeClient_test.go

+98-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package aptos
22

33
import (
4-
"github.com/stretchr/testify/assert"
4+
"encoding/json"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"strconv"
59
"testing"
610
"time"
11+
12+
"github.com/stretchr/testify/assert"
713
)
814

915
func TestPollForTransaction(t *testing.T) {
@@ -20,3 +26,94 @@ func TestPollForTransaction(t *testing.T) {
2026
assert.Less(t, dt, 20*time.Millisecond)
2127
assert.Error(t, err)
2228
}
29+
30+
func TestEventsByHandle(t *testing.T) {
31+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
32+
if r.URL.Path == "/" {
33+
// handle initial request from client
34+
w.WriteHeader(http.StatusOK)
35+
return
36+
}
37+
38+
assert.Equal(t, "/accounts/0x0/events/0x2/transfer", r.URL.Path)
39+
40+
start := r.URL.Query().Get("start")
41+
limit := r.URL.Query().Get("limit")
42+
43+
startInt, _ := strconv.ParseUint(start, 10, 64)
44+
limitInt, _ := strconv.ParseUint(limit, 10, 64)
45+
46+
events := make([]map[string]interface{}, 0, limitInt)
47+
for i := uint64(0); i < limitInt; i++ {
48+
events = append(events, map[string]interface{}{
49+
"type": "0x1::coin::TransferEvent",
50+
"guid": map[string]interface{}{
51+
"creation_number": "1",
52+
"account_address": AccountZero.String(),
53+
},
54+
"sequence_number": strconv.FormatUint(startInt+i, 10),
55+
"data": map[string]interface{}{
56+
"amount": fmt.Sprintf("%d", (startInt+i)*100),
57+
},
58+
})
59+
}
60+
61+
json.NewEncoder(w).Encode(events)
62+
}))
63+
defer mockServer.Close()
64+
65+
client, err := NewClient(NetworkConfig{
66+
Name: "mocknet",
67+
NodeUrl: mockServer.URL,
68+
})
69+
assert.NoError(t, err)
70+
71+
t.Run("pagination with concurrent fetching", func(t *testing.T) {
72+
start := uint64(0)
73+
limit := uint64(150)
74+
events, err := client.EventsByHandle(
75+
AccountZero,
76+
"0x2",
77+
"transfer",
78+
&start,
79+
&limit,
80+
)
81+
82+
assert.NoError(t, err)
83+
assert.Len(t, events, 150)
84+
})
85+
86+
t.Run("default page size when limit not provided", func(t *testing.T) {
87+
events, err := client.EventsByHandle(
88+
AccountZero,
89+
"0x2",
90+
"transfer",
91+
nil,
92+
nil,
93+
)
94+
95+
assert.NoError(t, err)
96+
assert.Len(t, events, 100)
97+
assert.Equal(t, uint64(99), events[99].SequenceNumber)
98+
})
99+
100+
t.Run("single page fetch", func(t *testing.T) {
101+
start := uint64(50)
102+
limit := uint64(5)
103+
events, err := client.EventsByHandle(
104+
AccountZero,
105+
"0x2",
106+
"transfer",
107+
&start,
108+
&limit,
109+
)
110+
111+
jsonBytes, _ := json.MarshalIndent(events, "", " ")
112+
t.Logf("JSON Response: %s", string(jsonBytes))
113+
114+
assert.NoError(t, err)
115+
assert.Len(t, events, 5)
116+
assert.Equal(t, uint64(50), events[0].SequenceNumber)
117+
assert.Equal(t, uint64(54), events[4].SequenceNumber)
118+
})
119+
}

0 commit comments

Comments
 (0)