Commit 096c783: Refactor code

minhduc140583 committed Oct 31, 2021
1 parent ca77384 commit 096c783
Showing 11 changed files with 338 additions and 57 deletions.
README.md: 7 changes (6 additions, 1 deletion)

@@ -5,7 +5,7 @@
Support these message queues:
- Amazon Simple Queue Service (SQS) at [sqs](https://github.com/core-go/mq/tree/main/sqs)
- Google Cloud Pub/Sub at [pubsub](https://github.com/core-go/mq/tree/main/pubsub)
-- Kafka: at [segmentio/kafka-go](https://github.com/core-go/mq/tree/main/kafka) and [Shopify/sarama](https://github.com/core-go/mq/tree/main/sarama)
+- Kafka: at [segmentio/kafka-go](https://github.com/core-go/mq/tree/main/kafka), [Shopify/sarama](https://github.com/core-go/mq/tree/main/sarama) and [confluent](https://github.com/core-go/mq/tree/main/confluent)
- NATS at [nats](https://github.com/core-go/mq/tree/main/nats)
- Active MQ at [amq](https://github.com/core-go/mq/tree/main/amq)
- RabbitMQ at [rabbitmq](https://github.com/core-go/mq/tree/main/rabbitmq)
@@ -24,3 +24,8 @@ Import:
```go
import "github.com/core-go/mq"
```
+
+Build for confluent:
+```sh
+go build -buildmode=exe main.go
+```
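For orientation, here is a minimal wiring sketch for the confluent adapter this commit extends. It is not part of the commit: the broker address and topic are placeholders, the confluent-kafka-go import path is assumed, and the convert hook is left nil so payloads pass through unchanged. (The adapter wraps confluent-kafka-go, which binds librdkafka via cgo; that is presumably why a specific build command is documented above.)

```go
package main

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka" // assumed import path for confluent-kafka-go

	mq "github.com/core-go/mq/confluent" // package is named kafka internally, so alias it
)

func main() {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}
	// Signature added by this commit: NewProducerByConfigMap(conf, topic, timeout, convert, options...)
	p, err := mq.NewProducerByConfigMap(conf, "orders", 100, nil)
	if err != nil {
		panic(err)
	}
	// Produce blocks on the delivery report and returns the message description.
	res, err := p.Produce(context.Background(), []byte(`{"id":"1"}`), nil)
	fmt.Println(res, err)
}
```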
confluent/consumer.go: 16 changes (16 additions, 0 deletions)

@@ -38,6 +38,22 @@ func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []by
}, nil
}
}
+func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) {
+	consumer, err := kafka.NewConsumer(&conf)
+	if err != nil {
+		fmt.Printf("Failed to create Consumer: %s\n", err)
+		return nil, err
+	}
+	var convert func(context.Context, []byte) ([]byte, error)
+	if len(options) > 0 {
+		convert = options[0]
+	}
+	return &Consumer{
+		Consumer: consumer,
+		Topics:   topics,
+		Convert:  convert,
+	}, nil
+}
func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer {
	var convert func(context.Context, []byte) ([]byte, error)
	if len(options) > 0 {
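A hedged usage sketch for the new NewConsumerByConfigMap constructor. The config keys are standard librdkafka consumer settings; the consume loop itself sits outside this hunk, so only construction is shown:

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "demo-group", // consumer group, required by librdkafka
		"auto.offset.reset": "earliest",
	}
	// No convert option passed, so Convert stays nil and payloads are used as-is.
	c, err := mq.NewConsumerByConfigMap(conf, []string{"orders"})
	if err != nil {
		log.Fatal(err)
	}
	_ = c // subscribe/consume via the package's consume loop (not shown in this hunk)
}
```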
confluent/producer.go: 116 changes (100 additions, 16 deletions)

@@ -12,51 +12,84 @@ type (
	Producer struct {
		Producer *kafka.Producer
		Topic    string
+		Timeout  int
		Convert  func(context.Context, []byte) ([]byte, error)
+		Generate func() string
	}
)
-
-func NewProducerByConfig(c ProducerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Producer, error) {
+func NewProducerByConfigMap(c kafka.ConfigMap, topic string, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
+	p, err := kafka.NewProducer(&c)
+	if err != nil {
+		fmt.Printf("Failed to create Producer: %s\n", err)
+		return nil, err
+	}
+	var generate func() string
+	if len(options) > 0 {
+		generate = options[0]
+	}
+	if timeout <= 0 {
+		timeout = 100
+	}
+	pd := &Producer{
+		Producer: p,
+		Topic:    topic,
+		Timeout:  timeout,
+		Convert:  convert,
+		Generate: generate,
+	}
+	return pd, nil
+}
+func NewProducerByConfig(c ProducerConfig, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
	p, err := NewKafkaProducerByConfig(c)
	if err != nil {
		fmt.Printf("Failed to create Producer: %s\n", err)
		return nil, err
	}
-	var convert func(context.Context, []byte) ([]byte, error)
+	var generate func() string
	if len(options) > 0 {
-		convert = options[0]
+		generate = options[0]
	}
+	timeout := c.Timeout
+	if timeout <= 0 {
+		timeout = 100
+	}
-	return &Producer{
+	pd := &Producer{
		Producer: p,
		Topic:    c.Topic,
+		Timeout:  timeout,
		Convert:  convert,
-	}, nil
+		Generate: generate,
+	}
+	return pd, nil
}
-func NewProducer(producer *kafka.Producer, topic string, options ...func(context.Context, []byte)([]byte, error)) *Producer {
-	var convert func(context.Context, []byte) ([]byte, error)
+func NewProducer(producer *kafka.Producer, topic string, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *Producer {
+	var generate func() string
	if len(options) > 0 {
-		convert = options[0]
+		generate = options[0]
	}
+	if timeout <= 0 {
+		timeout = 100
+	}
-	return &Producer{Producer: producer, Topic: topic, Convert: convert}
+	return &Producer{Producer: producer, Topic: topic, Timeout: timeout, Convert: convert, Generate: generate}
}
-func NewProducerByConfigAndRetries(c ProducerConfig, convert func(context.Context, []byte)([]byte, error), retries ...time.Duration) (*Producer, error) {
+func NewProducerByConfigAndRetries(c ProducerConfig, convert func(context.Context, []byte) ([]byte, error), retries ...time.Duration) (*Producer, error) {
	if len(retries) == 0 {
		return NewProducerByConfig(c, convert)
	} else {
		return NewProducerWithRetryArray(c, retries, convert)
	}
}

-func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, options...func(context.Context, []byte)([]byte, error)) (*Producer, error) {
-	p, err := NewProducerByConfig(c, options...)
+func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*Producer, error) {
+	p, err := NewProducerByConfig(c, convert, options...)
	if err == nil {
		return p, nil
	}

	i := 0
	err = Retry(retries, func() (err error) {
		i = i + 1
-		p2, er2 := NewProducerByConfig(c)
+		p2, er2 := NewProducerByConfig(c, convert, options...)
		p = p2
		if er2 == nil {
			log.Println(fmt.Sprintf("create new Producer successfully after %d retries", i))
@@ -84,6 +117,57 @@ func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes m
	if messageAttributes != nil {
		msg.Headers = MapToHeader(messageAttributes)
	}
-
-	return Produce(p.Producer, &msg)
+	if p.Generate != nil {
+		id := p.Generate()
+		msg.Key = []byte(id)
+	}
+	deliveryChan := make(chan kafka.Event)
+	defer close(deliveryChan)
+	err = p.Producer.Produce(&msg, deliveryChan)
+	if err != nil {
+		return msg.String(), err
+	}
+	p.Producer.Flush(p.Timeout)
+	e := <-deliveryChan
+	switch m := e.(type) {
+	case *kafka.Message:
+		return msg.String(), m.TopicPartition.Error
+	case kafka.Error:
+		return "", m
+	}
+	return msg.String(), nil
}
+func (p *Producer) ProduceWithKey(ctx context.Context, data []byte, key []byte, messageAttributes map[string]string) (string, error) {
+	var binary = data
+	var err error
+	if p.Convert != nil {
+		binary, err = p.Convert(ctx, data)
+		if err != nil {
+			return "", err
+		}
+	}
+	msg := kafka.Message{
+		TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
+		Value: binary}
+	if messageAttributes != nil {
+		msg.Headers = MapToHeader(messageAttributes)
+	}
+	if key != nil {
+		msg.Key = key
+	}
+	deliveryChan := make(chan kafka.Event)
+	defer close(deliveryChan)
+	err = p.Producer.Produce(&msg, deliveryChan)
+	if err != nil {
+		return msg.String(), err
+	}
+	p.Producer.Flush(p.Timeout)
+	e := <-deliveryChan
+	switch m := e.(type) {
+	case *kafka.Message:
+		return msg.String(), m.TopicPartition.Error
+	case kafka.Error:
+		return "", m
+	}
+	return msg.String(), nil
+}
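The rewritten Produce path is synchronous: it publishes with a per-call delivery channel, flushes for up to Timeout milliseconds, and then blocks on the delivery report. The sketch below shows the new Generate hook, which supplies a key when Produce is called without one; ProduceWithKey bypasses Generate and uses the explicit key. The timestamp generator is an arbitrary illustration, not from the repository:

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}
	// generate is called per message; its result becomes msg.Key on the Produce path.
	generate := func() string { return strconv.FormatInt(time.Now().UnixNano(), 10) }
	p, err := mq.NewProducerByConfigMap(conf, "orders", 100, nil, generate)
	if err != nil {
		panic(err)
	}
	res, err := p.Produce(context.Background(), []byte(`{"id":"41"}`), nil)
	fmt.Println(res, err)
	// Explicit key: the Generate hook is not consulted on this path.
	res, err = p.ProduceWithKey(context.Background(), []byte(`{"id":"42"}`), []byte("42"), nil)
	fmt.Println(res, err)
}
```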
confluent/producer_config.go: 3 changes (2 additions, 1 deletion)

@@ -2,7 +2,8 @@ package kafka

type ProducerConfig struct {
	Brokers         []string     `mapstructure:"brokers" json:"brokers,omitempty" gorm:"column:brokers" bson:"brokers,omitempty" dynamodbav:"brokers,omitempty" firestore:"brokers,omitempty"`
-	Topic           string       `mapstructure:"Topic" json:"Topic,omitempty" gorm:"column:Topic" bson:"Topic,omitempty" dynamodbav:"Topic,omitempty" firestore:"Topic,omitempty"`
+	Topic           string       `mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"`
+	Timeout         int          `mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"`
	Client          ClientConfig `mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"`
	MaxOpenRequests *int         `mapstructure:"max_open_requests" json:"maxOpenRequests,omitempty" gorm:"column:maxopenrequests" bson:"maxOpenRequests,omitempty" dynamodbav:"maxOpenRequests,omitempty" firestore:"maxOpenRequests,omitempty"`
	RequiredAcks    *int16       `mapstructure:"required_acks" json:"requiredAcks,omitempty" gorm:"column:requiredacks" bson:"requiredAcks,omitempty" dynamodbav:"requiredAcks,omitempty" firestore:"requiredAcks,omitempty"`
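The tag fix matters for file-based configuration: mapstructure now binds the lowercase keys topic and timeout. A sketch populating the struct directly, using only fields visible in this hunk (how Brokers is mapped onto bootstrap.servers happens inside NewKafkaProducerByConfig and is not shown here):

```go
package main

import mq "github.com/core-go/mq/confluent"

func main() {
	cfg := mq.ProducerConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "orders",
		Timeout: 100, // milliseconds handed to Producer.Flush; values <= 0 fall back to 100
	}
	// nil convert; an options func() string could be appended to populate Generate.
	p, err := mq.NewProducerByConfig(cfg, nil)
	_, _ = p, err
}
```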
confluent/simple_consumer.go: 17 changes (16 additions, 1 deletion)

@@ -14,7 +14,22 @@ type (
		Convert  func(context.Context, []byte) ([]byte, error)
	}
)
-
+func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
+	consumer, err := kafka.NewConsumer(&conf)
+	if err != nil {
+		fmt.Printf("Failed to create Consumer: %s\n", err)
+		return nil, err
+	}
+	var convert func(context.Context, []byte) ([]byte, error)
+	if len(options) > 0 {
+		convert = options[0]
+	}
+	return &SimpleConsumer{
+		Consumer: consumer,
+		Topics:   topics,
+		Convert:  convert,
+	}, nil
+}
func NewSimpleConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
	consumer, err := NewKafkaConsumerByConfig(c)
	if err != nil {
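NewSimpleConsumerByConfigMap mirrors the constructor in confluent/consumer.go; the optional convert hook rewrites each raw payload before it reaches the handler. A sketch passing one, with base64 decoding as an arbitrary stand-in for whatever decoding a pipeline needs:

```go
package main

import (
	"context"
	"encoding/base64"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "demo-group",
	}
	// convert hook: decode base64 payloads before they reach the handler.
	decode := func(ctx context.Context, data []byte) ([]byte, error) {
		out := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
		n, err := base64.StdEncoding.Decode(out, data)
		return out[:n], err
	}
	sc, err := mq.NewSimpleConsumerByConfigMap(conf, []string{"orders"}, decode)
	if err != nil {
		log.Fatal(err)
	}
	_ = sc
}
```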
confluent/simple_producer.go: 128 changes (93 additions, 35 deletions)

@@ -10,31 +10,62 @@ import (
type (
	SimpleProducer struct {
		Producer *kafka.Producer
-		Convert  func(context.Context, []byte)([]byte, error)
+		Timeout  int
+		Convert  func(context.Context, []byte) ([]byte, error)
+		Generate func() string
	}
)
-
-func NewSimpleProducerByConfig(c ProducerConfig, options...func(context.Context, []byte)([]byte, error)) (*SimpleProducer, error) {
+func NewSimpleProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) {
+	p, err := kafka.NewProducer(&c)
+	if err != nil {
+		fmt.Printf("Failed to create Producer: %s\n", err)
+		return nil, err
+	}
+	var generate func() string
+	if len(options) > 0 {
+		generate = options[0]
+	}
+	if timeout <= 0 {
+		timeout = 100
+	}
+	pd := &SimpleProducer{
+		Producer: p,
+		Timeout:  timeout,
+		Convert:  convert,
+		Generate: generate,
+	}
+	return pd, nil
+}
+func NewSimpleProducerByConfig(c ProducerConfig, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) {
	p, err := NewKafkaProducerByConfig(c)
	if err != nil {
		fmt.Printf("Failed to create Producer: %s\n", err)
		return nil, err
	}
-	var convert func(context.Context, []byte)([]byte, error)
+	var generate func() string
	if len(options) > 0 {
-		convert = options[0]
+		generate = options[0]
	}
+	if timeout <= 0 {
+		timeout = 100
+	}
-	return &SimpleProducer{
+	pd := &SimpleProducer{
		Producer: p,
-		Convert: convert,
-	}, nil
+		Timeout:  timeout,
+		Convert:  convert,
+		Generate: generate,
+	}
+	return pd, nil
}
-func NewSimpleProducer(producer *kafka.Producer, options...func(context.Context, []byte)([]byte, error)) *SimpleProducer {
-	var convert func(context.Context, []byte)([]byte, error)
+func NewSimpleProducer(producer *kafka.Producer, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *SimpleProducer {
+	var generate func() string
	if len(options) > 0 {
-		convert = options[0]
+		generate = options[0]
	}
-	return &SimpleProducer{Producer: producer, Convert: convert}
+	if timeout <= 0 {
+		timeout = 100
+	}
+	return &SimpleProducer{Producer: producer, Timeout: timeout, Convert: convert, Generate: generate}
}
func NewKafkaProducerByConfig(c ProducerConfig) (*kafka.Producer, error) {
	conf := kafka.ConfigMap{
@@ -84,33 +115,60 @@ func (p *SimpleProducer) Produce(ctx context.Context, topic string, data []byte,
	if messageAttributes != nil {
		msg.Headers = MapToHeader(messageAttributes)
	}
-	return Produce(p.Producer, &msg)
-}
-
-func Produce(producer *kafka.Producer, msg *kafka.Message) (string, error) {
-	deliveryChan := make(chan kafka.Event, 10000)
-	err := producer.Produce(msg, deliveryChan)
+	if p.Generate != nil {
+		id := p.Generate()
+		msg.Key = []byte(id)
+	}
+	deliveryChan := make(chan kafka.Event)
+	defer close(deliveryChan)
+	err = p.Producer.Produce(&msg, deliveryChan)
	if err != nil {
-		fmt.Printf("Failed to produce msg: %s\n", err)
-		return "", err
-	}
-	go func() {
-		for e := range producer.Events() {
-			switch ev := e.(type) {
-			case *kafka.Message:
-				if ev.TopicPartition.Error != nil {
-					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
-				} else {
-					fmt.Printf("Successfully produced record to Topic %s partition [%d] @ offset %v\n",
-						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
-				}
-			}
+		return msg.String(), err
	}
+	p.Producer.Flush(p.Timeout)
+	e := <-deliveryChan
+	switch m := e.(type) {
+	case *kafka.Message:
+		return msg.String(), m.TopicPartition.Error
+	case kafka.Error:
+		return "", m
+	}
+	return msg.String(), nil
+}
+func (p *SimpleProducer) ProduceWithKey(ctx context.Context, topic string, data []byte, key []byte, messageAttributes map[string]string) (string, error) {
+	var binary = data
+	var err error
+	if p.Convert != nil {
+		binary, err = p.Convert(ctx, data)
+		if err != nil {
+			return "", err
+		}
-	}()
-
-}
	}
+	msg := kafka.Message{
+		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+		Value: binary}
+	if messageAttributes != nil {
+		msg.Headers = MapToHeader(messageAttributes)
+	}
+	if key != nil {
+		msg.Key = key
+	}
+	deliveryChan := make(chan kafka.Event)
+	defer close(deliveryChan)
+	err = p.Producer.Produce(&msg, deliveryChan)
+	if err != nil {
+		return msg.String(), err
+	}
+	p.Producer.Flush(p.Timeout)
+	e := <-deliveryChan
+	switch m := e.(type) {
+	case *kafka.Message:
+		return msg.String(), m.TopicPartition.Error
+	case kafka.Error:
+		return "", m
+	}
+	return msg.String(), nil
+}

func MapToHeader(messageAttributes map[string]string) []kafka.Header {
headers := make([]kafka.Header, 0)
for k, v := range messageAttributes {
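Finally, a sketch of SimpleProducer, which takes the topic per call rather than at construction time, and of message attributes, which MapToHeader converts into kafka.Header pairs. Header names and values are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	mq "github.com/core-go/mq/confluent"
)

func main() {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}
	sp, err := mq.NewSimpleProducerByConfigMap(conf, 100, nil)
	if err != nil {
		panic(err)
	}
	attrs := map[string]string{"traceId": "abc-123", "source": "checkout"}
	// attrs become Kafka record headers via MapToHeader inside ProduceWithKey.
	res, err := sp.ProduceWithKey(context.Background(), "orders", []byte(`{"id":"7"}`), []byte("7"), attrs)
	fmt.Println(res, err)
}
```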