Skip to content

Commit

Permalink
Format log
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Oct 31, 2021
1 parent f49b21c commit 54cab08
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 56 deletions.
114 changes: 76 additions & 38 deletions confluent/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/core-go/mq"
"log"
"os"
"time"
)

Expand All @@ -14,10 +14,12 @@ type (
Consumer *kafka.Consumer
Topics []string
Convert func(context.Context, []byte) ([]byte, error)
LogError func(context.Context, string)
LogInfo func(context.Context, string)
}
)

func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) {
func NewConsumerByConfig(c ConsumerConfig, logs ...func(context.Context, string)) (*Consumer, error) {
if c.Client.Retry != nil && c.Client.Retry.Retry1 > 0 {
durations := DurationsFromValue(*c.Client.Retry, "Retry", 9)
return NewConsumerByConfigAndRetryArray(c, durations)
Expand All @@ -27,34 +29,48 @@ func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []by
fmt.Printf("Failed to create Consumer: %s\n", err)
return nil, err
}
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &Consumer{
cs := &Consumer{
Consumer: consumer,
Topics: []string{c.Topic},
Convert: convert,
}, nil
}
if len(logs) >= 1 {
cs.LogError = logs[0]
}
if len(logs) >= 2 {
cs.LogInfo = logs[1]
}
return cs, nil
}
}
func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) {
func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, logs ...func(context.Context, string)) (*Consumer, error) {
consumer, err := kafka.NewConsumer(&conf)
if err != nil {
fmt.Printf("Failed to create Consumer: %s\n", err)
return nil, err
}
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &Consumer{
cs := &Consumer{
Consumer: consumer,
Topics: topics,
Convert: convert,
}, nil
}
if len(logs) >= 1 {
cs.LogError = logs[0]
}
if len(logs) >= 2 {
cs.LogInfo = logs[1]
}
return cs, nil
}
func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer {
func NewConsumer(consumer *kafka.Consumer, topics []string, logs ...func(context.Context, string)) *Consumer {
c := &Consumer{Consumer: consumer, Topics: topics}
if len(logs) >= 1 {
c.LogError = logs[0]
}
if len(logs) >= 2 {
c.LogInfo = logs[1]
}
return c
}
func NewConsumerWithConvert(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer {
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
Expand All @@ -63,7 +79,12 @@ func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(cont
}
func NewConsumerByConfigAndRetries(c ConsumerConfig, convert func(context.Context, []byte) ([]byte, error), retries ...time.Duration) (*Consumer, error) {
if len(retries) == 0 {
return NewConsumerByConfig(c, convert)
cs, err := NewConsumerByConfig(c)
if err != nil {
return cs, err
}
cs.Convert = convert
return cs, nil
} else {
return NewConsumerByConfigAndRetryArray(c, retries)
}
Expand Down Expand Up @@ -99,43 +120,60 @@ func NewConsumerByConfigAndRetryArray(c ConsumerConfig, retries []time.Duration,
}, nil
}

func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, []byte, map[string]string, error) error) {
Consume(ctx, c.Consumer, c.Topics, handle, c.Convert)
}

func Consume(ctx context.Context, consumer *kafka.Consumer, topics []string, handle func(context.Context, []byte, map[string]string, error) error, convert func(context.Context, []byte)([]byte, error)) {
defer consumer.Close()
func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
defer c.Consumer.Close()

err := consumer.SubscribeTopics(topics, nil)
err := c.Consumer.SubscribeTopics(c.Topics, nil)
if err != nil {
fmt.Printf("%% Subscribe Topic err: %v\n", err)
if c.LogError != nil {
c.LogError(ctx, fmt.Sprintf("Subscribe Topic err: %v", err))
}
return
}

run := true
for run == true {
ev := consumer.Poll(0)
ev := c.Consumer.Poll(0)
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
if c.LogInfo != nil {
c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value)))
}
h := HeaderToMap(e.Headers)
if convert == nil {
handle(ctx, e.Value, h, nil)
message := &mq.Message{
Id: string(e.Key),
Data: e.Value,
Attributes: h,
Timestamp: &e.Timestamp,
Raw: e,
}
if c.Convert == nil {
handle(ctx, message, nil)
} else {
data, err := convert(ctx, e.Value)
handle(ctx, data, h, err)
data, err := c.Convert(ctx, e.Value)
if err != nil {
handle(ctx, message, err)
} else {
message.Data = data
handle(ctx, message, err)
}

}
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
if c.LogInfo != nil {
c.LogInfo(ctx, fmt.Sprintf("Reached %v", e))

}
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
handle(ctx, nil, nil, e)
if c.LogError != nil {
c.LogError(ctx, fmt.Sprintf("Error: %v", e))
}
handle(ctx, nil, e)
run = false
default:
}
}
}
func HeaderToMap(headers []kafka.Header) map[string]string {
func HeaderToMap(headers []kafka.Header) map[string]string {
attributes := make(map[string]string, 0)
for _, v := range headers {
attributes[v.Key] = v.String()
Expand Down
89 changes: 71 additions & 18 deletions confluent/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,102 @@ type (
Consumer *kafka.Consumer
Topics []string
Convert func(context.Context, []byte) ([]byte, error)
LogError func(context.Context, string)
LogInfo func(context.Context, string)
}
)
func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, logs ...func(context.Context, string)) (*SimpleConsumer, error) {
consumer, err := kafka.NewConsumer(&conf)
if err != nil {
fmt.Printf("Failed to create Consumer: %s\n", err)
return nil, err
}
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &SimpleConsumer{
cs := &SimpleConsumer{
Consumer: consumer,
Topics: topics,
Convert: convert,
}, nil
}
if len(logs) >= 1 {
cs.LogError = logs[0]
}
if len(logs) >= 2 {
cs.LogInfo = logs[1]
}
return cs, nil
}
func NewSimpleConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) {
func NewSimpleConsumerByConfig(c ConsumerConfig, logs ...func(context.Context, string)) (*SimpleConsumer, error) {
consumer, err := NewKafkaConsumerByConfig(c)
if err != nil {
fmt.Printf("Failed to create Consumer: %s\n", err)
return nil, err
}
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &SimpleConsumer{
cs := &SimpleConsumer{
Consumer: consumer,
Topics: []string{c.Topic},
Convert: convert,
}, nil
}
if len(logs) >= 1 {
cs.LogError = logs[0]
}
if len(logs) >= 2 {
cs.LogInfo = logs[1]
}
return cs, nil
}
// NewSimpleConsumer wraps an already-constructed kafka.Consumer with the
// given topics. Up to two optional log callbacks may be supplied:
// logs[0] is installed as LogError and logs[1] as LogInfo.
func NewSimpleConsumer(consumer *kafka.Consumer, topics []string, logs ...func(context.Context, string)) *SimpleConsumer {
	c := &SimpleConsumer{Consumer: consumer, Topics: topics}
	if len(logs) > 0 {
		c.LogError = logs[0]
	}
	if len(logs) > 1 {
		c.LogInfo = logs[1]
	}
	return c
}
func NewSimpleConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *SimpleConsumer {
func NewSimpleConsumerWithConvert(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *SimpleConsumer {
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &SimpleConsumer{Consumer: consumer, Topics: topics, Convert: convert}
}
// Consume subscribes the consumer to its configured topics and polls
// messages in a loop, invoking handle with the payload and its headers.
// The payload is passed through c.Convert when it is set (conversion
// errors are forwarded to handle alongside the converted result). A
// kafka.Error event is forwarded to handle and terminates the loop. The
// underlying consumer is closed when Consume returns.
//
// NOTE(review): Poll(0) returns immediately, so this loop busy-spins while
// the topic is idle — confirm whether a positive poll timeout was intended.
func (c *SimpleConsumer) Consume(ctx context.Context, handle func(context.Context, []byte, map[string]string, error) error) {
	defer c.Consumer.Close()

	if err := c.Consumer.SubscribeTopics(c.Topics, nil); err != nil {
		if c.LogError != nil {
			c.LogError(ctx, fmt.Sprintf("Subscribe Topic err: %v", err))
		}
		return
	}

	// Idiomatic loop condition (was `for run == true`); the empty
	// `default:` arm of the type switch is dropped as a no-op.
	for run := true; run; {
		switch e := c.Consumer.Poll(0).(type) {
		case *kafka.Message:
			if c.LogInfo != nil {
				c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value)))
			}
			h := HeaderToMap(e.Headers)
			if c.Convert == nil {
				handle(ctx, e.Value, h, nil)
			} else {
				data, err := c.Convert(ctx, e.Value)
				handle(ctx, data, h, err)
			}
		case kafka.PartitionEOF:
			if c.LogInfo != nil {
				c.LogInfo(ctx, fmt.Sprintf("Reached %v", e))
			}
		case kafka.Error:
			if c.LogError != nil {
				c.LogError(ctx, fmt.Sprintf("Error: %v", e))
			}
			handle(ctx, nil, nil, e)
			run = false
		}
	}
}

func NewKafkaConsumerByConfig(c ConsumerConfig) (*kafka.Consumer, error) {
Expand Down

0 comments on commit 54cab08

Please sign in to comment.