Skip to content

Commit

Permalink
Add support for substreams tier1 global request pool address
Browse files Browse the repository at this point in the history
This commit introduces a new configuration option and related logic for specifying a global request pool address for Substreams tier1.
  • Loading branch information
billettc committed Feb 20, 2025
1 parent 0ac5e4e commit a032fbc
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available.
`))
cmd.Flags().String("substreams-tier1-global-worker-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)")
cmd.Flags().String("substreams-tier1-global-request-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)")
cmd.Flags().Duration("substreams-tier1-global-worker-pool-keep-alive-delay", 25*time.Second, "Delay between two keep alive call to the global worker pool. Default is 25s")
cmd.Flags().Duration("substreams-tier1-global-request-pool-keep-alive-delay", 25*time.Second, "Delay between two keep alive call to the global worker pool for request. Default is 25s")
cmd.Flags().Uint64("substreams-tier1-default-max-request-per-user", 3, "default max request per user, this will be use of the global worker pool is not reachable. Default is 5")
Expand Down Expand Up @@ -190,6 +191,16 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
viper.GetDuration("substreams-tier1-global-worker-pool-keep-alive-delay"),
).WorkerPool

}

substreamsGlobalRequestPoolAddress := viper.GetString("substreams-tier1-global-request-pool-address")
if substreamsGlobalRequestPoolAddress != "" {
grpcClientConnection, err := dgrpc.NewInternalNoWaitClientConn(substreamsGlobalRequestPoolAddress)
if err != nil {
return nil, fmt.Errorf("unable to create grpc client connection to global rewquest pool: %w", err)
}
workerPoolClient := pbworker.NewWorkerPoolClient(grpcClientConnection)

defaultMinimalWorkerLifeDuration := time.Duration(viper.GetInt("substreams-tier1-default-minimal-request-life-time-second")) * time.Second
globalRequestPool = service.NewGlobalRequestPool(
workerPoolClient,
Expand Down
1 change: 1 addition & 0 deletions devel/substreams/substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ start:
substreams-tier1-subrequests-endpoint: :9001

substreams-tier1-global-worker-pool-address: :9002
substreams-tier1-global-request-pool-address: :9002
substreams-tier1-global-worker-pool-keep-alive-delay: 1s

substreams-tier2-grpc-listen-addr: :9001

0 comments on commit a032fbc

Please sign in to comment.