diff --git a/handler/batch_worker.go b/handler/batch_worker.go index cb86f7b..bd83d49 100644 --- a/handler/batch_worker.go +++ b/handler/batch_worker.go @@ -20,7 +20,7 @@ type BatchWorker[T any] struct { Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string) HandleError func(context.Context, []byte, map[string]string) error Retry func(context.Context, []byte, map[string]string) error - limitRetry int + LimitRetry int RetryCountName string Goroutine bool messages []Message[T] @@ -31,17 +31,49 @@ type BatchWorker[T any] struct { LogInfo func(context.Context, string) } +type BatchConfig struct { + RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"` + LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"` + Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` + Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"` + Timeout int64 `yaml:"timeout" mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"` + BatchSize int `yaml:"batch_size" mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"` +} + +func NewBatchWorkerByConfig[T any]( + c BatchConfig, + handle func(context.Context, []Message[T]) ([]Message[T], error), + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string) error, + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *BatchWorker[T] { + return NewBatchWorker[T](c.BatchSize, c.Timeout, nil, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} +func NewBatchWorkerByConfigAndUnmarshal[T any]( + c BatchConfig, + unmarshal func(data []byte, v any) error, + handle func(context.Context, []Message[T]) ([]Message[T], error), + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string) error, + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *BatchWorker[T] { + return NewBatchWorker[T](c.BatchSize, c.Timeout, unmarshal, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} func NewBatchWorker[T any]( batchSize int, timeout int64, unmarshal func(data []byte, v any) error, + handle func(context.Context, []Message[T]) ([]Message[T], error), validate func(context.Context, *T) ([]ErrorMessage, error), reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), - handle func(context.Context, []Message[T]) ([]Message[T], error), + handleError func(context.Context, []byte, map[string]string) error, retry func(context.Context, []byte, map[string]string) error, limitRetry int, retryCountName string, - handleError func(context.Context, []byte, map[string]string) error, - goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] { + goroutine bool, + key string, + logs ...func(context.Context, string)) *BatchWorker[T] { if len(retryCountName) == 0 { retryCountName = "retryCount" } @@ -52,14 +84,15 @@ func NewBatchWorker[T any]( batchSize: batchSize, timeout: timeout, Unmarshal: unmarshal, + handle: handle, Validate: validate, Reject: reject, - handle: handle, + HandleError: handleError, Retry: retry, - limitRetry: limitRetry, + LimitRetry: limitRetry, RetryCountName: retryCountName, - HandleError: handleError, Goroutine: goroutine, + Key: key, } if len(logs) >= 1 { w.LogError = logs[0] @@ -167,10 +200,10 @@ func (w *BatchWorker[T]) execute(ctx context.Context) { } } retryCount++ - if retryCount > w.limitRetry { + if retryCount > w.LimitRetry { if w.LogInfo != nil { x := CreateLog(errList[i].Data, errList[i].Attributes) - w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.limitRetry, x)) + w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.LimitRetry, x)) } if w.HandleError != nil { w.HandleError(ctx, errList[i].Data, errList[i].Attributes) diff --git a/handler/handler.go b/handler/handler.go index 192aacb..79d6004 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -21,16 +21,25 @@ type Handler[T any] struct { } func NewHandlerByConfig[T any](c *RetryConfig, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { + return NewHandlerByConfigAndUnmarshal[T](c, nil, write, validate, reject, handleError, goroutines, key, logs...) +} +func NewHandlerByConfigAndUnmarshal[T any](c *RetryConfig, + unmarshal func(data []byte, v any) error, write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), reject func(context.Context, *T, []ErrorMessage, []byte), handleError func(context.Context, []byte), goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { if c == nil { - return NewHandlerWithKey[T](nil, write, validate, reject, handleError, nil, goroutines, key, logs...) + return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, nil, goroutines, key, logs...) } else { retries := DurationsFromValue(*c, "Retry", 20) - return NewHandlerWithKey[T](nil, write, validate, reject, handleError, retries, goroutines, key, logs...) + return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, retries, goroutines, key, logs...) } } func NewHandlerWithKey[T any]( diff --git a/handler/retry_handler.go b/handler/retry_handler.go index 62c6c80..a1c005d 100644 --- a/handler/retry_handler.go +++ b/handler/retry_handler.go @@ -29,6 +29,27 @@ type RetryHandler[T any] struct { Key string } +func NewRetryHandlerByConfig[T any]( + c HandlerConfig, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *RetryHandler[T] { + return NewRetryHandler[T](nil, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} +func NewRetryHandlerByConfigAndUnmarshal[T any]( + c HandlerConfig, + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *RetryHandler[T] { + return NewRetryHandler[T](unmarshal, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} func NewRetryHandler[T any]( unmarshal func(data []byte, v any) error, write func(context.Context, *T) error,