diff --git a/integration_test/transactionSubmission/multi_sender_test.go b/integration_test/transactionSubmission/multi_sender_test.go new file mode 100644 index 0000000..0d63b18 --- /dev/null +++ b/integration_test/transactionSubmission/multi_sender_test.go @@ -0,0 +1,97 @@ +package integration_test + +import ( + "fmt" + "testing" + "time" + + "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/internal/testutil" +) + +func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithMultipleSenders(t *testing.T) { + const ( + numSenders = 3 + txPerSender = 5 + initialFunding = uint64(100_000_000) + transfer_amount = uint64(100) + ) + + clients := testutil.SetupTestClients(t) + + // Create and fund senders + senders := make([]testutil.TestAccount, numSenders) + for i := 0; i < numSenders; i++ { + senders[i] = testutil.SetupTestAccount(t, clients.Client, initialFunding) + } + + receiver := testutil.SetupTestAccount(t, clients.Client, 0) + + startTime := time.Now() + + // Process transactions for each sender + doneCh := make(chan struct{}) + + for senderIdx := 0; senderIdx < numSenders; senderIdx++ { + go func(senderIdx int) { + defer func() { + doneCh <- struct{}{} + }() + + sender := senders[senderIdx] + payloads := make(chan aptos.TransactionBuildPayload, txPerSender) + responses := make(chan aptos.TransactionSubmissionResponse, txPerSender) + + go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool( + sender.Account.Address, + payloads, + responses, + func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) { + switch txn := rawTxn.(type) { + case *aptos.RawTransaction: + return txn.SignedTransaction(sender.Account) + default: + return nil, fmt.Errorf("unsupported transaction type") + } + }, + aptos.WorkerPoolConfig{NumWorkers: 3}, + ) + + workerStartTime := time.Now() + for txNum := 0; txNum < txPerSender; txNum++ { + payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transfer_amount) + payloads <- aptos.TransactionBuildPayload{ + Id: uint64(txNum), + Inner: payload, + Type: aptos.TransactionSubmissionTypeSingle, + } + } + close(payloads) + + for i := 0; i < txPerSender; i++ { + resp := <-responses + if resp.Err != nil { + t.Errorf("Transaction failed: %v", resp.Err) + continue + } + fmt.Printf("[%s] Worker %d → hash: %s\n", + time.Now().Format("15:04:05.000"), + senderIdx, + resp.Response.Hash) + } + + fmt.Printf("[%s] Worker %d completed all transactions (t+%v)\n", + time.Now().Format("15:04:05.000"), + senderIdx, + time.Since(workerStartTime).Round(time.Millisecond)) + }(senderIdx) + } + + // Wait for all senders to complete + for i := 0; i < numSenders; i++ { + <-doneCh + } + + duration := time.Since(startTime) + fmt.Printf("\nTotal Duration: %v\n", duration) +} diff --git a/integration_test/transactionSubmission/single_sender_test.go b/integration_test/transactionSubmission/single_sender_test.go new file mode 100644 index 0000000..85577be --- /dev/null +++ b/integration_test/transactionSubmission/single_sender_test.go @@ -0,0 +1,72 @@ +package integration_test + +import ( + "fmt" + "testing" + "time" + + "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/internal/testutil" +) + +func TestBuildSignAndSubmitTransactionsWithSignFnAndWorkerPoolWithOneSender(t *testing.T) { + const ( + numTransactions = 5 + transferAmount = uint64(100) + numWorkers = 3 + initialFunding = uint64(100_000_000) + ) + + clients := testutil.SetupTestClients(t) + sender := testutil.SetupTestAccount(t, clients.Client, initialFunding) + receiver := testutil.SetupTestAccount(t, clients.Client, 0) + + payloads := make(chan aptos.TransactionBuildPayload, numTransactions) + responses := make(chan aptos.TransactionSubmissionResponse, numTransactions) + workerPoolConfig := aptos.WorkerPoolConfig{ + NumWorkers: numWorkers, + BuildResponseBuffer: numTransactions, + SubmissionBuffer: numTransactions, + } + + startTime := time.Now() + + go clients.NodeClient.BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool( + sender.Account.Address, + payloads, + responses, + func(rawTxn aptos.RawTransactionImpl) (*aptos.SignedTransaction, error) { + switch txn := rawTxn.(type) { + case *aptos.RawTransaction: + return txn.SignedTransaction(sender.Account) + default: + return nil, fmt.Errorf("unsupported transaction type") + } + }, + workerPoolConfig, + ) + + for txNum := 0; txNum < numTransactions; txNum++ { + payload := testutil.CreateTransferPayload(t, receiver.Account.Address, transferAmount) + payloads <- aptos.TransactionBuildPayload{ + Id: uint64(txNum), + Inner: payload, + Type: aptos.TransactionSubmissionTypeSingle, + } + } + close(payloads) + + for i := 0; i < numTransactions; i++ { + resp := <-responses + if resp.Err != nil { + t.Errorf("Transaction failed: %v", resp.Err) + continue + } + fmt.Printf("[%s] hash: %s\n", + time.Now().Format("15:04:05.000"), + resp.Response.Hash) + } + + duration := time.Since(startTime) + fmt.Printf("\nTotal Duration: %v\n", duration) +} diff --git a/internal/testutil/test_helpers.go b/internal/testutil/test_helpers.go new file mode 100644 index 0000000..94e7a25 --- /dev/null +++ b/internal/testutil/test_helpers.go @@ -0,0 +1,74 @@ +package testutil + +import ( + "testing" + + "github.com/aptos-labs/aptos-go-sdk" +) + +// TestClients holds the clients needed for testing +type TestClients struct { + NodeClient *aptos.NodeClient + Client *aptos.Client +} + +func CreateTestClient() (*aptos.Client, error) { + return aptos.NewClient(aptos.DevnetConfig) +} + +func CreateTestNodeClient() (*aptos.NodeClient, error) { + return aptos.NewNodeClient(aptos.DevnetConfig.NodeUrl, aptos.DevnetConfig.ChainId) +} + +func SetupTestClients(t *testing.T) *TestClients { + nodeClient, err := CreateTestNodeClient() + if err != nil { + t.Fatalf("Failed to create NodeClient: %v", err) + } + + client, err := CreateTestClient() + if err != nil { + t.Fatalf("Failed to create Client: %v", err) + } + + return &TestClients{ + NodeClient: nodeClient, + Client: client, + } +} + +func CreateTransferPayload(t *testing.T, receiver aptos.AccountAddress, amount uint64) aptos.TransactionPayload { + p, err := aptos.CoinTransferPayload(nil, receiver, amount) + if err != nil { + t.Fatalf("Failed to create transfer payload: %v", err) + } + return aptos.TransactionPayload{Payload: p} +} + +// TestAccount represents a funded account for testing +type TestAccount struct { + Account *aptos.Account + InitialBalance uint64 +} + +func SetupTestAccount(t *testing.T, client *aptos.Client, funding uint64) TestAccount { + account, err := aptos.NewEd25519Account() + if err != nil { + t.Fatalf("Failed to create account: %v", err) + } + + err = client.Fund(account.Address, funding) + if err != nil { + t.Fatalf("Failed to fund account: %v", err) + } + + balance, err := client.AccountAPTBalance(account.Address) + if err != nil { + t.Fatalf("Failed to get initial balance: %v", err) + } + + return TestAccount{ + Account: account, + InitialBalance: balance, + } +} diff --git a/transactionSubmission.go b/transactionSubmission.go index e0f454d..6713ce7 100644 --- a/transactionSubmission.go +++ b/transactionSubmission.go @@ -16,6 +16,7 @@ const ( TransactionSubmissionTypeSingle TransactionSubmissionType = iota // TransactionSubmissionTypeMultiAgent represents a multi-agent or fee payer transaction TransactionSubmissionTypeMultiAgent TransactionSubmissionType = iota + defaultWorkerCount uint32 = 20 ) type TransactionBuildPayload struct { @@ -41,6 +42,14 @@ type TransactionSubmissionResponse struct { Err error } +// WorkerPoolConfig contains configuration for the transaction processing worker pool +type WorkerPoolConfig struct { + NumWorkers uint32 + // Channel buffer sizes. If 0, defaults to NumWorkers + BuildResponseBuffer uint32 + SubmissionBuffer uint32 +} + type SequenceNumberTracker struct { SequenceNumber atomic.Uint64 } @@ -391,3 +400,105 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction( wg.Wait() close(submissionRequests) } + +func (cfg WorkerPoolConfig) getBufferSizes() (build, submission uint32) { + workers := defaultWorkerCount + if cfg.NumWorkers > 0 { + workers = cfg.NumWorkers + } + + build = workers + if cfg.BuildResponseBuffer > 0 { + build = cfg.BuildResponseBuffer + } + + submission = workers + if cfg.SubmissionBuffer > 0 { + submission = cfg.SubmissionBuffer + } + + return build, submission +} + +// startSigningWorkers initializes and starts a pool of worker goroutines for signing transactions. +func startSigningWorkers( + numWorkers uint32, + sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error), + submissionRequests chan<- TransactionSubmissionRequest, + responses chan<- TransactionSubmissionResponse, + signingWg *sync.WaitGroup, + transactionWg *sync.WaitGroup, +) chan TransactionBuildResponse { + transactionsToSign := make(chan TransactionBuildResponse, numWorkers) + + signingWg.Add(int(numWorkers)) + for i := uint32(0); i < numWorkers; i++ { + go func() { + defer signingWg.Done() + for buildResponse := range transactionsToSign { + signedTxn, err := sign(buildResponse.Response) + if err != nil { + responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: err} + } else { + submissionRequests <- TransactionSubmissionRequest{ + Id: buildResponse.Id, + SignedTxn: signedTxn, + } + } + transactionWg.Done() + } + }() + } + + return transactionsToSign +} + +// BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool processes transactions using a fixed-size worker pool. +// It coordinates three stages of the pipeline: +// 1. Building transactions (BuildTransactions) +// 2. Signing transactions (worker pool) +// 3. Submitting transactions (SubmitTransactions) +func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFnAndWorkerPool( + sender AccountAddress, + payloads chan TransactionBuildPayload, + responses chan TransactionSubmissionResponse, + sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error), + workerPoolConfig WorkerPoolConfig, + buildOptions ...any, +) { + buildBuffer, submissionBuffer := workerPoolConfig.getBufferSizes() + numWorkers := workerPoolConfig.NumWorkers + if numWorkers == 0 { + numWorkers = defaultWorkerCount + } + + buildResponses := make(chan TransactionBuildResponse, buildBuffer) + setSequenceNumber := make(chan uint64) + go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber, buildOptions...) + + submissionRequests := make(chan TransactionSubmissionRequest, submissionBuffer) + go rc.SubmitTransactions(submissionRequests, responses) + + var signingWg sync.WaitGroup + var transactionWg sync.WaitGroup + + transactionsToSign := startSigningWorkers(numWorkers, sign, submissionRequests, responses, &signingWg, &transactionWg) + + for buildResponse := range buildResponses { + if buildResponse.Err != nil { + responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: buildResponse.Err} + continue + } + transactionWg.Add(1) + transactionsToSign <- buildResponse + } + + // 1. Wait for all transactions to complete processing + transactionWg.Wait() + // 2. Close signing channel to signal workers to shut down + close(transactionsToSign) + // 3. Wait for all workers to finish and clean up + signingWg.Wait() + // 4. Close submission channel after all signing is done + close(submissionRequests) +}