forked from signalfx/splunk-otel-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevaluator.go
144 lines (130 loc) · 4.99 KB
/
evaluator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryreceiver
import (
"encoding/base64"
"fmt"
"regexp"
"sync"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)
// exprEnvFunc builds the expr evaluation environment from pattern content.
// The returned map is what evaluateMatch passes to expr.Env at compile time
// and to vm.Run at evaluation time for Expr-based matches.
type exprEnvFunc func(pattern string) map[string]any
// evaluator is the base status matcher that determines if telemetry warrants emitting a matching log record.
// It also provides embedded config correlation that its embedding structs will utilize.
type evaluator struct {
// logger receives the debug output of match evaluation and config embedding errors.
logger *zap.Logger
// config is consulted for EmbedReceiverConfig when correlating resource attributes.
config *Config
// correlations is the shared correlation store handed in at construction time.
correlations *correlationStore
// this ~sync.Map(map[string]struct{}) keeps track of
// whether we've already emitted a record for the statement and can skip processing.
// Keys have the form "<endpointID>::<receiverID>::<status>::<matchPattern>".
alreadyLogged *sync.Map
// exprEnv builds the environment map used to compile and run Expr matches.
exprEnv exprEnvFunc
}
// newEvaluator assembles an evaluator from the provided logger, config,
// correlation store and expr environment builder. Each evaluator starts with
// its own empty alreadyLogged map, so no record is considered emitted yet.
func newEvaluator(logger *zap.Logger, config *Config, correlations *correlationStore, envFunc exprEnvFunc) *evaluator {
	ev := new(evaluator)
	ev.logger = logger
	ev.config = config
	ev.correlations = correlations
	ev.alreadyLogged = new(sync.Map)
	ev.exprEnv = envFunc
	return ev
}
// evaluateMatch parses the provided Match and returns whether it warrants a status log record.
//
// The first non-empty field of match, checked in the order Strict, Regexp,
// Expr, selects how pattern is evaluated:
//   - Strict: pattern must equal match.Strict exactly.
//   - Regexp: match.Regexp is compiled and tested against pattern.
//   - Expr: match.Expr is compiled and run against the environment produced by
//     e.exprEnv(pattern); it must evaluate to a bool.
//
// A true result is returned at most once per
// (endpointID, receiverID, status, match statement) combination; later
// successful matches for the same combination return false so the same record
// is not emitted again.
//
// An error is returned when no match field is set, when the regexp or expr
// statement fails to compile, when running the expr fails, or when the expr
// evaluates to something other than a bool.
func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery.StatusType, receiverID component.ID, endpointID observer.EndpointID) (bool, error) {
	var matchFunc func(p string) (bool, error)
	var matchPattern string
	var err error

	switch {
	case match.Strict != "":
		matchPattern = match.Strict
		matchFunc = func(p string) (bool, error) {
			return p == match.Strict, nil
		}
	case match.Regexp != "":
		matchPattern = match.Regexp
		var re *regexp.Regexp
		if re, err = regexp.Compile(matchPattern); err != nil {
			err = fmt.Errorf("invalid match regexp statement: %w", err)
		} else {
			matchFunc = func(p string) (bool, error) { return re.MatchString(p), nil }
		}
	case match.Expr != "":
		matchPattern = match.Expr
		var program *vm.Program
		// we need a way to look up fields that aren't valid identifiers https://github.com/antonmedv/expr/issues/106
		env := e.exprEnv(pattern)
		env["ExprEnv"] = env
		// TODO: cache compiled programs for performance benefit
		if program, err = expr.Compile(match.Expr, expr.Env(env)); err != nil {
			err = fmt.Errorf("invalid match expr statement: %w", err)
		} else {
			matchFunc = func(_ string) (bool, error) {
				ret, runErr := vm.Run(program, env)
				if runErr != nil {
					return false, runErr
				}
				// The statement is user-provided and may evaluate to any type;
				// report that as an error instead of panicking on the assertion.
				matched, ok := ret.(bool)
				if !ok {
					return false, fmt.Errorf("match expr statement must evaluate to a bool, got %T", ret)
				}
				return matched, nil
			}
		}
	default:
		err = fmt.Errorf("no valid match field provided")
	}
	if err != nil {
		return false, err
	}

	var shouldLog bool
	shouldLog, err = matchFunc(pattern)
	if !shouldLog || err != nil {
		return false, err
	}

	// Only the first successful match for this key should produce a record.
	loggedKey := fmt.Sprintf("%s::%s::%s::%s", endpointID, receiverID.String(), status, matchPattern)
	if _, ok := e.alreadyLogged.LoadOrStore(loggedKey, struct{}{}); ok {
		shouldLog = false
	}

	e.logger.Debug(fmt.Sprintf("evaluated match %v against %q (should log: %v)", matchPattern, pattern, shouldLog))
	return shouldLog, nil
}
// correlateResourceAttributes sets correlation attributes including embedded base64 config content, if configured.
//
// The observer ID from corr is written to to[discovery.ObserverIDAttr] unless
// it is empty or equal to discovery.NoType. When e.config.EmbedReceiverConfig
// is set, the receiver entry for corr.receiverID in cfg (its rule, config and
// resource attributes, plus the observer under "watch_observers" when one is
// set) is marshaled to YAML, base64-encoded and written to
// to[discovery.ReceiverConfigAttr]. If marshaling fails the error is logged
// and the receiver config attribute is left unset rather than populated with
// an empty value.
func (e *evaluator) correlateResourceAttributes(cfg *Config, to map[string]string, corr correlation) {
	observerID := corr.observerID.String()
	hasObserver := observerID != "" && observerID != discovery.NoType.String()
	if hasObserver {
		to[discovery.ObserverIDAttr] = observerID
	}

	if !e.config.EmbedReceiverConfig {
		return
	}

	rEntry := cfg.Receivers[corr.receiverID] // it's safe to assume this exists.
	receiverConfig := map[string]any{
		"rule":                rEntry.Rule,
		"config":              rEntry.Config,
		"resource_attributes": rEntry.ResourceAttributes,
	}
	embeddedConfig := map[string]any{
		"receivers": map[string]any{
			corr.receiverID.String(): receiverConfig,
		},
	}
	if hasObserver {
		embeddedConfig["watch_observers"] = []string{observerID}
	}

	cfgYaml, err := yaml.Marshal(embeddedConfig)
	if err != nil {
		e.logger.Error("failed embedding receiver config", zap.String("observer", observerID), zap.Error(err))
		// Don't advertise an empty embedded config when marshaling failed.
		return
	}
	to[discovery.ReceiverConfigAttr] = base64.StdEncoding.EncodeToString(cfgYaml)
}