From a032fbc1c0b6323dff8778f2efb77aa34cbfb1b3 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Thu, 20 Feb 2025 10:04:16 -0500 Subject: [PATCH] Add support for substreams tier1 global request pool address This commit introduces a new configuration option and related logic for specifying a global request pool address for Substreams tier1. --- cmd/apps/substreams_tier1.go | 11 +++++++++++ devel/substreams/substreams.yaml | 1 + 2 files changed, 12 insertions(+) diff --git a/cmd/apps/substreams_tier1.go b/cmd/apps/substreams_tier1.go index f8042b3..74b3e8b 100644 --- a/cmd/apps/substreams_tier1.go +++ b/cmd/apps/substreams_tier1.go @@ -74,6 +74,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available. `)) cmd.Flags().String("substreams-tier1-global-worker-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)") + cmd.Flags().String("substreams-tier1-global-request-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)") cmd.Flags().Duration("substreams-tier1-global-worker-pool-keep-alive-delay", 25*time.Second, "Delay between two keep alive call to the global worker pool. Default is 25s") cmd.Flags().Duration("substreams-tier1-global-request-pool-keep-alive-delay", 25*time.Second, "Delay between two keep alive call to the global worker pool for request. Default is 25s") cmd.Flags().Uint64("substreams-tier1-default-max-request-per-user", 3, "default max request per user, this will be use of the global worker pool is not reachable. Default is 5") @@ -190,6 +191,16 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root viper.GetDuration("substreams-tier1-global-worker-pool-keep-alive-delay"), ).WorkerPool + } + + substreamsGlobalRequestPoolAddress := viper.GetString("substreams-tier1-global-request-pool-address") + if substreamsGlobalRequestPoolAddress != "" { + grpcClientConnection, err := dgrpc.NewInternalNoWaitClientConn(substreamsGlobalRequestPoolAddress) + if err != nil { + return nil, fmt.Errorf("unable to create grpc client connection to global rewquest pool: %w", err) + } + workerPoolClient := pbworker.NewWorkerPoolClient(grpcClientConnection) + defaultMinimalWorkerLifeDuration := time.Duration(viper.GetInt("substreams-tier1-default-minimal-request-life-time-second")) * time.Second globalRequestPool = service.NewGlobalRequestPool( workerPoolClient, diff --git a/devel/substreams/substreams.yaml b/devel/substreams/substreams.yaml index c035569..86af043 100644 --- a/devel/substreams/substreams.yaml +++ b/devel/substreams/substreams.yaml @@ -15,6 +15,7 @@ start: substreams-tier1-subrequests-endpoint: :9001 substreams-tier1-global-worker-pool-address: :9002 + substreams-tier1-global-request-pool-address: :9002 substreams-tier1-global-worker-pool-keep-alive-delay: 1s substreams-tier2-grpc-listen-addr: :9001