Skip to content

Commit

Permalink
Refactor new
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jun 2, 2024
1 parent fb85836 commit cc736ca
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
51 changes: 42 additions & 9 deletions handler/batch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type BatchWorker[T any] struct {
Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string)
HandleError func(context.Context, []byte, map[string]string) error
Retry func(context.Context, []byte, map[string]string) error
limitRetry int
LimitRetry int
RetryCountName string
Goroutine bool
messages []Message[T]
Expand All @@ -31,17 +31,49 @@ type BatchWorker[T any] struct {
LogInfo func(context.Context, string)
}

type BatchConfig struct {
RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"`
LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"`
Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"`
Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"`
Timeout int64 `yaml:"timeout" mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
BatchSize int `yaml:"batch_size" mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"`
}

func NewBatchWorkerByConfig[T any](
c BatchConfig,
handle func(context.Context, []Message[T]) ([]Message[T], error),
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handleError func(context.Context, []byte, map[string]string) error,
retry func(context.Context, []byte, map[string]string) error,
logs ...func(context.Context, string)) *BatchWorker[T] {
return NewBatchWorker[T](c.BatchSize, c.Timeout, nil, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...)
}
func NewBatchWorkerByConfigAndUnmarshal[T any](
c BatchConfig,
unmarshal func(data []byte, v any) error,
handle func(context.Context, []Message[T]) ([]Message[T], error),
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handleError func(context.Context, []byte, map[string]string) error,
retry func(context.Context, []byte, map[string]string) error,
logs ...func(context.Context, string)) *BatchWorker[T] {
return NewBatchWorker[T](c.BatchSize, c.Timeout, unmarshal, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...)
}
func NewBatchWorker[T any](
batchSize int, timeout int64,
unmarshal func(data []byte, v any) error,
handle func(context.Context, []Message[T]) ([]Message[T], error),
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handle func(context.Context, []Message[T]) ([]Message[T], error),
handleError func(context.Context, []byte, map[string]string) error,
retry func(context.Context, []byte, map[string]string) error,
limitRetry int,
retryCountName string,
handleError func(context.Context, []byte, map[string]string) error,
goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] {
goroutine bool,
key string,
logs ...func(context.Context, string)) *BatchWorker[T] {
if len(retryCountName) == 0 {
retryCountName = "retryCount"
}
Expand All @@ -52,14 +84,15 @@ func NewBatchWorker[T any](
batchSize: batchSize,
timeout: timeout,
Unmarshal: unmarshal,
handle: handle,
Validate: validate,
Reject: reject,
handle: handle,
HandleError: handleError,
Retry: retry,
limitRetry: limitRetry,
LimitRetry: limitRetry,
RetryCountName: retryCountName,
HandleError: handleError,
Goroutine: goroutine,
Key: key,
}
if len(logs) >= 1 {
w.LogError = logs[0]
Expand Down Expand Up @@ -167,10 +200,10 @@ func (w *BatchWorker[T]) execute(ctx context.Context) {
}
}
retryCount++
if retryCount > w.limitRetry {
if retryCount > w.LimitRetry {
if w.LogInfo != nil {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.limitRetry, x))
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.LimitRetry, x))
}
if w.HandleError != nil {
w.HandleError(ctx, errList[i].Data, errList[i].Attributes)
Expand Down
13 changes: 11 additions & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ type Handler[T any] struct {
}

func NewHandlerByConfig[T any](c *RetryConfig,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte),
handleError func(context.Context, []byte),
goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
return NewHandlerByConfigAndUnmarshal[T](c, nil, write, validate, reject, handleError, goroutines, key, logs...)
}
func NewHandlerByConfigAndUnmarshal[T any](c *RetryConfig,
unmarshal func(data []byte, v any) error,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte),
handleError func(context.Context, []byte),
goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
if c == nil {
return NewHandlerWithKey[T](nil, write, validate, reject, handleError, nil, goroutines, key, logs...)
return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, nil, goroutines, key, logs...)
} else {
retries := DurationsFromValue(*c, "Retry", 20)
return NewHandlerWithKey[T](nil, write, validate, reject, handleError, retries, goroutines, key, logs...)
return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, retries, goroutines, key, logs...)
}
}
func NewHandlerWithKey[T any](
Expand Down
21 changes: 21 additions & 0 deletions handler/retry_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ type RetryHandler[T any] struct {
Key string
}

func NewRetryHandlerByConfig[T any](
c HandlerConfig,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handleError func(context.Context, []byte, map[string]string),
retry func(context.Context, []byte, map[string]string) error,
logs ...func(context.Context, string)) *RetryHandler[T] {
return NewRetryHandler[T](nil, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...)
}
func NewRetryHandlerByConfigAndUnmarshal[T any](
c HandlerConfig,
unmarshal func(data []byte, v any) error,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handleError func(context.Context, []byte, map[string]string),
retry func(context.Context, []byte, map[string]string) error,
logs ...func(context.Context, string)) *RetryHandler[T] {
return NewRetryHandler[T](unmarshal, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...)
}
func NewRetryHandler[T any](
unmarshal func(data []byte, v any) error,
write func(context.Context, *T) error,
Expand Down

0 comments on commit cc736ca

Please sign in to comment.