Skip to content

Commit

Permalink
separate fields into two separate arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed May 16, 2024
1 parent 17f4cb4 commit da0ec0f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 58 deletions.
10 changes: 5 additions & 5 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## Fields to base64 decode.
## These fields do not need to be specified in parse_fields.
## Fields specified here will have base64 decode applied to them.
# parse_fields_base64 = []

## The name of the tags whose value will be parsed.
# parse_tags = []

Expand All @@ -41,11 +46,6 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Fields to base64 decode
## This list of fields must be a subset of parse_fields. Fields specified
## here will have base64 decode applied to them.
# base64_fields = []
```

## Example
Expand Down
91 changes: 43 additions & 48 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/base64"
gobin "encoding/binary"
"fmt"
"slices"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -21,8 +20,8 @@ type Parser struct {
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Base64Fields []string `toml:"parse_fields_base64"`
ParseTags []string `toml:"parse_tags"`
Base64Fields []string `toml:"base64_fields"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
}
Expand All @@ -34,20 +33,6 @@ func (p *Parser) Init() error {
return fmt.Errorf("unrecognized merge value: %s", p.Merge)
}

// Validate that Base64Fields is a subset of ParseFields
for _, field64 := range p.Base64Fields {
var found bool
for _, field := range p.ParseFields {
if field64 == field {
found = true
break
}
}
if !found {
return fmt.Errorf("unrecognized base64 parse field '%s' found, should also be included in parse_fields", field64)
}
}

return nil
}

Expand All @@ -71,39 +56,12 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {

// parse fields
for _, key := range p.ParseFields {
b64 := slices.Contains(p.Base64Fields, key)
for _, field := range metric.FieldList() {
if field.Key != key {
continue
}
value, err := p.toBytes(field.Value, b64)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", key, err)
continue
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
continue
}

for _, m := range fromFieldMetric {
// The parser get the parent plugin's name as
// default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}
newMetrics = append(newMetrics, p.parseField(key, metric, false)...)
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
}
// parse base64 fields
for _, key := range p.Base64Fields {
newMetrics = append(newMetrics, p.parseField(key, metric, true)...)
}

// parse tags
Expand Down Expand Up @@ -145,6 +103,43 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
return results
}

func (p *Parser) parseField(key string, metric telegraf.Metric, b64 bool) []telegraf.Metric {
newMetrics := []telegraf.Metric{}
for _, field := range metric.FieldList() {
if field.Key != key {
continue
}
value, err := p.toBytes(field.Value, b64)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", key, err)
continue
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
continue
}

for _, m := range fromFieldMetric {
// The parser get the parent plugin's name as
// default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
}
return newMetrics
}

func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
for _, metric := range metrics {
for _, field := range metric.FieldList() {
Expand Down
10 changes: 5 additions & 5 deletions plugins/processors/parser/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## Fields to base64 decode.
## These fields do not need to be specified in parse_fields.
## Fields specified here will have base64 decode applied to them.
# parse_fields_base64 = []

## The name of the tags whose value will be parsed.
# parse_tags = []

Expand All @@ -24,8 +29,3 @@
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Fields to base64 decode
## This list of fields must be a subset of parse_fields. Fields specified
## here will have base64 decode applied to them.
# base64_fields = []

0 comments on commit da0ec0f

Please sign in to comment.