Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Configurable Worker Pool for Transaction Processing #131

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions integration_test/transactionSubmission/multi_sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package integration_test

import (
"fmt"
"testing"
"time"

"github.com/aptos-labs/aptos-go-sdk"
"github.com/aptos-labs/aptos-go-sdk/internal/testutil"
)

func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithMultipleSenders(t *testing.T) {
const (
numSenders = 3
txPerSender = 5
initialFunding = uint64(100_000_000)
transfer_amount = uint64(100)
)

clients := testutil.SetupTestClients(t)

// Create and fund senders
senders := make([]testutil.TestAccount, numSenders)
for i := 0; i < numSenders; i++ {
senders[i] = testutil.SetupTestAccount(t, clients.Client, initialFunding)
}

receiver := testutil.SetupTestAccount(t, clients.Client, 0)

startTime := time.Now()

// Process transactions for each sender
doneCh := make(chan struct{})

for senderIdx := 0; senderIdx < numSenders; senderIdx++ {
go func(senderIdx int) {
defer func() {
doneCh <- struct{}{}
}()

sender := senders[senderIdx]
payloads := make(chan aptos.TransactionBuildPayload, txPerSender)
responses := make(chan aptos.TransactionSubmissionResponse, txPerSender)

go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
sender.Account.Address,
payloads,
responses,
func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) {
switch txn := rawTxn.(type) {
case *aptos.RawTransaction:
return txn.SignedTransaction(sender.Account)
default:
return nil, fmt.Errorf("unsupported transaction type")
}
},
aptos.WorkerPoolConfig{NumWorkers: 3},
)

workerStartTime := time.Now()
for txNum := 0; txNum < txPerSender; txNum++ {
payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transfer_amount)
payloads <- aptos.TransactionBuildPayload{
Id: uint64(txNum),
Inner: payload,
Type: aptos.TransactionSubmissionTypeSingle,
}
}
close(payloads)

for i := 0; i < txPerSender; i++ {
resp := <-responses
if resp.Err != nil {
t.Errorf("Transaction failed: %v", resp.Err)
continue
}
fmt.Printf("[%s] Worker %d → hash: %s\n",
time.Now().Format("15:04:05.000"),
senderIdx,
resp.Response.Hash)
}

fmt.Printf("[%s] Worker %d completed all transactions (t+%v)\n",
time.Now().Format("15:04:05.000"),
senderIdx,
time.Since(workerStartTime).Round(time.Millisecond))
}(senderIdx)
}

// Wait for all senders to complete
for i := 0; i < numSenders; i++ {
<-doneCh
}

duration := time.Since(startTime)
fmt.Printf("\nTotal Duration: %v\n", duration)
}
72 changes: 72 additions & 0 deletions integration_test/transactionSubmission/single_sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package integration_test

import (
"fmt"
"testing"
"time"

"github.com/aptos-labs/aptos-go-sdk"
"github.com/aptos-labs/aptos-go-sdk/internal/testutil"
)

func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithOneSender(t *testing.T) {
const (
numTransactions = 5
transferAmount = uint64(100)
numWorkers = 3
initialFunding = uint64(100_000_000)
)

clients := testutil.SetupTestClients(t)
sender := testutil.SetupTestAccount(t, clients.Client, initialFunding)
receiver := testutil.SetupTestAccount(t, clients.Client, 0)

payloads := make(chan aptos.TransactionBuildPayload, numTransactions)
responses := make(chan aptos.TransactionSubmissionResponse, numTransactions)
workerPoolConfig := aptos.WorkerPoolConfig{
NumWorkers: numWorkers,
BuildResponseBuffer: numTransactions,
SubmissionBuffer: numTransactions,
}

startTime := time.Now()

go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
sender.Account.Address,
payloads,
responses,
func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) {
switch txn := rawTxn.(type) {
case *aptos.RawTransaction:
return txn.SignedTransaction(sender.Account)
default:
return nil, fmt.Errorf("unsupported transaction type")
}
},
workerPoolConfig,
)

for txNum := 0; txNum < numTransactions; txNum++ {
payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transferAmount)
payloads <- aptos.TransactionBuildPayload{
Id: uint64(txNum),
Inner: payload,
Type: aptos.TransactionSubmissionTypeSingle,
}
}
close(payloads)

for i := 0; i < numTransactions; i++ {
resp := <-responses
if resp.Err != nil {
t.Errorf("Transaction failed: %v", resp.Err)
continue
}
fmt.Printf("[%s] hash: %s\n",
time.Now().Format("15:04:05.000"),
resp.Response.Hash)
}

duration := time.Since(startTime)
fmt.Printf("\nTotal Duration: %v\n", duration)
}
74 changes: 74 additions & 0 deletions internal/testutil/test_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package testutil

import (
"testing"

"github.com/aptos-labs/aptos-go-sdk"
)

// TestClients holds the clients needed for testing
type TestClients struct {
NodeClient *aptos.NodeClient
Client *aptos.Client
}

func CreateTestClient() (*aptos.Client, error) {
return aptos.NewClient(aptos.DevnetConfig)
}

func CreateTestNodeClient() (*aptos.NodeClient, error) {
return aptos.NewNodeClient(aptos.DevnetConfig.NodeUrl, aptos.DevnetConfig.ChainId)
}

func SetupTestClients(t *testing.T) *TestClients {
nodeClient, err := CreateTestNodeClient()
if err != nil {
t.Fatalf("Failed to create NodeClient: %v", err)
}

client, err := CreateTestClient()
if err != nil {
t.Fatalf("Failed to create Client: %v", err)
}

return &TestClients{
NodeClient: nodeClient,
Client: client,
}
}

