Skip to content

Commit

Permalink
Refactor handler
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jun 2, 2024
1 parent 2246664 commit fb85836
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 54 deletions.
7 changes: 4 additions & 3 deletions handler/batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func (h *BatchHandler[T]) Handle(ctx context.Context, data []Message[T]) ([]Mess
if err != nil {
return failMessages, err
}
if len(failIndices) > 0 {
for _, failIndex := range failIndices {
failMessages = append(failMessages, data[failIndex])
sl := len(failIndices)
if sl > 0 {
for j := 0; j < sl; j++ {
failMessages = append(failMessages, data[failIndices[j]])
}
}
}
Expand Down
52 changes: 37 additions & 15 deletions handler/batch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ const TimeFormat = "15:04:05.000"
type BatchWorker[T any] struct {
batchSize int
timeout int64
limitRetry int
Unmarshal func(data []byte, v any) error
handle func(ctx context.Context, data []Message[T]) ([]Message[T], error)
Validate func(context.Context, *T) ([]ErrorMessage, error)
HandleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string)
Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string)
HandleError func(context.Context, []byte, map[string]string) error
Retry func(context.Context, []byte, map[string]string) error
limitRetry int
RetryCountName string
Error func(context.Context, []byte, map[string]string) error
Goroutine bool
messages []Message[T]
latestExecutedTime time.Time
Expand All @@ -30,19 +31,34 @@ type BatchWorker[T any] struct {
LogInfo func(context.Context, string)
}

func NewBatchWorker[T any](batchSize int, timeout int64, limitRetry int, handle func(context.Context, []Message[T]) ([]Message[T], error), retry func(context.Context, []byte, map[string]string) error, retryCountName string, handleError func(context.Context, []byte, map[string]string) error, goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] {
func NewBatchWorker[T any](
batchSize int, timeout int64,
unmarshal func(data []byte, v any) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handle func(context.Context, []Message[T]) ([]Message[T], error),
retry func(context.Context, []byte, map[string]string) error,
limitRetry int,
retryCountName string,
handleError func(context.Context, []byte, map[string]string) error,
goroutine bool, logs ...func(context.Context, string)) *BatchWorker[T] {
if len(retryCountName) == 0 {
retryCountName = "retryCount"
}

if unmarshal == nil {
unmarshal = json.Unmarshal
}
w := &BatchWorker[T]{
batchSize: batchSize,
timeout: timeout,
limitRetry: limitRetry,
Unmarshal: unmarshal,
Validate: validate,
Reject: reject,
handle: handle,
Retry: retry,
limitRetry: limitRetry,
RetryCountName: retryCountName,
Error: handleError,
HandleError: handleError,
Goroutine: goroutine,
}
if len(logs) >= 1 {
Expand Down Expand Up @@ -83,7 +99,7 @@ func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[stri
return
}
if len(errs) > 0 {
w.HandleError(ctx, &v, errs, data, attrs)
w.Reject(ctx, &v, errs, data, attrs)
return
}
}
Expand All @@ -95,7 +111,13 @@ func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[stri
}
w.mux.Unlock()
}

func (w *BatchWorker[T]) CallByTimer(ctx context.Context) {
w.mux.Lock()
if w.ready(ctx) {
w.execute(ctx)
}
w.mux.Unlock()
}
func (w *BatchWorker[T]) ready(ctx context.Context) bool {
isReady := false
now := time.Now()
Expand All @@ -122,7 +144,7 @@ func (w *BatchWorker[T]) execute(ctx context.Context) {
if err != nil && w.LogError != nil {
w.LogError(ctx, "Error of batch handling: "+err.Error())
}
if errList != nil && len(errList) > 0 {
if len(errList) > 0 {
if w.Retry == nil {
if w.LogError != nil {
l := len(errList)
Expand Down Expand Up @@ -150,8 +172,8 @@ func (w *BatchWorker[T]) execute(ctx context.Context) {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.limitRetry, x))
}
if w.Error != nil {
w.Error(ctx, errList[i].Data, errList[i].Attributes)
if w.HandleError != nil {
w.HandleError(ctx, errList[i].Data, errList[i].Attributes)
}
continue
} else if w.LogInfo != nil {
Expand Down Expand Up @@ -182,18 +204,18 @@ func (w *BatchWorker[T]) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
w.Handle(ctx, nil, nil)
w.CallByTimer(ctx)
}
}
}()
}
func CreateLog(data []byte, header map[string]string) interface{} {
if header == nil || len(header) == 0 {
if len(header) == 0 {
return data
}
m := make(map[string]interface{})
m["data"] = data
if header != nil && len(header) > 0 {
if len(header) > 0 {
m["attributes"] = header
}
return m
Expand Down
21 changes: 16 additions & 5 deletions handler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,34 @@ type ErrorHandler[T any] struct {
LogError func(context.Context, string)
}

func (w *ErrorHandler[T]) HandleError(ctx context.Context, res T, err []ErrorMessage, data []byte) error {
func (w *ErrorHandler[T]) Reject(ctx context.Context, res T, err []ErrorMessage, data []byte) {
if w.LogError != nil && data != nil {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err))
}
return nil
}
func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, res T, err []ErrorMessage, data []byte, attrs map[string]string) error {
func (w *ErrorHandler[T]) RejectWithMap(ctx context.Context, res T, err []ErrorMessage, data []byte, attrs map[string]string) {
if w.LogError != nil && data != nil {
if len(attrs) > 0 {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v Error: %+v", data, attrs, err))
} else {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err))
}
}
return nil
}

func (w *ErrorHandler[T]) HandleError(ctx context.Context, data []byte) {
if w.LogError != nil && data != nil {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data))
}
}
func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, data []byte, attrs map[string]string) {
if w.LogError != nil && data != nil {
if len(attrs) > 0 {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v", data, attrs))
} else {
w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data))
}
}
}
func GetString(ctx context.Context, key string) string {
if len(key) > 0 {
u := ctx.Value(key)
Expand Down
37 changes: 31 additions & 6 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,47 @@ import (
)

type Handler[T any] struct {
Unmarshal func(data []byte, v any) error
Write func(ctx context.Context, model *T) error
Validate func(context.Context, *T) ([]ErrorMessage, error)
HandleError func(context.Context, *T, []ErrorMessage, []byte)
Reject func(context.Context, *T, []ErrorMessage, []byte)
HandleError func(context.Context, []byte)
Retries []time.Duration
Goroutines bool
LogError func(context.Context, string)
LogInfo func(context.Context, string)
Key string
}

func NewHandlerByConfig[T any](c *RetryConfig, write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte), goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
func NewHandlerByConfig[T any](c *RetryConfig,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte),
handleError func(context.Context, []byte),
goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
if c == nil {
return NewHandlerWithKey[T](write, validate, handleError, nil, goroutines, key, logs...)
return NewHandlerWithKey[T](nil, write, validate, reject, handleError, nil, goroutines, key, logs...)
} else {
retries := DurationsFromValue(*c, "Retry", 20)
return NewHandlerWithKey[T](write, validate, handleError, retries, goroutines, key, logs...)
return NewHandlerWithKey[T](nil, write, validate, reject, handleError, retries, goroutines, key, logs...)
}
}
func NewHandlerWithKey[T any](write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte), retries []time.Duration, goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
func NewHandlerWithKey[T any](
unmarshal func(data []byte, v any) error,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte),
handleError func(context.Context, []byte),
retries []time.Duration,
goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] {
if unmarshal == nil {
unmarshal = json.Unmarshal
}
c := &Handler[T]{
Write: write,
Unmarshal: unmarshal,
Validate: validate,
Reject: reject,
HandleError: handleError,
Retries: retries,
Goroutines: goroutines,
Expand Down Expand Up @@ -73,7 +92,7 @@ func (c *Handler[T]) Handle(ctx context.Context, data []byte) {
return
}
if len(errs) > 0 {
c.HandleError(ctx, &v, errs, data)
c.Reject(ctx, &v, errs, data)
}
}
if c.Goroutines {
Expand Down Expand Up @@ -102,12 +121,18 @@ func (c *Handler[T]) write(ctx context.Context, data []byte, item *T) error {
}, c.LogError)
if err != nil && c.LogError != nil {
c.LogError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(c.Retries), data, err.Error()))
if c.HandleError != nil {
c.HandleError(ctx, data)
}
}
return nil
} else {
if c.LogError != nil {
c.LogError(ctx, fmt.Sprintf("Failed to write %s . Error: %s", data, er3.Error()))
}
if c.HandleError != nil {
c.HandleError(ctx, data)
}
return er3
}
}
Expand Down
71 changes: 46 additions & 25 deletions handler/retry_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,47 @@ type HandlerConfig struct {
}

