Skip to content

Commit 51ae856

Browse files
committed
Add Configurable Worker Pool for Transaction Processing
1 parent 68dda3a commit 51ae856

File tree

4 files changed

+354
-0
lines changed

4 files changed

+354
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package integration_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/aptos-labs/aptos-go-sdk"
9+
"github.com/aptos-labs/aptos-go-sdk/internal/testutil"
10+
)
11+
12+
func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithMultipleSenders(t *testing.T) {
13+
const (
14+
numSenders = 3
15+
txPerSender = 5
16+
initialFunding = uint64(100_000_000)
17+
transfer_amount = uint64(100)
18+
)
19+
20+
clients := testutil.SetupTestClients(t)
21+
22+
// Create and fund senders
23+
senders := make([]testutil.TestAccount, numSenders)
24+
for i := 0; i < numSenders; i++ {
25+
senders[i] = testutil.SetupTestAccount(t, clients.Client, initialFunding)
26+
}
27+
28+
receiver := testutil.SetupTestAccount(t, clients.Client, 0)
29+
30+
startTime := time.Now()
31+
32+
// Process transactions for each sender
33+
doneCh := make(chan struct{})
34+
35+
for senderIdx := 0; senderIdx < numSenders; senderIdx++ {
36+
go func(senderIdx int) {
37+
defer func() {
38+
doneCh <- struct{}{}
39+
}()
40+
41+
sender := senders[senderIdx]
42+
payloads := make(chan aptos.TransactionBuildPayload, txPerSender)
43+
responses := make(chan aptos.TransactionSubmissionResponse, txPerSender)
44+
45+
go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
46+
sender.Account.Address,
47+
payloads,
48+
responses,
49+
func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) {
50+
switch txn := rawTxn.(type) {
51+
case *aptos.RawTransaction:
52+
return txn.SignedTransaction(sender.Account)
53+
default:
54+
return nil, fmt.Errorf("unsupported transaction type")
55+
}
56+
},
57+
aptos.WorkerPoolConfig{NumWorkers: 3},
58+
)
59+
60+
workerStartTime := time.Now()
61+
for txNum := 0; txNum < txPerSender; txNum++ {
62+
payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transfer_amount)
63+
payloads <- aptos.TransactionBuildPayload{
64+
Id: uint64(txNum),
65+
Inner: payload,
66+
Type: aptos.TransactionSubmissionTypeSingle,
67+
}
68+
}
69+
close(payloads)
70+
71+
for i := 0; i < txPerSender; i++ {
72+
resp := <-responses
73+
if resp.Err != nil {
74+
t.Errorf("Transaction failed: %v", resp.Err)
75+
continue
76+
}
77+
fmt.Printf("[%s] Worker %d → hash: %s\n",
78+
time.Now().Format("15:04:05.000"),
79+
senderIdx,
80+
resp.Response.Hash)
81+
}
82+
83+
fmt.Printf("[%s] Worker %d completed all transactions (t+%v)\n",
84+
time.Now().Format("15:04:05.000"),
85+
senderIdx,
86+
time.Since(workerStartTime).Round(time.Millisecond))
87+
}(senderIdx)
88+
}
89+
90+
// Wait for all senders to complete
91+
for i := 0; i < numSenders; i++ {
92+
<-doneCh
93+
}
94+
95+
duration := time.Since(startTime)
96+
fmt.Printf("\nTotal Duration: %v\n", duration)
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package integration_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/aptos-labs/aptos-go-sdk"
9+
"github.com/aptos-labs/aptos-go-sdk/internal/testutil"
10+
)
11+
12+
func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithOneSender(t *testing.T) {
13+
const (
14+
numTransactions = 5
15+
transferAmount = uint64(100)
16+
numWorkers = 3
17+
initialFunding = uint64(100_000_000)
18+
)
19+
20+
clients := testutil.SetupTestClients(t)
21+
sender := testutil.SetupTestAccount(t, clients.Client, initialFunding)
22+
receiver := testutil.SetupTestAccount(t, clients.Client, 0)
23+
24+
payloads := make(chan aptos.TransactionBuildPayload, numTransactions)
25+
responses := make(chan aptos.TransactionSubmissionResponse, numTransactions)
26+
workerPoolConfig := aptos.WorkerPoolConfig{
27+
NumWorkers: numWorkers,
28+
BuildResponseBuffer: numTransactions,
29+
SubmissionBuffer: numTransactions,
30+
}
31+
32+
startTime := time.Now()
33+
34+
go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
35+
sender.Account.Address,
36+
payloads,
37+
responses,
38+
func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) {
39+
switch txn := rawTxn.(type) {
40+
case *aptos.RawTransaction:
41+
return txn.SignedTransaction(sender.Account)
42+
default:
43+
return nil, fmt.Errorf("unsupported transaction type")
44+
}
45+
},
46+
workerPoolConfig,
47+
)
48+
49+
for txNum := 0; txNum < numTransactions; txNum++ {
50+
payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transferAmount)
51+
payloads <- aptos.TransactionBuildPayload{
52+
Id: uint64(txNum),
53+
Inner: payload,
54+
Type: aptos.TransactionSubmissionTypeSingle,
55+
}
56+
}
57+
close(payloads)
58+
59+
for i := 0; i < numTransactions; i++ {
60+
resp := <-responses
61+
if resp.Err != nil {
62+
t.Errorf("Transaction failed: %v", resp.Err)
63+
continue
64+
}
65+
fmt.Printf("[%s] hash: %s\n",
66+
time.Now().Format("15:04:05.000"),
67+
resp.Response.Hash)
68+
}
69+
70+
duration := time.Since(startTime)
71+
fmt.Printf("\nTotal Duration: %v\n", duration)
72+
}

