Skip to content

Commit

Permalink
Merge pull request #33 from KyberNetwork/ft/redis-failovercluster
Browse files Browse the repository at this point in the history
feat: redis client to route randomly for sentinel redis nodes by default
  • Loading branch information
NgoKimPhu authored Jul 3, 2024
2 parents f7aff30 + 5860236 commit 8349df3
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/client/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cenkalti/backoff/v4"
)

// BackoffCfg is a hotcfg to create a backoff.ExponentialBackOff
type BackoffCfg struct {
backoff.ExponentialBackOff `mapstructure:",squash"`
MaxRetries uint64
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const EthCloseDelay = time.Minute

// EthCfg is hotcfg for eth client. It creates a client that automatically choose to use
// the provided full node rpc or archive node.
type EthCfg struct {
Url string
ArchiveUrl string
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/eth_batchable.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

// BatchableEthCfg is hotcfg for batchable eth client.
// It batches eth_call's up to be sent together within 1 request to the rpc node.
type BatchableEthCfg struct {
EthCfg `mapstructure:",squash"`
BatchRate time.Duration
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (

const GrpcCloseDelay = time.Minute

// GrpcCfg is hotcfg for grpc client. On update, it
// creates a new grpc client with the provided factory generated by grpc using service proto.
// The client has interceptors for adding client id header, validating requests, adding timeout, metrics and tracing.
type GrpcCfg[T any] struct {
grpcclient.Config `mapstructure:",squash"`
C T // the inner grpc client
Expand Down
8 changes: 2 additions & 6 deletions pkg/client/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
BaseURL string
MinConnectTimeout time.Duration
ConnectBackoff backoff.Config
IsBlockConnect bool
IsBlockConnect bool // deprecated: see grpc.WithBlock
GRPCCredentials credentials.TransportCredentials
Insecure bool
Compression Compression
Expand Down Expand Up @@ -66,7 +66,7 @@ func New[T any](clientFactory func(grpc.ClientConnInterface) T, applyOptions ...
}

dialOpts := cfg.dialOptions()
grpcConn, err := grpc.Dial(cfg.BaseURL, dialOpts...)
grpcConn, err := grpc.NewClient(cfg.BaseURL, dialOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -123,10 +123,6 @@ func (c *Config) dialOptions() []grpc.DialOption {
}
dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams))

if c.IsBlockConnect {
dialOpts = append(dialOpts, grpc.WithBlock())
}

requestHeaders := c.requestHeaders()
unaryInterceptors := []grpc.UnaryClientInterceptor{
validator.UnaryClientInterceptor(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/grpcclient/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func WithCompression(compression Compression) ApplyOption {
}
}

// WithBlock is a no-op.
// deprecated: see grpc.WithBlock
func WithBlock() ApplyOption {
return func(c *Config) {
c.IsBlockConnect = true
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

// HttpCfg is hotcfg for a resty http client. On update, it
// creates a new resty client with the new config
// as well as instruments the client for metrics and tracing.
type HttpCfg struct {
kutils.HttpCfg `mapstructure:",squash"`
C *resty.Client
Expand Down
17 changes: 16 additions & 1 deletion pkg/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (

const RedisCloseDelay = time.Minute

// RedisCfg is hotcfg for redis client. On update, it
// creates FailoverClusterClient with RouteRandomly for sentinel redis by default
// as well as instrumenting the client for metrics and tracing.
type RedisCfg struct {
redis.UniversalOptions `mapstructure:",squash"`
DisableRouteRandomly bool
C redis.UniversalClient
}

Expand All @@ -28,7 +32,8 @@ func (*RedisCfg) OnUpdate(old, new *RedisCfg) {
}
})
}
new.C = redis.NewUniversalClient(&new.UniversalOptions)
new.RouteRandomly = new.RouteRandomly || !new.DisableRouteRandomly
new.C = newRedisClient(&new.UniversalOptions)
if metric.Provider() != nil {
if err := redisotel.InstrumentMetrics(new.C); err != nil {
klog.Errorf(ctx, "RedisCfg.OnUpdate|redisotel.InstrumentMetrics failed|err=%v", err)
Expand All @@ -40,3 +45,13 @@ func (*RedisCfg) OnUpdate(old, new *RedisCfg) {
}
}
}

func newRedisClient(opts *redis.UniversalOptions) redis.UniversalClient {
if opts.MasterName == "" {
return redis.NewUniversalClient(opts)
}
failoverOpts := opts.Failover()
failoverOpts.RouteByLatency = opts.RouteByLatency
failoverOpts.RouteRandomly = opts.RouteRandomly
return redis.NewFailoverClusterClient(failoverOpts)
}

0 comments on commit 8349df3

Please sign in to comment.