Skip to content

Commit

Permalink
add retry writer & sender
Browse files Browse the repository at this point in the history
  • Loading branch information
MinhTran1997 committed Oct 6, 2021
1 parent f941865 commit b41ece0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
37 changes: 27 additions & 10 deletions retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,54 @@ type RetrySender struct {
send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)
Retries []time.Duration
Log func(context.Context, string)
Error func(ctx context.Context, data []byte, attrs map[string]string) error
Goroutines bool
}

func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig) *RetrySender {
func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig, options... func(context.Context, []byte, map[string]string) error) *RetrySender {
var handlerError func(context.Context, []byte, map[string]string) error
if len(options) > 0 {
handlerError = options[0]
}
if c == nil {
return &RetrySender{send: send, Log: log, Goroutines: goroutines}
return &RetrySender{send: send, Log: log, Goroutines: goroutines, Error: handlerError}
} else {
retries := DurationsFromValue(*c, "Retry", 20)
if len(retries) == 0 {
return &RetrySender{send: send, Log: log, Goroutines: goroutines}
}
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines}
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError}
}
}
func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetrySender {
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines}
func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries []time.Duration, options... func(context.Context, []byte, map[string]string) error) *RetrySender {
var handlerError func(context.Context, []byte, map[string]string) error
if len(options) > 0 {
handlerError = options[0]
}
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError}
}
func (c *RetrySender) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
if !c.Goroutines {
return Send(ctx, c.send, data, attributes, c.Log, c.Retries...)
return Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...)
} else {
go Send(ctx, c.send, data, attributes, c.Log, c.Retries...)
go Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...)
return "", nil
}
}
func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), retries ...time.Duration) (string, error) {
func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), handlerError func(context.Context, []byte, map[string]string) error, retries ...time.Duration) (string, error) {
l := len(retries)
if l == 0 {
return send(ctx, data, attributes)
r, err := send(ctx, data, attributes)
if err != nil && handlerError != nil {
handlerError(ctx, data, attributes)
}
return r, err
} else {
return SendWithRetries(ctx, send, data, attributes, retries, log)
r, err := SendWithRetries(ctx, send, data, attributes, retries, log)
if err != nil && handlerError != nil {
handlerError(ctx, data, attributes)
}
return r, err
}
}

Expand Down
39 changes: 28 additions & 11 deletions retry_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,57 @@ import (
)

type RetryWriter struct {
write func(ctx context.Context, model interface{}) error
write func(ctx context.Context, model interface{}) error
Retries []time.Duration
Log func(context.Context, string)
WriteError func(ctx context.Context, model interface{}) error
Goroutines bool
}

func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig) *RetryWriter {
func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig, options ...func(context.Context, interface{}) error) *RetryWriter {
var writeError func(context.Context, interface{}) error
if len(options) > 0 {
writeError = options[0]
}
if c == nil {
return &RetryWriter{write: write, Log: log, Goroutines: goroutines}
} else {
retries := DurationsFromValue(*c, "Retry", 20)
if len(retries) == 0 {
return &RetryWriter{write: write, Log: log, Goroutines: goroutines}
return &RetryWriter{write: write, Log: log, Goroutines: goroutines, WriteError: writeError}
}
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines}
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError}
}
}
func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetryWriter {
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines}
func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries []time.Duration, options ...func(context.Context, interface{}) error) *RetryWriter {
var writeError func(context.Context, interface{}) error
if len(options) > 0 {
writeError = options[0]
}
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError}
}
func (c *RetryWriter) Write(ctx context.Context, model interface{}) error {
if !c.Goroutines {
return WriteTo(ctx, c.write, model, c.Log, c.Retries...)
return WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...)
} else {
go WriteTo(ctx, c.write, model, c.Log, c.Retries...)
go WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...)
return nil
}
}
func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), retries ...time.Duration) error {
func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), writeError func(context.Context, interface{}) error, retries ...time.Duration) error {
l := len(retries)
if l == 0 {
return write(ctx, model)
err := write(ctx, model)
if err != nil && writeError != nil {
writeError(ctx, model)
}
return err
} else {
return WriteWithRetries(ctx, write, model, retries, log)
err := WriteWithRetries(ctx, write, model, retries, log)
if err != nil && writeError != nil {
writeError(ctx, model)
}
return err
}
}

Expand Down

0 comments on commit b41ece0

Please sign in to comment.