From fb85836dec752311190864c28bcc8e8157b49533 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Sun, 2 Jun 2024 15:19:36 +0700 Subject: [PATCH] Refactor handler --- handler/batch_handler.go | 7 ++-- handler/batch_worker.go | 52 ++++++++++++++++++++--------- handler/common.go | 21 +++++++++--- handler/handler.go | 37 +++++++++++++++++---- handler/retry_handler.go | 71 ++++++++++++++++++++++++++-------------- 5 files changed, 134 insertions(+), 54 deletions(-) diff --git a/handler/batch_handler.go b/handler/batch_handler.go index 52801c2..e1b6449 100644 --- a/handler/batch_handler.go +++ b/handler/batch_handler.go @@ -29,9 +29,10 @@ func (h *BatchHandler[T]) Handle(ctx context.Context, data []Message[T]) ([]Mess if err != nil { return failMessages, err } - if len(failIndices) > 0 { - for _, failIndex := range failIndices { - failMessages = append(failMessages, data[failIndex]) + sl := len(failIndices) + if sl > 0 { + for j := 0; j < sl; j++ { + failMessages = append(failMessages, data[failIndices[j]]) } } } diff --git a/handler/batch_worker.go b/handler/batch_worker.go index f994a2b..cb86f7b 100644 --- a/handler/batch_worker.go +++ b/handler/batch_worker.go @@ -14,13 +14,14 @@ const TimeFormat = "15:04:05.000" type BatchWorker[T any] struct { batchSize int timeout int64 - limitRetry int + Unmarshal func(data []byte, v any) error handle func(ctx context.Context, data []Message[T]) ([]Message[T], error) Validate func(context.Context, *T) ([]ErrorMessage, error) - HandleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string) + Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string) + HandleError func(context.Context, []byte, map[string]string) error Retry func(context.Context, []byte, map[string]string) error + limitRetry int RetryCountName string - Error func(context.Context, []byte, map[string]string) error Goroutine bool messages []Message[T] latestExecutedTime time.Time @@ -30,19 +31,34 @@ type BatchWorker[T any] struct { LogInfo func(context.Context, string) } -func NewBatchWorker[T any](batchSize int, timeout int64, limitRetry int, handle func(context.Context, []Message[T]) ([]Message[T], error), retry func(context.Context, []byte, map[string]string) error, retryCountName string, handleError func(context.Context, []byte, map[string]string) error, goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] { +func NewBatchWorker[T any]( + batchSize int, timeout int64, + unmarshal func(data []byte, v any) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handle func(context.Context, []Message[T]) ([]Message[T], error), + retry func(context.Context, []byte, map[string]string) error, + limitRetry int, + retryCountName string, + handleError func(context.Context, []byte, map[string]string) error, + goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] { if len(retryCountName) == 0 { retryCountName = "retryCount" } - + if unmarshal == nil { + unmarshal = json.Unmarshal + } w := &BatchWorker[T]{ batchSize: batchSize, timeout: timeout, - limitRetry: limitRetry, + Unmarshal: unmarshal, + Validate: validate, + Reject: reject, handle: handle, Retry: retry, + limitRetry: limitRetry, RetryCountName: retryCountName, - Error: handleError, + HandleError: handleError, Goroutine: goroutine, } if len(logs) >= 1 { @@ -83,7 +99,7 @@ func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[stri return } if len(errs) > 0 { - w.HandleError(ctx, &v, errs, data, attrs) + w.Reject(ctx, &v, errs, data, attrs) return } } @@ -95,7 +111,13 @@ func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[stri } w.mux.Unlock() } - +func (w *BatchWorker[T]) CallByTimer(ctx context.Context) { + w.mux.Lock() + if w.ready(ctx) { + w.execute(ctx) + } + w.mux.Unlock() +} func (w *BatchWorker[T]) ready(ctx context.Context) bool { isReady := false now := time.Now() @@ -122,7 +144,7 @@ func (w *BatchWorker[T]) execute(ctx context.Context) { if err != nil && w.LogError != nil { w.LogError(ctx, "Error of batch handling: "+err.Error()) } - if errList != nil && len(errList) > 0 { + if len(errList) > 0 { if w.Retry == nil { if w.LogError != nil { l := len(errList) @@ -150,8 +172,8 @@ func (w *BatchWorker[T]) execute(ctx context.Context) { x := CreateLog(errList[i].Data, errList[i].Attributes) w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.limitRetry, x)) } - if w.Error != nil { - w.Error(ctx, errList[i].Data, errList[i].Attributes) + if w.HandleError != nil { + w.HandleError(ctx, errList[i].Data, errList[i].Attributes) } continue } else if w.LogInfo != nil { @@ -182,18 +204,18 @@ func (w *BatchWorker[T]) Run(ctx context.Context) { for { select { case <-ticker.C: - w.Handle(ctx, nil, nil) + w.CallByTimer(ctx) } } }() } func CreateLog(data []byte, header map[string]string) interface{} { - if header == nil || len(header) == 0 { + if len(header) == 0 { return data } m := make(map[string]interface{}) m["data"] = data - if header != nil && len(header) > 0 { + if len(header) > 0 { m["attributes"] = header } return m diff --git a/handler/common.go b/handler/common.go index 94f03c1..3e0919f 100644 --- a/handler/common.go +++ b/handler/common.go @@ -24,13 +24,12 @@ type ErrorHandler[T any] struct { LogError func(context.Context, string) } -func (w *ErrorHandler[T]) HandleError(ctx context.Context, res T, err []ErrorMessage, data []byte) error { +func (w *ErrorHandler[T]) Reject(ctx context.Context, res T, err []ErrorMessage, data []byte) { if w.LogError != nil && data != nil { w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err)) } - return nil } -func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, res T, err []ErrorMessage, data []byte, attrs map[string]string) error { +func (w *ErrorHandler[T]) RejectWithMap(ctx context.Context, res T, err []ErrorMessage, data []byte, attrs map[string]string) { if w.LogError != nil && data != nil { if len(attrs) > 0 { w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v Error: %+v", data, attrs, err)) @@ -38,9 +37,21 @@ func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, res T, err []E w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err)) } } - return nil } - +func (w *ErrorHandler[T]) HandleError(ctx context.Context, data []byte) { + if w.LogError != nil && data != nil { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data)) + } +} +func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, data []byte, attrs map[string]string) { + if w.LogError != nil && data != nil { + if len(attrs) > 0 { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v", data, attrs)) + } else { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data)) + } + } +} func GetString(ctx context.Context, key string) string { if len(key) > 0 { u := ctx.Value(key) diff --git a/handler/handler.go b/handler/handler.go index b005da4..192aacb 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -8,9 +8,11 @@ import ( ) type Handler[T any] struct { + Unmarshal func(data []byte, v any) error Write func(ctx context.Context, model *T) error Validate func(context.Context, *T) ([]ErrorMessage, error) - HandleError func(context.Context, *T, []ErrorMessage, []byte) + Reject func(context.Context, *T, []ErrorMessage, []byte) + HandleError func(context.Context, []byte) Retries []time.Duration Goroutines bool LogError func(context.Context, string) @@ -18,18 +20,35 @@ type Handler[T any] struct { Key string } -func NewHandlerByConfig[T any](c *RetryConfig, write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte), goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { +func NewHandlerByConfig[T any](c *RetryConfig, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { if c == nil { - return NewHandlerWithKey[T](write, validate, handleError, nil, goroutines, key, logs...) + return NewHandlerWithKey[T](nil, write, validate, reject, handleError, nil, goroutines, key, logs...) } else { retries := DurationsFromValue(*c, "Retry", 20) - return NewHandlerWithKey[T](write, validate, handleError, retries, goroutines, key, logs...) + return NewHandlerWithKey[T](nil, write, validate, reject, handleError, retries, goroutines, key, logs...) } } -func NewHandlerWithKey[T any](write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte), retries []time.Duration, goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { +func NewHandlerWithKey[T any]( + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + retries []time.Duration, + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { + if unmarshal == nil { + unmarshal = json.Unmarshal + } c := &Handler[T]{ Write: write, + Unmarshal: unmarshal, Validate: validate, + Reject: reject, HandleError: handleError, Retries: retries, Goroutines: goroutines, @@ -73,7 +92,7 @@ func (c *Handler[T]) Handle(ctx context.Context, data []byte) { return } if len(errs) > 0 { - c.HandleError(ctx, &v, errs, data) + c.Reject(ctx, &v, errs, data) } } if c.Goroutines { @@ -102,12 +121,18 @@ func (c *Handler[T]) write(ctx context.Context, data []byte, item *T) error { }, c.LogError) if err != nil && c.LogError != nil { c.LogError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(c.Retries), data, err.Error())) + if c.HandleError != nil { + c.HandleError(ctx, data) + } } return nil } else { if c.LogError != nil { c.LogError(ctx, fmt.Sprintf("Failed to write %s . Error: %s", data, er3.Error())) } + if c.HandleError != nil { + c.HandleError(ctx, data) + } return er3 } } diff --git a/handler/retry_handler.go b/handler/retry_handler.go index c7e5de7..62c6c80 100644 --- a/handler/retry_handler.go +++ b/handler/retry_handler.go @@ -15,26 +15,47 @@ type HandlerConfig struct { } type RetryHandler[T any] struct { - Write func(context.Context, *T) error - Validate func(context.Context, *T) ([]ErrorMessage, error) - HandleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string) - LimitRetry int - Retry func(context.Context, []byte, map[string]string) error - RetryCountName string - HandleException func(context.Context, []byte, map[string]string) - Goroutines bool - LogError func(context.Context, string) - LogInfo func(context.Context, string) - Key string + Unmarshal func(data []byte, v any) error + Write func(context.Context, *T) error + Validate func(context.Context, *T) ([]ErrorMessage, error) + Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string) + HandleError func(context.Context, []byte, map[string]string) + Retry func(context.Context, []byte, map[string]string) error + LimitRetry int + RetryCountName string + Goroutines bool + LogError func(context.Context, string) + LogInfo func(context.Context, string) + Key string } -func NewRetryHandler[T any](write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string), goroutines bool, key string, logs ...func(context.Context, string)) *RetryHandler[T] { +func NewRetryHandler[T any]( + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + limitRetry int, + retryCountName string, + goroutines bool, key string, logs ...func(context.Context, string)) *RetryHandler[T] { + if len(retryCountName) == 0 { + retryCountName = "retry" + } + if unmarshal == nil { + unmarshal = json.Unmarshal + } c := &RetryHandler[T]{ - Write: write, - Validate: validate, - HandleError: handleError, - Goroutines: goroutines, - Key: key, + Unmarshal: unmarshal, + Write: write, + Validate: validate, + Reject: reject, + HandleError: handleError, + Retry: retry, + LimitRetry: limitRetry, + RetryCountName: retryCountName, + Goroutines: goroutines, + Key: key, } if len(logs) >= 1 { c.LogError = logs[0] @@ -58,7 +79,7 @@ func (c *RetryHandler[T]) Handle(ctx context.Context, data []byte, attrs map[str } } var v T - er1 := json.Unmarshal(data, &v) + er1 := c.Unmarshal(data, &v) if er1 != nil { if c.LogError != nil { c.LogError(ctx, fmt.Sprintf("cannot unmarshal item: %s. Error: %s", data, er1.Error())) @@ -74,13 +95,13 @@ func (c *RetryHandler[T]) Handle(ctx context.Context, data []byte, attrs map[str return } if len(errs) > 0 { - c.HandleError(ctx, &v, errs, data, attrs) + c.Reject(ctx, &v, errs, data, attrs) } } if c.Goroutines { - go Write[*T](ctx, c.Write, &v, data, attrs, c.HandleException, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) + go Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) } else { - Write[*T](ctx, c.Write, &v, data, attrs, c.HandleException, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) + Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) } } @@ -123,7 +144,7 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite if attrs == nil || len(attrs) == 0 { logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, limitRetry, data)) } else { - logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %s.", retryCount, limitRetry, data, attrs)) + logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %v", retryCount, limitRetry, data, attrs)) } } @@ -133,9 +154,9 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite } else { if logInfo != nil { if attrs == nil || len(attrs) == 0 { - logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, data)) + logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s", retryCount, data)) } else { - logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %s.", retryCount, data, attrs)) + logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %v", retryCount, data, attrs)) } } attrs[retryCountName] = strconv.Itoa(retryCount) @@ -145,7 +166,7 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite if attrs == nil || len(attrs) == 0 { logError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", data, er2.Error())) } else { - logError(ctx, fmt.Sprintf("Cannot retry %s %s. Error: %s", data, attrs, er2.Error())) + logError(ctx, fmt.Sprintf("Cannot retry %s %v. Error: %s", data, attrs, er2.Error())) } } }