Skip to content

Commit

Permalink
Fix bug validate
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Oct 31, 2021
1 parent 591d468 commit ca77384
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 28 deletions.
17 changes: 17 additions & 0 deletions avro/marshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package avro

import "github.com/hamba/avro"

type Marshaller struct {
Schema avro.Schema
}

func NewMarshaller(schema avro.Schema) *Marshaller {
return &Marshaller{Schema: schema}
}
func (c *Marshaller) Unmarshal(data []byte, v interface{}) error {
return avro.Unmarshal(c.Schema, data, v)
}
func (c *Marshaller) Marshal(v interface{}) ([]byte, error) {
return avro.Marshal(c.Schema, v)
}
21 changes: 15 additions & 6 deletions batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ type BatchHandler struct {
modelType reflect.Type
modelsType reflect.Type
Write func(ctx context.Context, models interface{}) ([]int, []int, error) // Return: Success indices, Fail indices, Error
Unmarshal func(data []byte, v interface{}) error
Marshal func(v interface{}) ([]byte, error)
LogError func(context.Context, string)
LogInfo func(context.Context, string)
}

func NewBatchHandler(writeBatch func(context.Context, interface{}) ([]int, []int, error), modelType reflect.Type, logs ...func(context.Context, string)) *BatchHandler {
func NewBatchHandler(writeBatch func(context.Context, interface{}) ([]int, []int, error), modelType reflect.Type, unmarshal func(data []byte, v interface{}) error, logs ...func(context.Context, string)) *BatchHandler {
modelsType := reflect.Zero(reflect.SliceOf(modelType)).Type()
if unmarshal == nil {
unmarshal = json.Unmarshal
}
h := &BatchHandler{modelType: modelType, modelsType: modelsType, Write: writeBatch}
if len(logs) >= 1 {
h.LogError = logs[0]
Expand All @@ -39,7 +44,7 @@ func (h *BatchHandler) Handle(ctx context.Context, data []*Message) ([]*Message,
v = reflect.Append(v, v1)
} else {
item := InitModel(h.modelType)
err := json.Unmarshal(message.Data, item)
err := h.Unmarshal(message.Data, item)
if err != nil {
failMessages = append(failMessages, message)
return failMessages, fmt.Errorf(`cannot unmarshal item: %s. Error: %s`, message.Data, err.Error())
Expand All @@ -50,11 +55,15 @@ func (h *BatchHandler) Handle(ctx context.Context, data []*Message) ([]*Message,
}
}
if h.LogInfo != nil {
sv, er0 := json.Marshal(v.Interface())
if er0 != nil {
h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v))
if h.Marshal != nil {
sv, er0 := h.Marshal(v.Interface())
if er0 != nil {
h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v))
} else {
h.LogInfo(ctx, fmt.Sprintf(`models: %s`, sv))
}
} else {
h.LogInfo(ctx, fmt.Sprintf(`models: %s`, sv))
h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v))
}
}
successIndices, failIndices, er1 := h.Write(ctx, v.Interface())
Expand Down
46 changes: 46 additions & 0 deletions converter/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package converter

import (
"fmt"
"time"
)

func TimeToMilliseconds(time string) (int64, error) {
var h, m, s int
_, err := fmt.Sscanf(time, "%02d:%02d:%02d", &h, &m, &s)
if err != nil {
return 0, err
}
return int64(h * 3600000 + m * 60000 + s * 1000), nil
}
func DateToUnixTime(s string) (int64, error) {
layout := "2006-01-02"
date, err := time.Parse(layout, s)
if err != nil {
return 0, err
}
return date.Unix() * 1000, nil
}
func DateToUnixNano(s string) (int64, error) {
layout := "2006-01-02"
date, err := time.Parse(layout, s)
if err != nil {
return 0, err
}
return date.UnixNano(), nil
}
func UnixTime(m int64) string {
dateUtc := time.Unix(0, m* 1000000)
return dateUtc.Format("2006-01-02")
}
func MillisecondsToTimeString(milliseconds int) string {
hourUint := 3600000 //60 * 60 * 1000 = 3600000
minuteUint := 60000 //60 * 1000 = 60000
secondUint := 1000
hour := milliseconds / hourUint
milliseconds = milliseconds % hourUint
minute := milliseconds / minuteUint
milliseconds = milliseconds % minuteUint
second := milliseconds / secondUint
return fmt.Sprintf("%02d:%02d:%02d", hour, minute, second)
}
12 changes: 9 additions & 3 deletions error_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ import (
)

func NewErrorWriter(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, logError ...func(context.Context, string)) *ErrorWriter {
h := &ErrorWriter{Write: write, ModelType: modelType}
return NewErrorWriterWithUnmarshal(write, modelType, json.Unmarshal, logError...)
}
func NewErrorWriterWithUnmarshal(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, unmarshal func(data []byte, v interface{}) error, logError ...func(context.Context, string)) *ErrorWriter {
if unmarshal == nil {
unmarshal = json.Unmarshal
}
h := &ErrorWriter{Write: write, ModelType: modelType, Unmarshal: unmarshal}
if len(logError) >= 1 {
h.LogError = logError[0]
}
return h
}

type ErrorWriter struct {
Write func(ctx context.Context, model interface{}) error
ModelType *reflect.Type
Unmarshal func(data []byte, v interface{}) error
LogError func(context.Context, string)
}