internal/testutil/test_helpers.go

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package testutil
2+
3+
import (
4+
"testing"
5+
6+
"github.com/aptos-labs/aptos-go-sdk"
7+
)
8+
9+
// TestClients holds the clients needed for testing
10+
type TestClients struct {
11+
NodeClient *aptos.NodeClient
12+
Client *aptos.Client
13+
}
14+
15+
func CreateTestClient() (*aptos.Client, error) {
16+
return aptos.NewClient(aptos.DevnetConfig)
17+
}
18+
19+
func CreateTestNodeClient() (*aptos.NodeClient, error) {
20+
return aptos.NewNodeClient(aptos.DevnetConfig.NodeUrl, aptos.DevnetConfig.ChainId)
21+
}
22+
23+
func SetupTestClients(t *testing.T) *TestClients {
24+
nodeClient, err := CreateTestNodeClient()
25+
if err != nil {
26+
t.Fatalf("Failed to create NodeClient: %v", err)
27+
}
28+
29+
client, err := CreateTestClient()
30+
if err != nil {
31+
t.Fatalf("Failed to create Client: %v", err)
32+
}
33+
34+
return &TestClients{
35+
NodeClient: nodeClient,
36+
Client: client,
37+
}
38+
}
39+
40+
func CreateTransferPayload(t *testing.T, receiver aptos.AccountAddress, amount uint64) aptos.TransactionPayload {
41+
p, err := aptos.CoinTransferPayload(nil, receiver, amount)
42+
if err != nil {
43+
t.Fatalf("Failed to create transfer payload: %v", err)
44+
}
45+
return aptos.TransactionPayload{Payload: p}
46+
}
47+
48+
// TestAccount represents a funded account for testing
49+
type TestAccount struct {
50+
Account *aptos.Account
51+
InitialBalance uint64
52+
}
53+
54+
func SetupTestAccount(t *testing.T, client *aptos.Client, funding uint64) TestAccount {
55+
account, err := aptos.NewEd25519Account()
56+
if err != nil {
57+
t.Fatalf("Failed to create account: %v", err)
58+
}
59+
60+
err = client.Fund(account.Address, funding)
61+
if err != nil {
62+
t.Fatalf("Failed to fund account: %v", err)
63+
}
64+
65+
balance, err := client.AccountAPTBalance(account.Address)
66+
if err != nil {
67+
t.Fatalf("Failed to get initial balance: %v", err)
68+
}
69+
70+
return TestAccount{
71+
Account: account,
72+
InitialBalance: balance,
73+
}
74+
}

transactionSubmission.go

+111
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
TransactionSubmissionTypeSingle TransactionSubmissionType = iota
1717
// TransactionSubmissionTypeMultiAgent represents a multi-agent or fee payer transaction
1818
TransactionSubmissionTypeMultiAgent TransactionSubmissionType = iota
19+
defaultWorkerCount uint32 = 20
1920
)
2021