type RetryHandler[T any] struct {
Write func(context.Context, *T) error
Validate func(context.Context, *T) ([]ErrorMessage, error)
HandleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string)
LimitRetry int
Retry func(context.Context, []byte, map[string]string) error
RetryCountName string
HandleException func(context.Context, []byte, map[string]string)
Goroutines bool
LogError func(context.Context, string)
LogInfo func(context.Context, string)
Key string
Unmarshal func(data []byte, v any) error
Write func(context.Context, *T) error
Validate func(context.Context, *T) ([]ErrorMessage, error)
Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string)
HandleError func(context.Context, []byte, map[string]string)
Retry func(context.Context, []byte, map[string]string) error
LimitRetry int
RetryCountName string
Goroutines bool
LogError func(context.Context, string)
LogInfo func(context.Context, string)
Key string
}

func NewRetryHandler[T any](write func(context.Context, *T) error, validate func(context.Context, *T) ([]ErrorMessage, error), handleError func(context.Context, *T, []ErrorMessage, []byte, map[string]string), goroutines bool, key string, logs ...func(context.Context, string)) *RetryHandler[T] {
func NewRetryHandler[T any](
unmarshal func(data []byte, v any) error,
write func(context.Context, *T) error,
validate func(context.Context, *T) ([]ErrorMessage, error),
reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string),
handleError func(context.Context, []byte, map[string]string),
retry func(context.Context, []byte, map[string]string) error,
limitRetry int,
retryCountName string,
goroutines bool, key string, logs ...func(context.Context, string)) *RetryHandler[T] {
if len(retryCountName) == 0 {
retryCountName = "retry"
}
if unmarshal == nil {
unmarshal = json.Unmarshal
}
c := &RetryHandler[T]{
Write: write,
Validate: validate,
HandleError: handleError,
Goroutines: goroutines,
Key: key,
Unmarshal: unmarshal,
Write: write,
Validate: validate,
Reject: reject,
HandleError: handleError,
Retry: retry,
LimitRetry: limitRetry,
RetryCountName: retryCountName,
Goroutines: goroutines,
Key: key,
}
if len(logs) >= 1 {
c.LogError = logs[0]
Expand All @@ -58,7 +79,7 @@ func (c *RetryHandler[T]) Handle(ctx context.Context, data []byte, attrs map[str
}
}
var v T
er1 := json.Unmarshal(data, &v)
er1 := c.Unmarshal(data, &v)
if er1 != nil {
if c.LogError != nil {
c.LogError(ctx, fmt.Sprintf("cannot unmarshal item: %s. Error: %s", data, er1.Error()))
Expand All @@ -74,13 +95,13 @@ func (c *RetryHandler[T]) Handle(ctx context.Context, data []byte, attrs map[str
return
}
if len(errs) > 0 {
c.HandleError(ctx, &v, errs, data, attrs)
c.Reject(ctx, &v, errs, data, attrs)
}
}
if c.Goroutines {
go Write[*T](ctx, c.Write, &v, data, attrs, c.HandleException, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo)
go Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo)
} else {
Write[*T](ctx, c.Write, &v, data, attrs, c.HandleException, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo)
Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo)
}
}

Expand Down Expand Up @@ -123,7 +144,7 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite
if attrs == nil || len(attrs) == 0 {
logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, limitRetry, data))
} else {
logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %s.", retryCount, limitRetry, data, attrs))
logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %v", retryCount, limitRetry, data, attrs))
}

}
Expand All @@ -133,9 +154,9 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite
} else {
if logInfo != nil {
if attrs == nil || len(attrs) == 0 {
logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, data))
logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s", retryCount, data))
} else {
logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %s.", retryCount, data, attrs))
logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %v", retryCount, data, attrs))
}
}
attrs[retryCountName] = strconv.Itoa(retryCount)
Expand All @@ -145,7 +166,7 @@ func Write[T any](ctx context.Context, write func(context.Context, T) error, ite
if attrs == nil || len(attrs) == 0 {
logError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", data, er2.Error()))
} else {
logError(ctx, fmt.Sprintf("Cannot retry %s %s. Error: %s", data, attrs, er2.Error()))
logError(ctx, fmt.Sprintf("Cannot retry %s %v. Error: %s", data, attrs, er2.Error()))
}
}
}
Expand Down

0 comments on commit fb85836

Please sign in to comment.