Expand All @@ -30,7 +36,7 @@ func (w *ErrorWriter) HandleError(ctx context.Context, data []byte, attrs map[st
return w.Write(ctx, data)
} else {
v := InitModel(*w.ModelType)
err := json.Unmarshal(data, v)
err := w.Unmarshal(data, v)
if err != nil {
return err
}
Expand Down
37 changes: 23 additions & 14 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,35 @@ type Handler struct {
Retry func(context.Context, []byte, map[string]string) error
RetryCountName string
Error func(context.Context, []byte, map[string]string) error
Unmarshal func([]byte, interface{}) error
Retries []time.Duration
Goroutines bool
LogError func(context.Context, string)
LogInfo func(context.Context, string)
}

func NewHandlerByConfig(c HandlerConfig, write func(context.Context, interface{}) error, modelType *reflect.Type, retry func(context.Context, []byte, map[string]string) error, validate func(context.Context, *Message) error, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler {
return NewHandlerWithRetryService(write, modelType, c.LimitRetry, retry, c.RetryCountName, validate, handleError, c.Goroutines, logs...)
func NewHandlerByConfig(c HandlerConfig, write func(context.Context, interface{}) error, modelType *reflect.Type, retry func(context.Context, []byte, map[string]string) error, validate func(context.Context, *Message) error, handleError func(context.Context, []byte, map[string]string) error, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler {
return NewHandlerWithRetryService(write, modelType, c.LimitRetry, retry, c.RetryCountName, validate, handleError, c.Goroutines, unmarshal, logs...)
}
func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler {
func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, handleError func(context.Context, []byte, map[string]string) error, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler {
if c == nil {
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, unmarshal, logs...)
}
retries := DurationsFromValue(*c, "Retry", 20)
if len(retries) == 0 {
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, unmarshal, logs...)
}
return NewHandlerWithRetries(write, modelType, validate, retries, handleError, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, retries, handleError, goroutines, unmarshal, logs...)
}
func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, goroutines bool, logs ...func(context.Context, string)) *Handler {
func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, goroutines bool, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler {
if unmarshal == nil {
unmarshal = json.Unmarshal
}
c := &Handler{
ModelType: modelType,
Write: write,
Validate: validate,
Unmarshal: unmarshal,
Goroutines: goroutines,
Error: handleError,
}
Expand All @@ -60,19 +65,24 @@ func NewHandlerWithRetries(write func(context.Context, interface{}) error, model
}
return c
}
func NewHandler(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, goroutines bool, logs ...func(context.Context, string)) *Handler {
return NewHandlerWithRetryService(write, modelType, -1, nil, "", validate, nil, goroutines, logs...)
func NewHandler(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, goroutines bool, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler {
return NewHandlerWithRetryService(write, modelType, -1, nil, "", validate, nil, goroutines, unmarshal, logs...)
}
func NewHandlerWithRetryService(write func(context.Context, interface{}) error, modelType *reflect.Type, limitRetry int, retry func(context.Context, []byte, map[string]string) error, retryCountName string, validate func(context.Context, *Message) error,
handleError func(context.Context, []byte, map[string]string) error,
goroutines bool, logs ...func(context.Context, string)) *Handler {
handleError func(context.Context, []byte, map[string]string) error,
goroutines bool,
unmarshal func([]byte, interface{}) error,
logs ...func(context.Context, string)) *Handler {
if len(retryCountName) == 0 {
retryCountName = "retryCount"
}
if retry != nil && handleError == nil {
e1 := NewErrorHandler(logs...)
handleError = e1.HandleError
}
if unmarshal == nil {
unmarshal = json.Unmarshal
}
c := &Handler{
ModelType: modelType,
Write: write,
Expand All @@ -81,6 +91,7 @@ func NewHandlerWithRetryService(write func(context.Context, interface{}) error,
Retry: retry,
RetryCountName: retryCountName,
Error: handleError,
Unmarshal: unmarshal,
Goroutines: goroutines,
}
if len(logs) >= 1 {
Expand Down Expand Up @@ -121,12 +132,10 @@ func (c *Handler) Handle(ctx context.Context, data []byte, header map[string]str
var item interface{}
if message.Value != nil {
item = message.Value
} else {
item = message.Data
}
if c.ModelType != nil && item == nil {
v := InitModel(*c.ModelType)
er1 := json.Unmarshal(message.Data, v)
er1 := c.Unmarshal(message.Data, v)
if er1 != nil {
if c.LogError != nil {
c.LogError(ctx, fmt.Sprintf(`cannot unmarshal item: %s. Error: %s`, message.Data, er1.Error()))
Expand Down
File renamed without changes.
17 changes: 12 additions & 5 deletions validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@ import (

type Validator struct {
modelType reflect.Type
check func(ctx context.Context, model interface{}) error
Check func(ctx context.Context, model interface{}) error
Unmarshal func([]byte, interface{}) error
}

func NewValidator(modelType reflect.Type, check func(context.Context, interface{}) error) *Validator {
v := &Validator{modelType: modelType, check: check}
func NewValidator(modelType reflect.Type, check func(context.Context, interface{}) error, opts...func([]byte, interface{}) error) *Validator {
var unmarshal func([]byte, interface{}) error
if len(opts) > 0 && opts[0] != nil {
unmarshal = opts[0]
} else {
unmarshal = json.Unmarshal
}
v := &Validator{modelType: modelType, Check: check, Unmarshal: unmarshal}
return v
}

func (v *Validator) Validate(ctx context.Context, message *Message) error {
item := InitModel(v.modelType)
err := json.Unmarshal(message.Data, item)
err := v.Unmarshal(message.Data, item)
if err != nil {
return fmt.Errorf(`cannot unmarshal item: %s. Error: %s`, message.Data, err.Error())
}
message.Value = item
return v.check(ctx, message.Value)
return v.Check(ctx, message.Value)
}

0 comments on commit ca77384

Please sign in to comment.