Skip to content

Commit

Permalink
push retry_sender bytes_writer retry_write files
Browse files Browse the repository at this point in the history
  • Loading branch information
vanhop993 committed Oct 6, 2021
1 parent f3b3fdf commit f941865
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 9 deletions.
15 changes: 15 additions & 0 deletions bytes_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mq

import "context"

type BytesWriter struct {
Send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)
}
func NewBytesWriter(send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)) *BytesWriter {
return &BytesWriter{Send: send}
}
func (w *BytesWriter) Write(ctx context.Context, model interface{}) error {
data := model.([]byte)
_, err := w.Send(ctx, data, nil)
return err
}
18 changes: 9 additions & 9 deletions retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type RetrySender struct {
Goroutines bool
}

func NewProducerByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig) *RetrySender {
func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig) *RetrySender {
if c == nil {
return &RetrySender{send: send, Log: log, Goroutines: goroutines}
} else {
Expand All @@ -24,23 +24,23 @@ func NewProducerByConfig(send func(context.Context, []byte, map[string]string) (
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines}
}
}
func NewProducer(produce func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetrySender {
return &RetrySender{send: produce, Log: log, Retries: retries, Goroutines: goroutines}
func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetrySender {
return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines}
}
func (c *RetrySender) Produce(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
func (c *RetrySender) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
if !c.Goroutines {
return Send(ctx, c.send, data, attributes, c.Log, c.Retries...)
} else {
go Send(ctx, c.send, data, attributes, c.Log, c.Retries...)
return "", nil
}
}
func Send(ctx context.Context, produce func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), retries ...time.Duration) (string, error) {
func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), retries ...time.Duration) (string, error) {
l := len(retries)
if l == 0 {
return produce(ctx, data, attributes)
return send(ctx, data, attributes)
} else {
return SendWithRetries(ctx, produce, data, attributes, retries, log)
return SendWithRetries(ctx, send, data, attributes, retries, log)
}
}

Expand All @@ -55,12 +55,12 @@ func SendWithRetries(ctx context.Context, produce func(context.Context, []byte,
id2, er2 := produce(ctx, data, attributes)
id = id2
if er2 == nil && log != nil {
log(ctx, fmt.Sprintf("Produce successfully after %d retries %s", i, data))
log(ctx, fmt.Sprintf("Send successfully after %d retries %s", i, data))
}
return er2
}, log)
if err != nil && log != nil {
log(ctx, fmt.Sprintf("Failed to Produce after %d retries: %s. Error: %s.", len(retries), data, err.Error()))
log(ctx, fmt.Sprintf("Failed to send after %d retries: %s. Error: %s.", len(retries), data, err.Error()))
}
return id, err
}
66 changes: 66 additions & 0 deletions retry_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package mq

import (
"context"
"fmt"
"time"
)

type RetryWriter struct {
write func(ctx context.Context, model interface{}) error
Retries []time.Duration
Log func(context.Context, string)
Goroutines bool
}

func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig) *RetryWriter {
if c == nil {
return &RetryWriter{write: write, Log: log, Goroutines: goroutines}
} else {
retries := DurationsFromValue(*c, "Retry", 20)
if len(retries) == 0 {
return &RetryWriter{write: write, Log: log, Goroutines: goroutines}
}
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines}
}
}
func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries ...time.Duration) *RetryWriter {
return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines}
}
func (c *RetryWriter) Write(ctx context.Context, model interface{}) error {
if !c.Goroutines {
return WriteTo(ctx, c.write, model, c.Log, c.Retries...)
} else {
go WriteTo(ctx, c.write, model, c.Log, c.Retries...)
return nil
}
}
func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), retries ...time.Duration) error {
l := len(retries)
if l == 0 {
return write(ctx, model)
} else {
return WriteWithRetries(ctx, write, model, retries, log)
}
}

func WriteWithRetries(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, retries []time.Duration, log func(context.Context, string)) error {
er1 := write(ctx, model)
if er1 == nil {
return er1
}
i := 0
err := Retry(ctx, retries, func() (err error) {
i = i + 1
er2 := write(ctx, model)

if er2 == nil && log != nil {
log(ctx, fmt.Sprintf("Write successfully after %d retries %s", i, model))
}
return er2
}, log)
if err != nil && log != nil {
log(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), model, err.Error()))
}
return err
}

0 comments on commit f941865

Please sign in to comment.