forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcarbon2.go
119 lines (101 loc) · 2.81 KB
/
carbon2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package carbon2
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers"
)
const sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
type Serializer struct {
Format string `toml:"carbon2_format"`
SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`
Log telegraf.Logger `toml:"-"`
sanitizeReplacer *strings.Replacer
template string
}
func (s *Serializer) Init() error {
if s.SanitizeReplaceChar == "" {
s.SanitizeReplaceChar = ":"
}
if len(s.SanitizeReplaceChar) > 1 {
return errors.New("sanitize replace char has to be a singular character")
}
// Create replacer to replacing all characters requiring sanitization with the user-specified replacement
pairs := make([]string, 0, 2*len(sanitizedChars))
for _, c := range sanitizedChars {
pairs = append(pairs, string(c), s.SanitizeReplaceChar)
}
s.sanitizeReplacer = strings.NewReplacer(pairs...)
switch s.Format {
case "", "field_separate":
s.Format = "field_separate"
s.template = "metric=%s field=%s "
case "metric_includes_field":
s.template = "metric=%s_%s "
default:
return fmt.Errorf("unknown carbon2 format: %s", s.Format)
}
return nil
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.createObject(metric), nil
}
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, metric := range metrics {
batch.Write(s.createObject(metric))
}
return batch.Bytes(), nil
}
func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() {
if _, ok := fieldValue.(string); ok {
continue
}
name := s.sanitizeReplacer.Replace(metric.Name())
var value string
if v, ok := fieldValue.(bool); ok {
if v {
value = "1"
} else {
value = "0"
}
} else {
var err error
value, err = internal.ToString(fieldValue)
if err != nil {
s.Log.Warnf("Cannot convert %v (%T) to string", fieldValue, fieldValue)
continue
}
}
m.WriteString(fmt.Sprintf(s.template, strings.ReplaceAll(name, " ", "_"), strings.ReplaceAll(fieldName, " ", "_")))
for _, tag := range metric.TagList() {
m.WriteString(strings.ReplaceAll(tag.Key, " ", "_"))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.ReplaceAll(value, " ", "_"))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(value)
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
return m.Bytes()
}
func init() {
serializers.Add("carbon2",
func() telegraf.Serializer {
return &Serializer{}
},
)
}