func CreateTransferPayload(t *testing.T, receiver aptos.AccountAddress, amount uint64) aptos.TransactionPayload {
p, err := aptos.CoinTransferPayload(nil, receiver, amount)
if err != nil {
t.Fatalf("Failed to create transfer payload: %v", err)
}
return aptos.TransactionPayload{Payload: p}
}

// TestAccount represents a funded account for testing
type TestAccount struct {
Account *aptos.Account
InitialBalance uint64
}

func SetupTestAccount(t *testing.T, client *aptos.Client, funding uint64) TestAccount {
account, err := aptos.NewEd25519Account()
if err != nil {
t.Fatalf("Failed to create account: %v", err)
}

err = client.Fund(account.Address, funding)
if err != nil {
t.Fatalf("Failed to fund account: %v", err)
}

balance, err := client.AccountAPTBalance(account.Address)
if err != nil {
t.Fatalf("Failed to get initial balance: %v", err)
}

return TestAccount{
Account: account,
InitialBalance: balance,
}
}
111 changes: 111 additions & 0 deletions transactionSubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
TransactionSubmissionTypeSingle TransactionSubmissionType = iota
// TransactionSubmissionTypeMultiAgent represents a multi-agent or fee payer transaction
TransactionSubmissionTypeMultiAgent TransactionSubmissionType = iota
defaultWorkerCount uint32 = 20
)

type TransactionBuildPayload struct {
Expand All @@ -41,6 +42,14 @@ type TransactionSubmissionResponse struct {
Err error
}

// WorkerPoolConfig contains configuration for the transaction processing worker pool
type WorkerPoolConfig struct {
NumWorkers uint32
// Channel buffer sizes. If 0, defaults to NumWorkers
BuildResponseBuffer uint32
SubmissionBuffer uint32
}

type SequenceNumberTracker struct {
SequenceNumber atomic.Uint64
}
Expand Down Expand Up @@ -391,3 +400,105 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
wg.Wait()
close(submissionRequests)
}

func (cfg WorkerPoolConfig) getBufferSizes() (build, submission uint32) {
workers := defaultWorkerCount
if cfg.NumWorkers > 0 {
workers = cfg.NumWorkers
}

build = workers
if cfg.BuildResponseBuffer > 0 {
build = cfg.BuildResponseBuffer
}

submission = workers
if cfg.SubmissionBuffer > 0 {
submission = cfg.SubmissionBuffer
}

return build, submission
}

// startSigningWorkers initializes and starts a pool of worker goroutines for signing transactions.
func startSigningWorkers(
numWorkers uint32,
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
submissionRequests chan<- TransactionSubmissionRequest,
responses chan<- TransactionSubmissionResponse,
signingWg *sync.WaitGroup,
transactionWg *sync.WaitGroup,
) chan TransactionBuildResponse {
transactionsToSign := make(chan TransactionBuildResponse, numWorkers)

signingWg.Add(int(numWorkers))
for i := uint32(0); i < numWorkers; i++ {
go func() {
defer signingWg.Done()
for buildResponse := range transactionsToSign {
signedTxn, err := sign(buildResponse.Response)
if err != nil {
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: err}
} else {
submissionRequests <- TransactionSubmissionRequest{
Id: buildResponse.Id,
SignedTxn: signedTxn,
}
}
transactionWg.Done()
}
}()
}

return transactionsToSign
}

// BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool processes transactions using a fixed-size worker pool.
// It coordinates three stages of the pipeline:
// 1. Building transactions (BuildTransactions)
// 2. Signing transactions (worker pool)
// 3. Submitting transactions (SubmitTransactions)
func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool(
sender AccountAddress,
payloads chan TransactionBuildPayload,
responses chan TransactionSubmissionResponse,
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
workerPoolConfig WorkerPoolConfig,
buildOptions ...any,
Comment on lines +466 to +467
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be able to put the WorkerPoolConfig in the ...any

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at what is passed in buildOptions, they are all blockchain TX related (MaxGasAmount, GasUnitPrice, ExpirationSeconds, SequenceNumber, ChainIdOption), therefore I think it'll be good to ahve these two as separate configs.

) {
buildBuffer, submissionBuffer := workerPoolConfig.getBufferSizes()
numWorkers := workerPoolConfig.NumWorkers
if numWorkers == 0 {
numWorkers = defaultWorkerCount
}

buildResponses := make(chan TransactionBuildResponse, buildBuffer)
setSequenceNumber := make(chan uint64)
go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber, buildOptions...)

submissionRequests := make(chan TransactionSubmissionRequest, submissionBuffer)
go rc.SubmitTransactions(submissionRequests, responses)

var signingWg sync.WaitGroup
var transactionWg sync.WaitGroup

transactionsToSign := startSigningWorkers(numWorkers, sign, submissionRequests, responses, &signingWg, &transactionWg)

for buildResponse := range buildResponses {
if buildResponse.Err != nil {
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: buildResponse.Err}
continue
}
transactionWg.Add(1)
transactionsToSign <- buildResponse
}

// 1. Wait for all transactions to complete processing
transactionWg.Wait()
// 2. Close signing channel to signal workers to shut down
close(transactionsToSign)
// 3. Wait for all workers to finish and clean up
signingWg.Wait()
// 4. Close submission channel after all signing is done
close(submissionRequests)
}