2122
type TransactionBuildPayload struct {
@@ -41,6 +42,14 @@ type TransactionSubmissionResponse struct {
4142
Err error
4243
}
4344

45+
// WorkerPoolConfig contains configuration for the transaction processing worker pool
46+
type WorkerPoolConfig struct {
47+
NumWorkers uint32
48+
// Channel buffer sizes. If 0, defaults to NumWorkers
49+
BuildResponseBuffer uint32
50+
SubmissionBuffer uint32
51+
}
52+
4453
type SequenceNumberTracker struct {
4554
SequenceNumber atomic.Uint64
4655
}
@@ -391,3 +400,105 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
391400
wg.Wait()
392401
close(submissionRequests)
393402
}
403+
404+
func (cfg WorkerPoolConfig) getBufferSizes() (build, submission uint32) {
405+
workers := defaultWorkerCount
406+
if cfg.NumWorkers > 0 {
407+
workers = cfg.NumWorkers
408+
}
409+
410+
build = workers
411+
if cfg.BuildResponseBuffer > 0 {
412+
build = cfg.BuildResponseBuffer
413+
}
414+
415+
submission = workers
416+
if cfg.SubmissionBuffer > 0 {
417+
submission = cfg.SubmissionBuffer
418+
}
419+
420+
return build, submission
421+
}
422+
423+
// startSigningWorkers initializes and starts a pool of worker goroutines for signing transactions.
424+
func startSigningWorkers(
425+
numWorkers uint32,
426+
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
427+
submissionRequests chan<- TransactionSubmissionRequest,
428+
responses chan<- TransactionSubmissionResponse,
429+
signingWg *sync.WaitGroup,
430+
transactionWg *sync.WaitGroup,
431+
) chan TransactionBuildResponse {
432+
transactionsToSign := make(chan TransactionBuildResponse, numWorkers)
433+
434+
signingWg.Add(int(numWorkers))
435+
for i := uint32(0); i < numWorkers; i++ {
436+
go func() {
437+
defer signingWg.Done()
438+
for buildResponse := range transactionsToSign {
439+
signedTxn, err := sign(buildResponse.Response)
440+
if err != nil {
441+
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: err}
442+
} else {
443+
submissionRequests <- TransactionSubmissionRequest{
444+
Id: buildResponse.Id,
445+
SignedTxn: signedTxn,
446+
}
447+
}
448+
transactionWg.Done()
449+
}
450+
}()
451+
}
452+
453+
return transactionsToSign
454+
}
455+
456+
// BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool processes transactions using a fixed-size worker pool.
457+
// It coordinates three stages of the pipeline:
458+
// 1. Building transactions (BuildTransactions)
459+
// 2. Signing transactions (worker pool)
460+
// 3. Submitting transactions (SubmitTransactions)
461+
func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
462+
sender AccountAddress,
463+
payloads chan TransactionBuildPayload,
464+
responses chan TransactionSubmissionResponse,
465+
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
466+
workerPoolConfig WorkerPoolConfig,
467+
buildOptions ...any,
468+
) {
469+
buildBuffer, submissionBuffer := workerPoolConfig.getBufferSizes()
470+
numWorkers := workerPoolConfig.NumWorkers
471+
if numWorkers == 0 {
472+
numWorkers = defaultWorkerCount
473+
}
474+
475+
buildResponses := make(chan TransactionBuildResponse, buildBuffer)
476+
setSequenceNumber := make(chan uint64)
477+
go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber, buildOptions...)
478+
479+
submissionRequests := make(chan TransactionSubmissionRequest, submissionBuffer)
480+
go rc.SubmitTransactions(submissionRequests, responses)
481+
482+
var signingWg sync.WaitGroup
483+
var transactionWg sync.WaitGroup
484+
485+
transactionsToSign := startSigningWorkers(numWorkers, sign, submissionRequests, responses, &signingWg, &transactionWg)
486+
487+
for buildResponse := range buildResponses {
488+
if buildResponse.Err != nil {
489+
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: buildResponse.Err}
490+
continue
491+
}
492+
transactionWg.Add(1)
493+
transactionsToSign <- buildResponse
494+
}
495+
496+
// 1. Wait for all transactions to complete processing
497+
transactionWg.Wait()
498+
// 2. Close signing channel to signal workers to shut down
499+
close(transactionsToSign)
500+
// 3. Wait for all workers to finish and clean up
501+
signingWg.Wait()
502+
// 4. Close submission channel after all signing is done
503+
close(submissionRequests)
504+
}

0 commit comments

Comments
 (0)