From 096c7835043178fe19ddb0413fce73e5ee92380f Mon Sep 17 00:00:00 2001
From: Duc Nguyen
Date: Sun, 31 Oct 2021 23:54:26 +0700
Subject: [PATCH] Refactor producers and writers: keyed writes, flush timeout,
 Avro date converters

---
 README.md                    |   7 +-
 confluent/consumer.go        |  16 +++++
 confluent/producer.go        | 116 ++++++++++++++++++++++++++-----
 confluent/producer_config.go |   3 +-
 confluent/simple_consumer.go |  17 ++++-
 confluent/simple_producer.go | 128 +++++++++++++++++++++++++----------
 converter/converter.go       |  22 +++++-
 kafka/simple_writer.go       |  19 ++++++
 kafka/writer.go              |  19 ++++++
 sarama/simple_writer.go      |  24 +++++++
 sarama/writer.go             |  24 +++++++
 11 files changed, 338 insertions(+), 57 deletions(-)

diff --git a/README.md b/README.md
index ebc75bc..a1c7a07 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@ Support these message queues:
 - Amazon Simple Queue Service (SQS) at [sqs](https://github.com/core-go/mq/tree/main/sqs)
 - Google Cloud Pub/Sub at [pubsub](https://github.com/core-go/mq/tree/main/pubsub)
-- Kafka: at [segmentio/kafka-go](https://github.com/core-go/mq/tree/main/kafka) and [Shopify/sarama](https://github.com/core-go/mq/tree/main/sarama)
+- Kafka: at [segmentio/kafka-go](https://github.com/core-go/mq/tree/main/kafka), [Shopify/sarama](https://github.com/core-go/mq/tree/main/sarama) and [confluent](https://github.com/core-go/mq/tree/main/confluent)
 - NATS at [nats](https://github.com/core-go/mq/tree/main/nats)
 - Active MQ at [amq](https://github.com/core-go/mq/tree/main/amq)
 - RabbitMQ at [rabbitmq](https://github.com/core-go/mq/tree/main/rabbitmq)
 
@@ -24,3 +24,8 @@ Import:
 ```go
 import "github.com/core-go/mq"
 ```
+
+Build for confluent:
+```sh
+go build -buildmode=exe main.go
+```
diff --git a/confluent/consumer.go b/confluent/consumer.go
index bcffdc1..a98c711 100644
--- a/confluent/consumer.go
+++ b/confluent/consumer.go
@@ -38,6 +38,22 @@ func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []by
         }, nil
     }
 }
+func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) {
+    consumer, err := kafka.NewConsumer(&conf)
+    if err != nil {
+        fmt.Printf("Failed to create Consumer: %s\n", err)
+        return nil, err
+    }
+    var convert func(context.Context, []byte) ([]byte, error)
+    if len(options) > 0 {
+        convert = options[0]
+    }
+    return &Consumer{
+        Consumer: consumer,
+        Topics:   topics,
+        Convert:  convert,
+    }, nil
+}
 func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer {
     var convert func(context.Context, []byte) ([]byte, error)
     if len(options) > 0 {
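For orientation, a minimal sketch of the new `NewConsumerByConfigMap` constructor in use. The broker address, group id and topic are placeholder values, and the consume loop is left out:

```go
package main

import (
	kafka "github.com/confluentinc/confluent-kafka-go/kafka"
	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker
		"group.id":          "demo-group",     // placeholder group id
		"auto.offset.reset": "earliest",
	}
	// No convert option: message bodies are passed through unchanged.
	c, err := mq.NewConsumerByConfigMap(conf, []string{"demo-topic"})
	if err != nil {
		panic(err)
	}
	defer c.Consumer.Close()
}
```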
diff --git a/confluent/producer.go b/confluent/producer.go
index 12a0557..198b024 100644
--- a/confluent/producer.go
+++ b/confluent/producer.go
@@ -12,34 +12,67 @@ type (
     Producer struct {
         Producer *kafka.Producer
         Topic    string
+        Timeout  int
         Convert  func(context.Context, []byte) ([]byte, error)
+        Generate func() string
     }
 )
-
-func NewProducerByConfig(c ProducerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Producer, error) {
+func NewProducerByConfigMap(c kafka.ConfigMap, topic string, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
+    p, err := kafka.NewProducer(&c)
+    if err != nil {
+        fmt.Printf("Failed to create Producer: %s\n", err)
+        return nil, err
+    }
+    var generate func() string
+    if len(options) > 0 {
+        generate = options[0]
+    }
+    if timeout <= 0 {
+        timeout = 100
+    }
+    pd := &Producer{
+        Producer: p,
+        Topic:    topic,
+        Timeout:  timeout,
+        Convert:  convert,
+        Generate: generate,
+    }
+    return pd, nil
+}
+func NewProducerByConfig(c ProducerConfig, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
     p, err := NewKafkaProducerByConfig(c)
     if err != nil {
         fmt.Printf("Failed to create Producer: %s\n", err)
         return nil, err
     }
-    var convert func(context.Context, []byte) ([]byte, error)
+    var generate func() string
     if len(options) > 0 {
-        convert = options[0]
+        generate = options[0]
+    }
+    timeout := c.Timeout
+    if timeout <= 0 {
+        timeout = 100
     }
-    return &Producer{
+    pd := &Producer{
         Producer: p,
         Topic:    c.Topic,
+        Timeout:  timeout,
         Convert:  convert,
-    }, nil
+        Generate: generate,
+    }
+    return pd, nil
 }
-func NewProducer(producer *kafka.Producer, topic string, options ...func(context.Context, []byte)([]byte, error)) *Producer {
-    var convert func(context.Context, []byte) ([]byte, error)
+func NewProducer(producer *kafka.Producer, topic string, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *Producer {
+    var generate func() string
     if len(options) > 0 {
-        convert = options[0]
+        generate = options[0]
+    }
+    if timeout <= 0 {
+        timeout = 100
     }
-    return &Producer{Producer: producer, Topic: topic, Convert: convert}
+    return &Producer{Producer: producer, Topic: topic, Timeout: timeout, Convert: convert, Generate: generate}
 }
-func NewProducerByConfigAndRetries(c ProducerConfig, convert func(context.Context, []byte)([]byte, error), retries ...time.Duration) (*Producer, error) {
+func NewProducerByConfigAndRetries(c ProducerConfig, convert func(context.Context, []byte) ([]byte, error), retries ...time.Duration) (*Producer, error) {
     if len(retries) == 0 {
         return NewProducerByConfig(c, convert)
     } else {
@@ -47,8 +80,8 @@ func NewProducerByConfigAndRetries(c ProducerConfig, convert func(context.Contex
     }
 }
 
-func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, options...func(context.Context, []byte)([]byte, error)) (*Producer, error) {
-    p, err := NewProducerByConfig(c, options...)
+func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
+    p, err := NewProducerByConfig(c, convert, options...)
     if err == nil {
         return p, nil
     }
@@ -56,7 +89,7 @@ func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, option
     i := 0
     err = Retry(retries, func() (err error) {
         i = i + 1
-        p2, er2 := NewProducerByConfig(c)
+        p2, er2 := NewProducerByConfig(c, convert, options...)
         p = p2
         if er2 == nil {
             log.Println(fmt.Sprintf("create new Producer successfully after %d retries", i))
@@ -84,6 +117,57 @@ func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes m
     if messageAttributes != nil {
         msg.Headers = MapToHeader(messageAttributes)
     }
-
-    return Produce(p.Producer, &msg)
+    if p.Generate != nil {
+        id := p.Generate()
+        msg.Key = []byte(id)
+    }
+    deliveryChan := make(chan kafka.Event)
+    defer close(deliveryChan)
+    err = p.Producer.Produce(&msg, deliveryChan)
+    if err != nil {
+        return msg.String(), err
+    }
+    p.Producer.Flush(p.Timeout)
+    e := <-deliveryChan
+    switch m := e.(type) {
+    case *kafka.Message:
+        return msg.String(), m.TopicPartition.Error
+    case kafka.Error:
+        return "", m
+    }
+    return msg.String(), nil
+}
+func (p *Producer) ProduceWithKey(ctx context.Context, data []byte, key []byte, messageAttributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, data)
+        if err != nil {
+            return "", err
+        }
+    }
+    msg := kafka.Message{
+        TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
+        Value:          binary}
+    if messageAttributes != nil {
+        msg.Headers = MapToHeader(messageAttributes)
+    }
+    if key != nil {
+        msg.Key = key
+    }
+    deliveryChan := make(chan kafka.Event)
+    defer close(deliveryChan)
+    err = p.Producer.Produce(&msg, deliveryChan)
+    if err != nil {
+        return msg.String(), err
+    }
+    p.Producer.Flush(p.Timeout)
+    e := <-deliveryChan
+    switch m := e.(type) {
+    case *kafka.Message:
+        return msg.String(), m.TopicPartition.Error
+    case kafka.Error:
+        return "", m
+    }
+    return msg.String(), nil
 }
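A usage sketch for the reworked producer, assuming a local placeholder broker and a timestamp-based key generator standing in for whatever `func() string` you would normally supply (for example a UUID generator). `Timeout` is the millisecond value handed to `Flush`:

```go
package main

import (
	"context"
	"time"

	kafka "github.com/confluentinc/confluent-kafka-go/kafka"
	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"} // placeholder broker
	generate := func() string { return time.Now().Format(time.RFC3339Nano) }
	// nil convert: payloads are produced as-is; 100 is the flush timeout in ms.
	p, err := mq.NewProducerByConfigMap(conf, "demo-topic", 100, nil, generate)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	// Without an explicit key, Produce stamps the message with generate().
	if _, err := p.Produce(ctx, []byte(`{"id":1}`), nil); err != nil {
		panic(err)
	}
	// ProduceWithKey pins the partition key explicitly.
	if _, err := p.ProduceWithKey(ctx, []byte(`{"id":2}`), []byte("order-2"), nil); err != nil {
		panic(err)
	}
}
```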
diff --git a/confluent/producer_config.go b/confluent/producer_config.go
index 9795ca9..911473e 100644
--- a/confluent/producer_config.go
+++ b/confluent/producer_config.go
@@ -2,7 +2,8 @@ package kafka
 
 type ProducerConfig struct {
     Brokers         []string     `mapstructure:"brokers" json:"brokers,omitempty" gorm:"column:brokers" bson:"brokers,omitempty" dynamodbav:"brokers,omitempty" firestore:"brokers,omitempty"`
-    Topic           string       `mapstructure:"Topic" json:"Topic,omitempty" gorm:"column:Topic" bson:"Topic,omitempty" dynamodbav:"Topic,omitempty" firestore:"Topic,omitempty"`
+    Topic           string       `mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"`
+    Timeout         int          `mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
     Client          ClientConfig `mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"`
     MaxOpenRequests *int         `mapstructure:"max_open_requests" json:"maxOpenRequests,omitempty" gorm:"column:maxopenrequests" bson:"maxOpenRequests,omitempty" dynamodbav:"maxOpenRequests,omitempty" firestore:"maxOpenRequests,omitempty"`
     RequiredAcks    *int16       `mapstructure:"required_acks" json:"requiredAcks,omitempty" gorm:"column:requiredacks" bson:"requiredAcks,omitempty" dynamodbav:"requiredAcks,omitempty" firestore:"requiredAcks,omitempty"`
diff --git a/confluent/simple_consumer.go b/confluent/simple_consumer.go
index 3cffd2d..54871e5 100644
--- a/confluent/simple_consumer.go
+++ b/confluent/simple_consumer.go
@@ -14,7 +14,22 @@ type (
         Convert  func(context.Context, []byte) ([]byte, error)
     }
 )
-
+func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
+    consumer, err := kafka.NewConsumer(&conf)
+    if err != nil {
+        fmt.Printf("Failed to create Consumer: %s\n", err)
+        return nil, err
+    }
+    var convert func(context.Context, []byte) ([]byte, error)
+    if len(options) > 0 {
+        convert = options[0]
+    }
+    return &SimpleConsumer{
+        Consumer: consumer,
+        Topics:   topics,
+        Convert:  convert,
+    }, nil
+}
 func NewSimpleConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
     consumer, err := NewKafkaConsumerByConfig(c)
     if err != nil {
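Two things worth calling out in the config change: the `mapstructure` tag moved from `Topic` to `topic`, so existing YAML/JSON configs need the lower-case key, and the new `Timeout` field flows into the constructors, which fall back to 100 when it is unset. A sketch, with `Client` and the tuning fields left at their zero values:

```go
package main

import (
	mq "github.com/core-go/mq/confluent"
)

func main() {
	cfg := mq.ProducerConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topic:   "demo-topic",               // placeholder topic
		Timeout: 100,                        // flush timeout in ms; <= 0 defaults to 100
	}
	// nil convert: no payload transformation before producing.
	p, err := mq.NewProducerByConfig(cfg, nil)
	if err != nil {
		panic(err)
	}
	_ = p
}
```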
diff --git a/confluent/simple_producer.go b/confluent/simple_producer.go
index 2f621e2..47c4778 100644
--- a/confluent/simple_producer.go
+++ b/confluent/simple_producer.go
@@ -10,31 +10,62 @@ import (
 type (
     SimpleProducer struct {
         Producer *kafka.Producer
-        Convert  func(context.Context, []byte)([]byte, error)
+        Timeout  int
+        Convert  func(context.Context, []byte) ([]byte, error)
+        Generate func() string
     }
 )
-
-func NewSimpleProducerByConfig(c ProducerConfig, options...func(context.Context, []byte)([]byte, error)) (*SimpleProducer, error) {
+func NewSimpleProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) {
+    p, err := kafka.NewProducer(&c)
+    if err != nil {
+        fmt.Printf("Failed to create Producer: %s\n", err)
+        return nil, err
+    }
+    var generate func() string
+    if len(options) > 0 {
+        generate = options[0]
+    }
+    if timeout <= 0 {
+        timeout = 100
+    }
+    pd := &SimpleProducer{
+        Producer: p,
+        Timeout:  timeout,
+        Convert:  convert,
+        Generate: generate,
+    }
+    return pd, nil
+}
+func NewSimpleProducerByConfig(c ProducerConfig, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) {
     p, err := NewKafkaProducerByConfig(c)
     if err != nil {
         fmt.Printf("Failed to create Producer: %s\n", err)
         return nil, err
     }
-    var convert func(context.Context, []byte)([]byte, error)
+    var generate func() string
     if len(options) > 0 {
-        convert = options[0]
+        generate = options[0]
+    }
+    if timeout <= 0 {
+        timeout = 100
     }
-    return &SimpleProducer{
+    pd := &SimpleProducer{
         Producer: p,
-        Convert: convert,
-    }, nil
+        Timeout:  timeout,
+        Convert:  convert,
+        Generate: generate,
+    }
+    return pd, nil
 }
-func NewSimpleProducer(producer *kafka.Producer, options...func(context.Context, []byte)([]byte, error)) *SimpleProducer {
-    var convert func(context.Context, []byte)([]byte, error)
+func NewSimpleProducer(producer *kafka.Producer, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *SimpleProducer {
+    var generate func() string
     if len(options) > 0 {
-        convert = options[0]
+        generate = options[0]
     }
-    return &SimpleProducer{Producer: producer, Convert: convert}
+    if timeout <= 0 {
+        timeout = 100
+    }
+    return &SimpleProducer{Producer: producer, Timeout: timeout, Convert: convert, Generate: generate}
 }
 func NewKafkaProducerByConfig(c ProducerConfig) (*kafka.Producer, error) {
     conf := kafka.ConfigMap{
@@ -84,33 +115,60 @@ func (p *SimpleProducer) Produce(ctx context.Context, topic string, data []byte,
     if messageAttributes != nil {
         msg.Headers = MapToHeader(messageAttributes)
     }
-    return Produce(p.Producer, &msg)
-}
-
-func Produce(producer *kafka.Producer, msg *kafka.Message) (string, error) {
-    deliveryChan := make(chan kafka.Event, 10000)
-    err := producer.Produce(msg, deliveryChan)
+    if p.Generate != nil {
+        id := p.Generate()
+        msg.Key = []byte(id)
+    }
+    deliveryChan := make(chan kafka.Event)
+    defer close(deliveryChan)
+    err = p.Producer.Produce(&msg, deliveryChan)
     if err != nil {
-        fmt.Printf("Failed to produce msg: %s\n", err)
-        return "", err
-    }
-    go func() {
-        for e := range producer.Events() {
-            switch ev := e.(type) {
-            case *kafka.Message:
-                if ev.TopicPartition.Error != nil {
-                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
-                } else {
-                    fmt.Printf("Successfully produced record to Topic %s partition [%d] @ offset %v\n",
-                        *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
-                }
-            }
+        return msg.String(), err
+    }
+    p.Producer.Flush(p.Timeout)
+    e := <-deliveryChan
+    switch m := e.(type) {
+    case *kafka.Message:
+        return msg.String(), m.TopicPartition.Error
+    case kafka.Error:
+        return "", m
+    }
+    return msg.String(), nil
+}
+func (p *SimpleProducer) ProduceWithKey(ctx context.Context, topic string, data []byte, key []byte, messageAttributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, data)
+        if err != nil {
+            return "", err
         }
-    }()
-
+    }
+    msg := kafka.Message{
+        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+        Value:          binary}
+    if messageAttributes != nil {
+        msg.Headers = MapToHeader(messageAttributes)
+    }
+    if key != nil {
+        msg.Key = key
+    }
+    deliveryChan := make(chan kafka.Event)
+    defer close(deliveryChan)
+    err = p.Producer.Produce(&msg, deliveryChan)
+    if err != nil {
+        return msg.String(), err
+    }
+    p.Producer.Flush(p.Timeout)
+    e := <-deliveryChan
+    switch m := e.(type) {
+    case *kafka.Message:
+        return msg.String(), m.TopicPartition.Error
+    case kafka.Error:
+        return "", m
+    }
     return msg.String(), nil
 }
-
 func MapToHeader(messageAttributes map[string]string) []kafka.Header {
     headers := make([]kafka.Header, 0)
     for k, v := range messageAttributes {
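`SimpleProducer` gets the same treatment but takes the topic per call. A sketch against a placeholder broker, with no convert function and no key generator:

```go
package main

import (
	"context"

	kafka "github.com/confluentinc/confluent-kafka-go/kafka"
	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"} // placeholder broker
	p, err := mq.NewSimpleProducerByConfigMap(conf, 100, nil)
	if err != nil {
		panic(err)
	}
	// The topic travels with the call, not the producer.
	_, err = p.ProduceWithKey(context.Background(), "demo-topic",
		[]byte(`{"id":3}`), []byte("order-3"), nil)
	if err != nil {
		panic(err)
	}
}
```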
diff --git a/converter/converter.go b/converter/converter.go
index d75a2f8..55408dc 100644
--- a/converter/converter.go
+++ b/converter/converter.go
@@ -4,7 +4,7 @@ import (
     "fmt"
     "time"
 )
-
+const layout = "2006-01-02"
 func TimeToMilliseconds(time string) (int64, error) {
     var h, m, s int
     _, err := fmt.Sscanf(time, "%02d:%02d:%02d", &h, &m, &s)
@@ -14,7 +14,6 @@ func TimeToMilliseconds(time string) (int64, error) {
     return int64(h * 3600000 + m * 60000 + s * 1000), nil
 }
 func DateToUnixTime(s string) (int64, error) {
-    layout := "2006-01-02"
     date, err := time.Parse(layout, s)
     if err != nil {
         return 0, err
@@ -22,7 +21,6 @@ func DateToUnixTime(s string) (int64, error) {
     return date.Unix() * 1000, nil
 }
 func DateToUnixNano(s string) (int64, error) {
-    layout := "2006-01-02"
     date, err := time.Parse(layout, s)
     if err != nil {
         return 0, err
@@ -44,3 +42,21 @@ func MillisecondsToTimeString(milliseconds int) string {
     second := milliseconds / secondUint
     return fmt.Sprintf("%02d:%02d:%02d", hour, minute, second)
 }
+func StringToAvroDate(date *string) (*int, error) {
+    if date == nil {
+        return nil, nil
+    }
+    d, err := time.Parse(layout, *date)
+    if err != nil {
+        return nil, err
+    }
+    i := int(d.Unix() / int64(60*60*24))
+    return &i, nil
+}
+func ToAvroDate(date *time.Time) *int {
+    if date == nil {
+        return nil
+    }
+    i := int(date.Unix() / int64(60*60*24))
+    return &i
+}
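The new converters target Avro's `date` logical type, which encodes a date as whole days since 1970-01-01. A quick check of the arithmetic (1970-01-11 is ten days after the epoch), assuming the package is imported under its directory name:

```go
package main

import (
	"fmt"

	"github.com/core-go/mq/converter"
)

func main() {
	s := "1970-01-11"
	d, err := converter.StringToAvroDate(&s) // parses the "2006-01-02" layout
	if err != nil {
		panic(err)
	}
	fmt.Println(*d) // 10 — days since the Unix epoch
}
```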
diff --git a/kafka/simple_writer.go b/kafka/simple_writer.go
index 35e7b06..7c33895 100644
--- a/kafka/simple_writer.go
+++ b/kafka/simple_writer.go
@@ -57,3 +57,22 @@ func (p *SimpleWriter) Write(ctx context.Context, topic string, data []byte, att
         return "", err
     }
 }
+func (p *SimpleWriter) WriteWithKey(ctx context.Context, topic string, data []byte, key []byte, attributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, binary)
+        if err != nil {
+            return "", err
+        }
+    }
+    msg := kafka.Message{Value: binary}
+    if attributes != nil {
+        msg.Headers = MapToHeader(attributes)
+    }
+    if key != nil {
+        msg.Key = key
+    }
+    err = p.Writer.WriteMessages(ctx, msg)
+    return "", err
+}
diff --git a/kafka/writer.go b/kafka/writer.go
index fc91f6a..c220aed 100644
--- a/kafka/writer.go
+++ b/kafka/writer.go
@@ -56,3 +56,22 @@ func (p *Writer) Write(ctx context.Context, data []byte, attributes map[string]s
         return "", err
     }
 }
+func (p *Writer) WriteWithKey(ctx context.Context, data []byte, key []byte, attributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, data)
+        if err != nil {
+            return "", err
+        }
+    }
+    msg := kafka.Message{Value: binary}
+    if attributes != nil {
+        msg.Headers = MapToHeader(attributes)
+    }
+    if key != nil {
+        msg.Key = key
+    }
+    err = p.Writer.WriteMessages(ctx, msg)
+    return "", err
+}
diff --git a/sarama/simple_writer.go b/sarama/simple_writer.go
index 7c20853..be614e9 100644
--- a/sarama/simple_writer.go
+++ b/sarama/simple_writer.go
@@ -58,3 +58,27 @@ func (p *SimpleWriter) Write(ctx context.Context, topic string, data []byte, mes
         return string(b), err
     }
 }
+func (p *SimpleWriter) WriteWithKey(ctx context.Context, topic string, data []byte, key string, messageAttributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, data)
+        if err != nil {
+            return "", err
+        }
+    }
+    msg := sarama.ProducerMessage{Value: sarama.ByteEncoder(binary), Topic: topic}
+    if messageAttributes != nil {
+        msg.Headers = MapToHeader(messageAttributes)
+    }
+    m := make(map[string]interface{})
+    if len(key) > 0 {
+        msg.Key = sarama.StringEncoder(key)
+        m[Key] = key
+    }
+    pt, o, err := p.SyncProducer.SendMessage(&msg)
+    m[Partition] = pt
+    m[Offset] = o
+    b, _ := json.Marshal(m)
+    return string(b), err
+}
diff --git a/sarama/writer.go b/sarama/writer.go
index 2309601..7b69f65 100644
--- a/sarama/writer.go
+++ b/sarama/writer.go
@@ -148,3 +148,27 @@ func (p *Writer) Write(ctx context.Context, data []byte, messageAttributes map[s
         return string(b), err
     }
 }
+func (p *Writer) WriteWithKey(ctx context.Context, data []byte, key string, messageAttributes map[string]string) (string, error) {
+    var binary = data
+    var err error
+    if p.Convert != nil {
+        binary, err = p.Convert(ctx, data)
+        if err != nil {
+            return "", err
+        }
+    }
+    msg := sarama.ProducerMessage{Value: sarama.ByteEncoder(binary), Topic: p.Topic}
+    if messageAttributes != nil {
+        msg.Headers = MapToHeader(messageAttributes)
+    }
+    m := make(map[string]interface{})
+    if len(key) > 0 {
+        msg.Key = sarama.StringEncoder(key)
+        m[Key] = key
+    }
+    pt, o, err := p.SyncProducer.SendMessage(&msg)
+    m[Partition] = pt
+    m[Offset] = o
+    b, _ := json.Marshal(m)
+    return string(b), err
+}
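Both sarama writers now report the key back alongside partition and offset in the returned JSON. A sketch of the call shape only — the writer construction is elided, the import alias is arbitrary, and the exact JSON field names depend on the `Key`/`Partition`/`Offset` constants defined elsewhere in that package:

```go
package main

import (
	"context"
	"fmt"

	sw "github.com/core-go/mq/sarama" // alias; actual package name lives in that directory
)

func send(ctx context.Context, w *sw.Writer) error {
	// Returns a small JSON document describing where the message landed.
	res, err := w.WriteWithKey(ctx, []byte(`{"id":4}`), "order-4", nil)
	if err != nil {
		return err
	}
	fmt.Println(res) // e.g. {"key":"order-4","offset":42,"partition":0}
	return nil
}
```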