diff --git a/confluent/consumer.go b/confluent/consumer.go index a98c711..dad2c97 100644 --- a/confluent/consumer.go +++ b/confluent/consumer.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/core-go/mq" "log" - "os" "time" ) @@ -14,10 +14,12 @@ type ( Consumer *kafka.Consumer Topics []string Convert func(context.Context, []byte) ([]byte, error) + LogError func(context.Context, string) + LogInfo func(context.Context, string) } ) -func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) { +func NewConsumerByConfig(c ConsumerConfig, logs ...func(context.Context, string)) (*Consumer, error) { if c.Client.Retry != nil && c.Client.Retry.Retry1 > 0 { durations := DurationsFromValue(*c.Client.Retry, "Retry", 9) return NewConsumerByConfigAndRetryArray(c, durations) @@ -27,34 +29,48 @@ func NewConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []by fmt.Printf("Failed to create Consumer: %s\n", err) return nil, err } - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &Consumer{ + cs := &Consumer{ Consumer: consumer, Topics: []string{c.Topic}, - Convert: convert, - }, nil + } + if len(logs) >= 1 { + cs.LogError = logs[0] + } + if len(logs) >= 2 { + cs.LogInfo = logs[1] + } + return cs, nil } } -func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*Consumer, error) { +func NewConsumerByConfigMap(conf kafka.ConfigMap, topics []string, logs ...func(context.Context, string)) (*Consumer, error) { consumer, err := kafka.NewConsumer(&conf) if err != nil { fmt.Printf("Failed to create Consumer: %s\n", err) return nil, err } - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &Consumer{ + cs := &Consumer{ Consumer: consumer, Topics: topics, - Convert: convert, - }, nil + } + if len(logs) >= 1 { + cs.LogError = logs[0] + } + if len(logs) >= 2 { + cs.LogInfo = logs[1] + } + return cs, nil } -func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer { +func NewConsumer(consumer *kafka.Consumer, topics []string, logs ...func(context.Context, string)) *Consumer { + c := &Consumer{Consumer: consumer, Topics: topics} + if len(logs) >= 1 { + c.LogError = logs[0] + } + if len(logs) >= 2 { + c.LogInfo = logs[1] + } + return c +} +func NewConsumerWithConvert(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer { var convert func(context.Context, []byte) ([]byte, error) if len(options) > 0 { convert = options[0] @@ -63,7 +79,12 @@ func NewConsumer(consumer *kafka.Consumer, topics []string, options ...func(cont } func NewConsumerByConfigAndRetries(c ConsumerConfig, convert func(context.Context, []byte) ([]byte, error), retries ...time.Duration) (*Consumer, error) { if len(retries) == 0 { - return NewConsumerByConfig(c, convert) + cs, err := NewConsumerByConfig(c) + if err != nil { + return cs, err + } + cs.Convert = convert + return cs, nil } else { return NewConsumerByConfigAndRetryArray(c, retries) } @@ -99,43 +120,60 @@ func NewConsumerByConfigAndRetryArray(c ConsumerConfig, retries []time.Duration, }, nil } -func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, []byte, map[string]string, error) error) { - Consume(ctx, c.Consumer, c.Topics, handle, c.Convert) -} - -func Consume(ctx context.Context, consumer *kafka.Consumer, topics []string, handle func(context.Context, []byte, map[string]string, error) error, convert func(context.Context, []byte)([]byte, error)) { - defer consumer.Close() +func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) { + defer c.Consumer.Close() - err := consumer.SubscribeTopics(topics, nil) + err := c.Consumer.SubscribeTopics(c.Topics, nil) if err != nil { - fmt.Printf("%% Subscribe Topic err: %v\n", err) + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Subscribe Topic err: %v", err)) + } return } - run := true for run == true { - ev := consumer.Poll(0) + ev := c.Consumer.Poll(0) switch e := ev.(type) { case *kafka.Message: - fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value))) + } h := HeaderToMap(e.Headers) - if convert == nil { - handle(ctx, e.Value, h, nil) + message := &mq.Message{ + Id: string(e.Key), + Data: e.Value, + Attributes: h, + Timestamp: &e.Timestamp, + Raw: e, + } + if c.Convert == nil { + handle(ctx, message, nil) } else { - data, err := convert(ctx, e.Value) - handle(ctx, data, h, err) + data, err := c.Convert(ctx, e.Value) + if err != nil { + handle(ctx, message, err) + } else { + message.Data = data + handle(ctx, message, err) + } + } case kafka.PartitionEOF: - fmt.Printf("%% Reached %v\n", e) + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Reached %v", e)) + + } case kafka.Error: - fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) - handle(ctx, nil, nil, e) + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Error: %v", e)) + } + handle(ctx, nil, e) run = false default: } } } -func HeaderToMap(headers []kafka.Header) map[string]string { +func HeaderToMap(headers []kafka.Header) map[string]string { attributes := make(map[string]string, 0) for _, v := range headers { attributes[v.Key] = v.String() diff --git a/confluent/simple_consumer.go b/confluent/simple_consumer.go index 54871e5..f1d069f 100644 --- a/confluent/simple_consumer.go +++ b/confluent/simple_consumer.go @@ -12,41 +12,57 @@ type ( Consumer *kafka.Consumer Topics []string Convert func(context.Context, []byte) ([]byte, error) + LogError func(context.Context, string) + LogInfo func(context.Context, string) } ) -func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) { +func NewSimpleConsumerByConfigMap(conf kafka.ConfigMap, topics []string, logs ...func(context.Context, string)) (*SimpleConsumer, error) { consumer, err := kafka.NewConsumer(&conf) if err != nil { fmt.Printf("Failed to create Consumer: %s\n", err) return nil, err } - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &SimpleConsumer{ + cs := &SimpleConsumer{ Consumer: consumer, Topics: topics, - Convert: convert, - }, nil + } + if len(logs) >= 1 { + cs.LogError = logs[0] + } + if len(logs) >= 2 { + cs.LogInfo = logs[1] + } + return cs, nil } -func NewSimpleConsumerByConfig(c ConsumerConfig, options ...func(context.Context, []byte) ([]byte, error)) (*SimpleConsumer, error) { +func NewSimpleConsumerByConfig(c ConsumerConfig, logs ...func(context.Context, string)) (*SimpleConsumer, error) { consumer, err := NewKafkaConsumerByConfig(c) if err != nil { fmt.Printf("Failed to create Consumer: %s\n", err) return nil, err } - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &SimpleConsumer{ + cs := &SimpleConsumer{ Consumer: consumer, Topics: []string{c.Topic}, - Convert: convert, - }, nil + } + if len(logs) >= 1 { + cs.LogError = logs[0] + } + if len(logs) >= 2 { + cs.LogInfo = logs[1] + } + return cs, nil +} +func NewSimpleConsumer(consumer *kafka.Consumer, topics []string, logs ...func(context.Context, string)) *SimpleConsumer { + c := &SimpleConsumer{Consumer: consumer, Topics: topics} + if len(logs) >= 1 { + c.LogError = logs[0] + } + if len(logs) >= 2 { + c.LogInfo = logs[1] + } + return c } -func NewSimpleConsumer(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *SimpleConsumer { +func NewSimpleConsumerWithConvert(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *SimpleConsumer { var convert func(context.Context, []byte) ([]byte, error) if len(options) > 0 { convert = options[0] @@ -54,7 +70,44 @@ func NewSimpleConsumer(consumer *kafka.Consumer, topics []string, options ...fun return &SimpleConsumer{Consumer: consumer, Topics: topics, Convert: convert} } func (c *SimpleConsumer) Consume(ctx context.Context, handle func(context.Context, []byte, map[string]string, error) error) { - Consume(ctx, c.Consumer, c.Topics, handle, c.Convert) + defer c.Consumer.Close() + + err := c.Consumer.SubscribeTopics(c.Topics, nil) + if err != nil { + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Subscribe Topic err: %v", err)) + } + return + } + run := true + for run == true { + ev := c.Consumer.Poll(0) + switch e := ev.(type) { + case *kafka.Message: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value))) + } + h := HeaderToMap(e.Headers) + if c.Convert == nil { + handle(ctx, e.Value, h, nil) + } else { + data, err := c.Convert(ctx, e.Value) + handle(ctx, data, h, err) + } + case kafka.PartitionEOF: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Reached %v", e)) + + } + case kafka.Error: + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Error: %v", e)) + } + handle(ctx, nil, nil, e) + run = false + default: + } + } } func NewKafkaConsumerByConfig(c ConsumerConfig) (*kafka.Consumer, error) {