From b41ece0366e9eeee4888d6f9b17b3a58161c3355 Mon Sep 17 00:00:00 2001 From: MinhTran1997 Date: Wed, 6 Oct 2021 18:17:11 +0700 Subject: [PATCH] add retry writer & sender --- retry_sender.go | 37 +++++++++++++++++++++++++++---------- retry_writer.go | 39 ++++++++++++++++++++++++++++----------- 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/retry_sender.go b/retry_sender.go index 9db662a..2cbee80 100644 --- a/retry_sender.go +++ b/retry_sender.go @@ -10,37 +10,54 @@ type RetrySender struct { send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) Retries []time.Duration Log func(context.Context, string) + Error func(ctx context.Context, data []byte, attrs map[string]string) error Goroutines bool } -func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig) *RetrySender { +func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig, options... func(context.Context, []byte, map[string]string) error) *RetrySender { + var handlerError func(context.Context, []byte, map[string]string) error + if len(options) > 0 { + handlerError = options[0] + } if c == nil { - return &RetrySender{send: send, Log: log, Goroutines: goroutines} + return &RetrySender{send: send, Log: log, Goroutines: goroutines, Error: handlerError} } else { retries := DurationsFromValue(*c, "Retry", 20) if len(retries) == 0 { return &RetrySender{send: send, Log: log, Goroutines: goroutines} } - return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines} + return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} } } -func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetrySender { - return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines} +func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries []time.Duration, options... func(context.Context, []byte, map[string]string) error) *RetrySender { + var handlerError func(context.Context, []byte, map[string]string) error + if len(options) > 0 { + handlerError = options[0] + } + return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} } func (c *RetrySender) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) { if !c.Goroutines { - return Send(ctx, c.send, data, attributes, c.Log, c.Retries...) + return Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...) } else { - go Send(ctx, c.send, data, attributes, c.Log, c.Retries...) + go Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...) return "", nil } } -func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), retries ...time.Duration) (string, error) { +func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), handlerError func(context.Context, []byte, map[string]string) error, retries ...time.Duration) (string, error) { l := len(retries) if l == 0 { - return send(ctx, data, attributes) + r, err := send(ctx, data, attributes) + if err != nil && handlerError != nil { + handlerError(ctx, data, attributes) + } + return r, err } else { - return SendWithRetries(ctx, send, data, attributes, retries, log) + r, err := SendWithRetries(ctx, send, data, attributes, retries, log) + if err != nil && handlerError != nil { + handlerError(ctx, data, attributes) + } + return r, err } } diff --git a/retry_writer.go b/retry_writer.go index d306274..670a447 100644 --- a/retry_writer.go +++ b/retry_writer.go @@ -7,40 +7,57 @@ import ( ) type RetryWriter struct { - write func(ctx context.Context, model interface{}) error + write func(ctx context.Context, model interface{}) error Retries []time.Duration Log func(context.Context, string) + WriteError func(ctx context.Context, model interface{}) error Goroutines bool } -func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig) *RetryWriter { +func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig, options ...func(context.Context, interface{}) error) *RetryWriter { + var writeError func(context.Context, interface{}) error + if len(options) > 0 { + writeError = options[0] + } if c == nil { return &RetryWriter{write: write, Log: log, Goroutines: goroutines} } else { retries := DurationsFromValue(*c, "Retry", 20) if len(retries) == 0 { - return &RetryWriter{write: write, Log: log, Goroutines: goroutines} + return &RetryWriter{write: write, Log: log, Goroutines: goroutines, WriteError: writeError} } - return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines} + return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError} } } -func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetryWriter { - return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines} +func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries []time.Duration, options ...func(context.Context, interface{}) error) *RetryWriter { + var writeError func(context.Context, interface{}) error + if len(options) > 0 { + writeError = options[0] + } + return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError} } func (c *RetryWriter) Write(ctx context.Context, model interface{}) error { if !c.Goroutines { - return WriteTo(ctx, c.write, model, c.Log, c.Retries...) + return WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...) } else { - go WriteTo(ctx, c.write, model, c.Log, c.Retries...) + go WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...) return nil } } -func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), retries ...time.Duration) error { +func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), writeError func(context.Context, interface{}) error, retries ...time.Duration) error { l := len(retries) if l == 0 { - return write(ctx, model) + err := write(ctx, model) + if err != nil && writeError != nil { + writeError(ctx, model) + } + return err } else { - return WriteWithRetries(ctx, write, model, retries, log) + err := WriteWithRetries(ctx, write, model, retries, log) + if err != nil && writeError != nil { + writeError(ctx, model) + } + return err } }