Skip to content

Commit

Permalink
Refactor handler
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jun 2, 2024
1 parent cc736ca commit 53bbfdb
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 56 deletions.
10 changes: 10 additions & 0 deletions batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,18 @@ import (
"encoding/json"
"fmt"
"reflect"
"time"
)

type Message struct {
Id string `json:"id,omitempty" gorm:"column:id;primary_key" bson:"id,omitempty" dynamodbav:"id,omitempty" firestore:"id,omitempty"`
Data []byte `json:"data,omitempty" gorm:"column:data" bson:"data,omitempty" dynamodbav:"data,omitempty" firestore:"data,omitempty"`
Attributes map[string]string `json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"`
Timestamp *time.Time `json:"timestamp,omitempty" gorm:"column:timestamp" bson:"timestamp,omitempty" dynamodbav:"timestamp,omitempty" firestore:"timestamp,omitempty"`
Raw interface{} `json:"-" bson:"-" dynamodbav:"-" firestore:"-"`
Value interface{} `json:"-" bson:"-" dynamodbav:"-" firestore:"-"`
}

type BatchHandler struct {
modelType reflect.Type
modelsType reflect.Type
Expand Down
8 changes: 0 additions & 8 deletions batch_worker.go

This file was deleted.

8 changes: 0 additions & 8 deletions batch_worker_config.go

This file was deleted.

12 changes: 12 additions & 0 deletions default_batch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (

const TimeFormat = "15:04:05.000"

type BatchWorker interface {
Handle(ctx context.Context, message *Message)
Run(ctx context.Context)
}

type BatchWorkerConfig struct {
BatchSize int `mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"`
Timeout int64 `mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
LimitRetry int `mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"`
Goroutines bool `mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"`
}

type DefaultBatchWorker struct {
batchSize int
timeout int64
Expand Down
1 change: 1 addition & 0 deletions handler/batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Message[T any] struct {
Attributes map[string]string `yaml:"attributes" mapstructure:"attributes" json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"`
Value T `yaml:"value" mapstructure:"value" json:"value,omitempty" gorm:"column:value" bson:"value,omitempty" dynamodbav:"value,omitempty" firestore:"value,omitempty"`
}

type BatchHandler[T any] struct {
Write func(context.Context, []T) ([]int, error) // Return: Fail indices, Error
}
Expand Down
64 changes: 36 additions & 28 deletions handler/batch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ import (

const TimeFormat = "15:04:05.000"

type BatchConfig struct {
RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"`
LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"`
Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"`
Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"`
Timeout int64 `yaml:"timeout" mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
BatchSize int `yaml:"batch_size" mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"`
}

type BatchWorker[T any] struct {
batchSize int
timeout int64
Expand All @@ -29,15 +38,7 @@ type BatchWorker[T any] struct {
Key string
LogError func(context.Context, string)
LogInfo func(context.Context, string)
}

type BatchConfig struct {
RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"`
LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"`
Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"`
Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"`
Timeout int64 `yaml:"timeout" mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
BatchSize int `yaml:"batch_size" mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"`
LogDebug func(context.Context, string)
}

func NewBatchWorkerByConfig[T any](
Expand Down Expand Up @@ -146,6 +147,9 @@ func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[stri
}
func (w *BatchWorker[T]) CallByTimer(ctx context.Context) {
w.mux.Lock()
if w.LogDebug != nil {
w.LogDebug(ctx, "Call by timer")
}
if w.ready(ctx) {
w.execute(ctx)
}
Expand All @@ -157,6 +161,9 @@ func (w *BatchWorker[T]) ready(ctx context.Context) bool {
batchSize := len(w.messages)
t := w.latestExecutedTime.Add(time.Duration(w.timeout) * time.Millisecond)
if batchSize > 0 && (batchSize >= w.batchSize || t.Sub(now) < 0) {
if w.LogDebug != nil && batchSize >= w.batchSize {
w.LogDebug(ctx, fmt.Sprintf("Call by batch size %d %d", w.batchSize, batchSize))
}
isReady = true
}
if isReady && w.LogInfo != nil {
Expand All @@ -182,8 +189,11 @@ func (w *BatchWorker[T]) execute(ctx context.Context) {
if w.LogError != nil {
l := len(errList)
for i := 0; i < l; i++ {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogError(ctx, fmt.Sprintf("Error message: %s.", x))
if len(errList[i].Attributes) > 0 {
w.LogError(ctx, fmt.Sprintf("Error message: %s %+v", errList[i].Data, errList[i].Attributes))
} else {
w.LogError(ctx, fmt.Sprintf("Error message: %s.", errList[i].Data))
}
}
}
} else {
Expand All @@ -202,22 +212,31 @@ func (w *BatchWorker[T]) execute(ctx context.Context) {
retryCount++
if retryCount > w.LimitRetry {
if w.LogInfo != nil {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.LimitRetry, x))
if len(errList[i].Attributes) > 0 {
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %+v", retryCount, w.LimitRetry, errList[i].Data, errList[i].Attributes))
} else {
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.LimitRetry, errList[i].Data))
}
}
if w.HandleError != nil {
w.HandleError(ctx, errList[i].Data, errList[i].Attributes)
}
continue
} else if w.LogInfo != nil {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, x))
if len(errList[i].Attributes) > 0 {
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %+v", retryCount, errList[i].Data, errList[i].Attributes))
} else {
w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, errList[i].Data))
}
}
errList[i].Attributes[w.RetryCountName] = strconv.Itoa(retryCount)
er3 := w.Retry(ctx, errList[i].Data, errList[i].Attributes)
if er3 != nil && w.LogError != nil {
x := CreateLog(errList[i].Data, errList[i].Attributes)
w.LogError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", x, er3.Error()))
if len(errList[i].Attributes) > 0 {
w.LogError(ctx, fmt.Sprintf("Cannot retry %s %+v. Error: %s", errList[i].Data, errList[i].Attributes, er3.Error()))
} else {
w.LogError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", errList[i].Data, er3.Error()))
}
}
}
}
Expand All @@ -242,14 +261,3 @@ func (w *BatchWorker[T]) Run(ctx context.Context) {
}
}()
}
func CreateLog(data []byte, header map[string]string) interface{} {
if len(header) == 0 {
return data
}
m := make(map[string]interface{})
m["data"] = data
if len(header) > 0 {
m["attributes"] = header
}
return m
}
File renamed without changes.
12 changes: 0 additions & 12 deletions message.go

This file was deleted.

0 comments on commit 53bbfdb

Please sign in to comment.