From 5860236f59a4356cd5455424d2c80e27df66ca63 Mon Sep 17 00:00:00 2001 From: Phu Ngo <12547020+NgoKimPhu@users.noreply.github.com> Date: Wed, 3 Jul 2024 20:26:53 +0700 Subject: [PATCH] feat: redis client to route randomly for sentinel redis nodes by default --- pkg/client/backoff.go | 1 + pkg/client/eth.go | 2 ++ pkg/client/eth_batchable.go | 2 ++ pkg/client/grpc.go | 3 +++ pkg/client/grpcclient/client.go | 8 ++------ pkg/client/grpcclient/options.go | 2 ++ pkg/client/http.go | 3 +++ pkg/client/redis.go | 17 ++++++++++++++++- 8 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/client/backoff.go b/pkg/client/backoff.go index e8044ac..a0284e3 100644 --- a/pkg/client/backoff.go +++ b/pkg/client/backoff.go @@ -4,6 +4,7 @@ import ( "github.com/cenkalti/backoff/v4" ) +// BackoffCfg is a hotcfg to create a backoff.ExponentialBackOff type BackoffCfg struct { backoff.ExponentialBackOff `mapstructure:",squash"` MaxRetries uint64 diff --git a/pkg/client/eth.go b/pkg/client/eth.go index 73130b3..b07cf2f 100644 --- a/pkg/client/eth.go +++ b/pkg/client/eth.go @@ -18,6 +18,8 @@ import ( const EthCloseDelay = time.Minute +// EthCfg is hotcfg for eth client. It creates a client that automatically choose to use +// the provided full node rpc or archive node. type EthCfg struct { Url string ArchiveUrl string diff --git a/pkg/client/eth_batchable.go b/pkg/client/eth_batchable.go index 6073a84..c74ed65 100644 --- a/pkg/client/eth_batchable.go +++ b/pkg/client/eth_batchable.go @@ -12,6 +12,8 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) +// BatchableEthCfg is hotcfg for batchable eth client. +// It batches eth_call's up to be sent together within 1 request to the rpc node. type BatchableEthCfg struct { EthCfg `mapstructure:",squash"` BatchRate time.Duration diff --git a/pkg/client/grpc.go b/pkg/client/grpc.go index 8e60003..2a63c60 100644 --- a/pkg/client/grpc.go +++ b/pkg/client/grpc.go @@ -12,6 +12,9 @@ import ( const GrpcCloseDelay = time.Minute +// GrpcCfg is hotcfg for grpc client. On update, it +// creates a new grpc client with the provided factory generated by grpc using service proto. +// The client has interceptors for adding client id header, validating requests, adding timeout, metrics and tracing. type GrpcCfg[T any] struct { grpcclient.Config `mapstructure:",squash"` C T // the inner grpc client diff --git a/pkg/client/grpcclient/client.go b/pkg/client/grpcclient/client.go index 1e9ba53..2a6e864 100644 --- a/pkg/client/grpcclient/client.go +++ b/pkg/client/grpcclient/client.go @@ -28,7 +28,7 @@ type Config struct { BaseURL string MinConnectTimeout time.Duration ConnectBackoff backoff.Config - IsBlockConnect bool + IsBlockConnect bool // deprecated: see grpc.WithBlock GRPCCredentials credentials.TransportCredentials Insecure bool Compression Compression @@ -66,7 +66,7 @@ func New[T any](clientFactory func(grpc.ClientConnInterface) T, applyOptions ... } dialOpts := cfg.dialOptions() - grpcConn, err := grpc.Dial(cfg.BaseURL, dialOpts...) + grpcConn, err := grpc.NewClient(cfg.BaseURL, dialOpts...) if err != nil { return nil, err } @@ -123,10 +123,6 @@ func (c *Config) dialOptions() []grpc.DialOption { } dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams)) - if c.IsBlockConnect { - dialOpts = append(dialOpts, grpc.WithBlock()) - } - requestHeaders := c.requestHeaders() unaryInterceptors := []grpc.UnaryClientInterceptor{ validator.UnaryClientInterceptor(), diff --git a/pkg/client/grpcclient/options.go b/pkg/client/grpcclient/options.go index d88f555..ebdd8df 100644 --- a/pkg/client/grpcclient/options.go +++ b/pkg/client/grpcclient/options.go @@ -64,6 +64,8 @@ func WithCompression(compression Compression) ApplyOption { } } +// WithBlock is a no-op. +// deprecated: see grpc.WithBlock func WithBlock() ApplyOption { return func(c *Config) { c.IsBlockConnect = true diff --git a/pkg/client/http.go b/pkg/client/http.go index 089bfd7..31e5204 100644 --- a/pkg/client/http.go +++ b/pkg/client/http.go @@ -7,6 +7,9 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) +// HttpCfg is hotcfg for a resty http client. On update, it +// creates a new resty client with the new config +// as well as instruments the client for metrics and tracing. type HttpCfg struct { kutils.HttpCfg `mapstructure:",squash"` C *resty.Client diff --git a/pkg/client/redis.go b/pkg/client/redis.go index bac1dc7..0ae6ce6 100644 --- a/pkg/client/redis.go +++ b/pkg/client/redis.go @@ -13,8 +13,12 @@ import ( const RedisCloseDelay = time.Minute +// RedisCfg is hotcfg for redis client. On update, it +// creates FailoverClusterClient with RouteRandomly for sentinel redis by default +// as well as instrumenting the client for metrics and tracing. type RedisCfg struct { redis.UniversalOptions `mapstructure:",squash"` + DisableRouteRandomly bool C redis.UniversalClient } @@ -28,7 +32,8 @@ func (*RedisCfg) OnUpdate(old, new *RedisCfg) { } }) } - new.C = redis.NewUniversalClient(&new.UniversalOptions) + new.RouteRandomly = new.RouteRandomly || !new.DisableRouteRandomly + new.C = newRedisClient(&new.UniversalOptions) if metric.Provider() != nil { if err := redisotel.InstrumentMetrics(new.C); err != nil { klog.Errorf(ctx, "RedisCfg.OnUpdate|redisotel.InstrumentMetrics failed|err=%v", err) @@ -40,3 +45,13 @@ func (*RedisCfg) OnUpdate(old, new *RedisCfg) { } } } + +func newRedisClient(opts *redis.UniversalOptions) redis.UniversalClient { + if opts.MasterName == "" { + return redis.NewUniversalClient(opts) + } + failoverOpts := opts.Failover() + failoverOpts.RouteByLatency = opts.RouteByLatency + failoverOpts.RouteRandomly = opts.RouteRandomly + return redis.NewFailoverClusterClient(failoverOpts) +}