From 1b68b9018cb2ff41151cd99ec897fe37ab05db82 Mon Sep 17 00:00:00 2001 From: ReallyLiri Date: Mon, 1 Nov 2021 16:27:43 +0200 Subject: [PATCH] use strings instead of hashes --- dedup/dedup.go | 33 ++++++++++ dedup/dedup_test.go | 62 +++++++++++++++++++ dedup/temporal.go | 34 +++++++++++ dedup/temporal_test.go | 36 +++++++++++ diag/diag.go | 133 ++++++++++++++++------------------------- diag/diag_test.go | 14 ++--- diag/events_test.go | 16 ++--- diag/state.go | 52 ++++++++-------- diag/state_model.go | 26 ++++---- diag/tests_verify.go | 2 +- diag/util.go | 97 ------------------------------ diag/util_test.go | 79 ------------------------ integration_test.go | 2 +- kubeclient/client.go | 3 +- main.go | 2 +- store/entityName.go | 16 +++++ store/store.go | 80 ++++++++++++++++++------- store/store_test.go | 129 ++++++++++++++++++++------------------- 18 files changed, 417 insertions(+), 399 deletions(-) create mode 100644 dedup/dedup.go create mode 100644 dedup/dedup_test.go create mode 100644 dedup/temporal.go create mode 100644 dedup/temporal_test.go create mode 100644 store/entityName.go diff --git a/dedup/dedup.go b/dedup/dedup.go new file mode 100644 index 0000000..44ad6b2 --- /dev/null +++ b/dedup/dedup.go @@ -0,0 +1,33 @@ +package dedup + +import "github.com/adrg/strutil/metrics" + +var levenshtein *metrics.Levenshtein + +const smallFactor = 1 +const bigFactor = 3 + +func init() { + levenshtein = metrics.NewLevenshtein() + levenshtein.CaseSensitive = true + levenshtein.InsertCost = bigFactor + levenshtein.DeleteCost = bigFactor + levenshtein.ReplaceCost = smallFactor +} + +func max(a int, b int) int { + if a >= b { + return a + } + return b +} + +func AreSimilar(a string, b string, similarityThreshold float64) bool { + maxLenFactor := bigFactor * max(len(a), len(b)) + if maxLenFactor == 0 { + return true + } + distance := levenshtein.Distance(a, b) + score := 1 - float64(distance)/float64(maxLenFactor) + return score >= similarityThreshold +} diff --git a/dedup/dedup_test.go b/dedup/dedup_test.go new file mode 100644 index 0000000..35979ea --- /dev/null +++ b/dedup/dedup_test.go @@ -0,0 +1,62 @@ +package dedup + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_dedup(t *testing.T) { + + assert.True(t, AreSimilar("", "", 0)) + assert.True(t, AreSimilar("", "", 0.5)) + assert.True(t, AreSimilar("", "", 1)) + + assert.True(t, AreSimilar("a", "", 0)) + assert.False(t, AreSimilar("a", "", 0.1)) + assert.False(t, AreSimilar("", "a", 0.1)) + + assert.True(t, AreSimilar( + `Event by kubelet: Failed x since , : + Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown`, + `Event by kubelet: Failed x since , : + Error: ErrImagePull`, + 0.1, + )) + assert.False(t, AreSimilar( + `Event by kubelet: Failed x since , : + Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown`, + `Event by kubelet: Failed x since , : + Error: ErrImagePull`, + 0.75, + )) + assert.False(t, AreSimilar( + `Event by kubelet: Failed x since , : + Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown`, + `Event by kubelet: Failed x since , : + Error: ImagePullBackOff`, + 0.75, + )) + + assert.True(t, AreSimilar( + `Event by kubelet: Failed x since , : + Error: ErrImagePull`, + `Event by kubelet: Failed x since , : + Error: ImagePullBackOff`, + 0.6, + )) + assert.False(t, AreSimilar( + `Event by kubelet: Failed x since , : + Error: ErrImagePull`, + `Event by kubelet: Failed x since , : + Error: ImagePullBackOff`, + 0.95, + )) + + assert.True(t, AreSimilar( + `Event by kernel-monitor: TaskHung since , : +INFO: task runc:[2:INIT]:293016 blocked for more than 327 seconds.`, + `Event by kernel-monitor: TaskHung since , : +INFO: task runc:[2:INIT]:309147 blocked for more than 327 seconds.`, + 0.8, + )) +} diff --git a/dedup/temporal.go b/dedup/temporal.go new file mode 100644 index 0000000..e31cf87 --- /dev/null +++ b/dedup/temporal.go @@ -0,0 +1,34 @@ +package dedup + +import ( + "fmt" + log "github.com/sirupsen/logrus" + "strings" +) + +const temporalStart = "" +const temporalEnd = "" + +func NormalizeTemporal(message string) string { + for { + temporalStartIndex := strings.Index(message, temporalStart) + if temporalStartIndex == -1 { + break + } + temporalEndIndex := strings.Index(message, temporalEnd) + if temporalEndIndex == -1 || temporalEndIndex < temporalStartIndex { + log.Errorf("invalid temporal format for %v", message) + break + } + message = message[:temporalStartIndex] + message[(temporalEndIndex+len(temporalEnd)):] + } + return message +} + +func CleanTemporal(message string) string { + return strings.ReplaceAll(strings.ReplaceAll(message, temporalStart, ""), temporalEnd, "") +} + +func WrapTemporal(item interface{}) string { + return fmt.Sprintf("%v%v%v", temporalStart, item, temporalEnd) +} diff --git a/dedup/temporal_test.go b/dedup/temporal_test.go new file mode 100644 index 0000000..d77596a --- /dev/null +++ b/dedup/temporal_test.go @@ -0,0 +1,36 @@ +package dedup + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_normalizeMessage(t *testing.T) { + assert.Equal(t, "", NormalizeTemporal("")) + assert.Equal(t, "abc", NormalizeTemporal("abc")) + assert.Equal(t, "hello world", NormalizeTemporal("hello world")) + assert.Equal(t, "", NormalizeTemporal("hello world")) + assert.Equal(t, "", NormalizeTemporal("")) + assert.Equal(t, "The is here", NormalizeTemporal("The hello world is here")) + assert.Equal(t, "The brown jumps the dog", NormalizeTemporal("The quick brown fox jumps over the lazy dog")) + assert.Equal(t, "tt", NormalizeTemporal("tt")) + assert.Equal(t, "tat", NormalizeTemporal("tat")) + assert.Equal(t, "tt", NormalizeTemporal("ttt")) + assert.Equal(t, "tt", NormalizeTemporal("ttt")) + assert.Equal(t, "tt", NormalizeTemporal("ttt")) +} + +func Test_cleanMessage(t *testing.T) { + assert.Equal(t, "", CleanTemporal("")) + assert.Equal(t, "abc", CleanTemporal("abc")) + assert.Equal(t, "hello world", CleanTemporal("hello world")) + assert.Equal(t, "hello world", CleanTemporal("hello world")) + assert.Equal(t, "", CleanTemporal("")) + assert.Equal(t, "The hello world is here", CleanTemporal("The hello world is here")) + assert.Equal(t, "The quick brown fox jumps over the lazy dog", CleanTemporal("The quick brown fox jumps over the lazy dog")) + assert.Equal(t, "tt", CleanTemporal("tt")) + assert.Equal(t, "tat", CleanTemporal("tat")) + assert.Equal(t, "ttt", CleanTemporal("ttt")) + assert.Equal(t, "ttt", CleanTemporal("ttt")) + assert.Equal(t, "ttt", CleanTemporal("ttt")) +} diff --git a/diag/diag.go b/diag/diag.go index 097288a..9313aa9 100644 --- a/diag/diag.go +++ b/diag/diag.go @@ -3,12 +3,12 @@ package diag import ( "github.com/reallyliri/kubescout/alert" "github.com/reallyliri/kubescout/config" + "github.com/reallyliri/kubescout/dedup" "github.com/reallyliri/kubescout/internal" "github.com/reallyliri/kubescout/kubeclient" "github.com/reallyliri/kubescout/store" log "github.com/sirupsen/logrus" "go.uber.org/multierr" - "sort" "time" ) @@ -19,8 +19,8 @@ type diagContext struct { includedNamespacesSet map[string]bool excludedNamespacesSet map[string]bool client kubeclient.KubernetesClient - statesByName map[entityName]*entityState - eventsByName map[entityName][]*eventState + statesByName map[store.EntityName]*entityState + eventsByName map[store.EntityName][]*eventState } var excludeStandaloneEventsOnKinds = map[string]bool{ @@ -44,19 +44,18 @@ func testContextWithClient(now time.Time, client kubeclient.KubernetesClient) *d return &diagContext{ config: cfg, client: client, - statesByName: map[entityName]*entityState{}, - eventsByName: map[entityName][]*eventState{}, + statesByName: map[store.EntityName]*entityState{}, + eventsByName: map[store.EntityName][]*eventState{}, now: now, } } -func unhealthyEvents(state *entityState, events []*eventState) []*eventState { - var unhealthy []*eventState +func unhealthyEvents(state *entityState, events []*eventState) (unhealthy []*eventState) { for _, evState := range events { if evState.isHealthy() { continue } - if !evState.lastTimestamp.IsZero() && !state.createdTimestamp.IsZero() { + if !evState.lastTimestamp.IsZero() && state != nil && !state.createdTimestamp.IsZero() { sinceCreation := evState.lastTimestamp.Sub(state.createdTimestamp) if sinceCreation < graceTimeForEventSinceEntityCreation { continue @@ -64,47 +63,25 @@ func unhealthyEvents(state *entityState, events []*eventState) []*eventState { } unhealthy = append(unhealthy, evState) } - - sort.SliceStable(unhealthy, func(i, j int) bool { - message1 := normalizeMessage(unhealthy[i].message) - message2 := normalizeMessage(unhealthy[j].message) - return message1 > message2 // sorting reverse - }) - - cast := make([]interface{}, len(unhealthy)) - for i, item := range unhealthy { - cast[i] = item - } - const similarityThreshold = 0.6 - indexes := dedup(cast, func(item interface{}) string { - return normalizeMessage(item.(*eventState).message) - }, similarityThreshold) - if indexes == nil || unhealthy == nil { - return unhealthy - } - dedupedUnhealthy := make([]*eventState, len(indexes)) - for i, index := range indexes { - dedupedUnhealthy[i] = unhealthy[index] - } - return dedupedUnhealthy + return } -func (context *diagContext) handleEntityState(state *entityState, events []*eventState) (stored bool) { +func (context *diagContext) handleEntityState(state *entityState, events []*eventState) { isHealthy := state.isHealthy() events = unhealthyEvents(state, events) - if state.name.kind == "Node" && len(events) > 0 { + if state.name.Kind == "Node" && len(events) > 0 { isHealthy = false } if isHealthy { log.Trace(state.String()) - return false + return } entityAlert := &alert.EntityAlert{ ClusterName: context.store.Cluster, - Namespace: state.name.namespace, - Name: state.name.name, - Kind: state.name.kind, + Namespace: state.name.Namespace, + Name: state.name.Name, + Kind: state.name.Kind, Node: state.node, Messages: []string{}, Events: []string{}, @@ -112,65 +89,59 @@ func (context *diagContext) handleEntityState(state *entityState, events []*even Timestamp: context.now, } - addedHashes := make(map[string]bool) for _, message := range state.messages { - messageHash := hash(state.name, normalizeMessage(message)) - if !addedHashes[messageHash] && context.store.ShouldAdd(messageHash, context.now) { - addedHashes[messageHash] = true - entityAlert.Messages = append(entityAlert.Messages, cleanMessage(message)) - } - } - for _, event := range events { - messageHash := hash(event.name, normalizeMessage(event.message)) - if !addedHashes[messageHash] && context.store.ShouldAdd(messageHash, context.now) { - addedHashes[messageHash] = true - entityAlert.Events = append(entityAlert.Events, cleanMessage(event.message)) + stored := context.store.TryAdd(state.name, message, context.now) + if stored { + entityAlert.Messages = append(entityAlert.Messages, dedup.CleanTemporal(message)) } } - deduped := len(addedHashes) == 0 - if deduped { + if len(entityAlert.Messages) == 0 { log.Infof("[DEDUPED] %v", state) - } else { - log.Info(state.String()) - entityAlert.LogsByContainerName = state.logsCollections - context.store.Add(entityAlert, internal.Keys(addedHashes), context.now) + return } - return deduped + for _, event := range events { + stored := context.store.TryAdd(state.name, event.message, context.now) + if stored { + entityAlert.Events = append(entityAlert.Events, dedup.CleanTemporal(event.message)) + } + } + + log.Info(state.String()) + entityAlert.LogsByContainerName = state.logsCollections + context.store.Alerts = append(context.store.Alerts, entityAlert) } -func (context *diagContext) handleStandaloneEvent(state *eventState) (stored bool) { - if state.isHealthy() { - log.Tracef(state.String()) - return false - } +func (context *diagContext) handleStandaloneEvents(name store.EntityName, events []*eventState) { - log.Infof(state.String()) + events = unhealthyEvents(nil, events) - if excludeStandaloneEventsOnKinds[state.name.kind] { - return false + if excludeStandaloneEventsOnKinds[name.Kind] { + return } entityAlert := &alert.EntityAlert{ ClusterName: context.store.Cluster, - Namespace: state.name.namespace, - Name: state.name.name, - Kind: state.name.kind, + Namespace: name.Namespace, + Name: name.Name, + Kind: name.Kind, Messages: []string{}, Events: []string{}, LogsByContainerName: map[string]string{}, Timestamp: context.now, } - messageHash := hash(state.name, normalizeMessage(state.message)) - if !context.store.ShouldAdd(messageHash, context.now) { - return false + for _, event := range events { + stored := context.store.TryAdd(name, event.message, context.now) + if stored { + entityAlert.Events = append(entityAlert.Events, dedup.CleanTemporal(event.message)) + } } - entityAlert.Events = append(entityAlert.Events, cleanMessage(state.message)) - context.store.Add(entityAlert, []string{messageHash}, context.now) - return true + if len(entityAlert.Events) > 0 { + context.store.Alerts = append(context.store.Alerts, entityAlert) + } } func (context *diagContext) isNamespaceRelevant(namespaceName string) bool { @@ -183,16 +154,16 @@ func (context *diagContext) isNamespaceRelevant(namespaceName string) bool { return true } -func DiagnoseCluster(client kubeclient.KubernetesClient, cfg *config.Config, store *store.ClusterStore, now time.Time) (aggregatedError error) { +func DiagnoseCluster(client kubeclient.KubernetesClient, cfg *config.Config, clusterStore *store.ClusterStore, now time.Time) (aggregatedError error) { context := diagContext{ config: cfg, - store: store, + store: clusterStore, now: now, includedNamespacesSet: internal.ToBoolMap(cfg.IncludeNamespaces), excludedNamespacesSet: internal.ToBoolMap(cfg.ExcludeNamespaces), client: client, - statesByName: map[entityName]*entityState{}, - eventsByName: map[entityName][]*eventState{}, + statesByName: map[store.EntityName]*entityState{}, + eventsByName: map[store.EntityName][]*eventState{}, } err := context.collectStates() @@ -205,12 +176,8 @@ func DiagnoseCluster(client kubeclient.KubernetesClient, cfg *config.Config, sto delete(context.eventsByName, name) } - for _, states := range context.eventsByName { - for _, state := range states { - if !state.isHealthy() { - context.handleStandaloneEvent(state) - } - } + for entityName, states := range context.eventsByName { + context.handleStandaloneEvents(entityName, states) } return diff --git a/diag/diag_test.go b/diag/diag_test.go index 8dbe07f..9eb5016 100644 --- a/diag/diag_test.go +++ b/diag/diag_test.go @@ -70,8 +70,8 @@ func Test_Diagnose(t *testing.T) { assert.Equal(t, 2, len(alerts[i].Events)) assert.Equal(t, `Event by kubelet: Failed x4 since 17 Oct 21 14:15 UTC, 4 minutes ago (last seen 2 minutes ago): Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown`, alerts[i].Events[0]) - assert.Equal(t, `Event by kubelet: Failed x6 since 17 Oct 21 14:15 UTC, 4 minutes ago (last seen 2 minutes ago): - Error: ImagePullBackOff`, alerts[i].Events[1]) + assert.Equal(t, `Event by kubelet: Failed x4 since 17 Oct 21 14:15 UTC, 4 minutes ago (last seen 2 minutes ago): + Error: ErrImagePull`, alerts[i].Events[1]) assert.Equal(t, 1, len(alerts[i].LogsByContainerName)) assert.Equal(t, "default/test-2-broken-image-7cbf974df9-4jv8f/test-2-broken-image/logs", alerts[i].LogsByContainerName["test-2-broken-image"]) @@ -228,7 +228,7 @@ func Test_Diagnose_EventsDeduplicationOnLivenessCheckFail(t *testing.T) { stor, err = store.LoadOrCreate(cfg) require.Nil(t, err) - now = now.Add(time.Minute * time.Duration(17)) + now = now.Add(time.Minute * time.Duration(17)) clusterStore = stor.GetClusterStore(clusterName, now) assert.Equal(t, 0, len(clusterStore.Alerts)) err = DiagnoseCluster(client, cfg, clusterStore, now) @@ -266,15 +266,15 @@ func Test_Diagnose_EventsDeduplicationOnRpcError(t *testing.T) { assert.Equal(t, 1, len(alerts[i].Messages)) assert.Equal(t, "4 containers are still initializing [ init-container (init), run-migrations (init), wait-for-database (init), wait-for-queue (init) ] (since 1 hour ago)", alerts[i].Messages[0]) assert.Equal(t, 2, len(alerts[i].Events)) - assert.Equal(t, `Event by kubelet: FailedCreatePodSandBox x2 since 31 Oct 21 14:21 UTC, 8 minutes ago (last seen 7 minutes ago): - Failed to create pod sandbox: rpc error: code = Unknown desc = failed to reserve sandbox name "api-74767b9df-xxsrs_ci_6e8c94db-df75-480c-824b-92eb95e99296_0": name "api-74767b9df-xxsrs_ci_6e8c94db-df75-480c-824b-92eb95e99296_0" is reserved for "067d2ab2c9c553f48b0294d2b07926ea278a6b1ad74c716dd59b43cf3d2ca6e9"`, alerts[i].Events[0]) assert.Equal(t, `Event by kubelet: FailedCreatePodSandBox x18 since 31 Oct 21 13:32 UTC, 57 minutes ago (last seen now): - Failed to create pod sandbox: rpc error: code = DeadlineExceeded desc = context deadline exceeded`, alerts[i].Events[1]) + Failed to create pod sandbox: rpc error: code = DeadlineExceeded desc = context deadline exceeded`, alerts[i].Events[0]) + assert.Equal(t, `Event by kubelet: FailedCreatePodSandBox since 31 Oct 21 13:54 UTC, 35 minutes ago: + Failed to create pod sandbox: rpc error: code = Unknown desc = failed to reserve sandbox name "api-74767b9df-xxsrs_ci_6e8c94db-df75-480c-824b-92eb95e99296_0": name "api-74767b9df-xxsrs_ci_6e8c94db-df75-480c-824b-92eb95e99296_0" is reserved for "6e9a80c560f39bd7a0052d571903183bd099e9f6f2a7e7ccd92381dc721ac5f0"`, alerts[i].Events[1]) assert.Equal(t, 0, len(alerts[i].LogsByContainerName)) stor, err = store.LoadOrCreate(cfg) require.Nil(t, err) - now = now.Add(time.Minute * time.Duration(17)) + now = now.Add(time.Minute * time.Duration(17)) clusterStore = stor.GetClusterStore(clusterName, now) assert.Equal(t, 0, len(clusterStore.Alerts)) err = DiagnoseCluster(client, cfg, clusterStore, now) diff --git a/diag/events_test.go b/diag/events_test.go index 481eb4b..a6b1294 100644 --- a/diag/events_test.go +++ b/diag/events_test.go @@ -27,7 +27,7 @@ func TestEventState_StandardEvents(t *testing.T) { require.Nil(t, err) log.Debug(state.String()) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages := strings.Split(state.cleanMessage(), "\n") require.Equal(t, 4, len(messages)) require.Equal(t, "Event by kubelet: Unhealthy x2 since 12 Oct 21 13:54 UTC, 41 seconds ago (last seen 26 seconds ago):", messages[0]) @@ -56,7 +56,7 @@ func TestEventState_MountFailedEvents(t *testing.T) { require.Nil(t, err) log.Debugf("%v) %v", 1, state) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages := strings.Split(state.cleanMessage(), "\n") assert.Equal(t, 2, len(messages)) assert.Equal(t, "Event by kubelet: Failed x351 since 12 Oct 21 12:00 UTC, 1 hour ago (last seen 9 minutes ago):", messages[0]) @@ -66,7 +66,7 @@ func TestEventState_MountFailedEvents(t *testing.T) { require.Nil(t, err) log.Debugf("%v) %v", 10, state) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages = strings.Split(state.cleanMessage(), "\n") assert.Equal(t, 2, len(messages)) assert.Equal(t, "Event by default-scheduler: FailedScheduling x476 since 12 Oct 21 12:01 UTC, 1 hour ago (last seen 4 minutes ago):", messages[0]) @@ -76,7 +76,7 @@ func TestEventState_MountFailedEvents(t *testing.T) { require.Nil(t, err) log.Debugf("%v) %v", 11, state) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages = strings.Split(state.cleanMessage(), "\n") assert.Equal(t, 2, len(messages)) assert.Equal(t, "Event by kubelet: FailedMount x10 since 12 Oct 21 12:02 UTC, 1 hour ago (last seen 3 minutes ago):", messages[0]) @@ -86,7 +86,7 @@ func TestEventState_MountFailedEvents(t *testing.T) { require.Nil(t, err) log.Debugf("%v) %v", 12, state) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages = strings.Split(state.cleanMessage(), "\n") assert.Equal(t, 2, len(messages)) assert.Equal(t, "Event by kubelet: FailedMount x28 since 12 Oct 21 12:05 UTC, 1 hour ago (last seen 5 minutes ago):", messages[0]) @@ -106,7 +106,7 @@ func TestEventState_NodeProblemDetector(t *testing.T) { require.Nil(t, err) log.Debugf("%v) %v", 1, state) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages := strings.Split(state.cleanMessage(), "\n") assert.Equal(t, 2, len(messages)) assert.Equal(t, "Event by kernel-monitor: KernelOops since 14 Oct 21 06:10 UTC, 19 minutes ago:", messages[0]) @@ -130,7 +130,7 @@ func TestEventState_FailedJobs(t *testing.T) { require.Nil(t, err) log.Debug(state.String()) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages := strings.Split(state.cleanMessage(), "\n") require.Equal(t, 2, len(messages)) require.Equal(t, "Event by job-controller: BackoffLimitExceeded since 21 Oct 21 10:06 UTC, 53 minutes ago:", messages[0]) @@ -182,7 +182,7 @@ func TestEventState_LivenessFailed(t *testing.T) { require.Nil(t, err) log.Debug(state.String()) require.False(t, state.isHealthy()) - require.NotEmpty(t, state.name.name) + require.NotEmpty(t, state.name.Name) messages := strings.Split(state.cleanMessage(), "\n") require.True(t, len(messages) >= 2, len(messages)) require.True(t, len(messages) <= 9, len(messages)) diff --git a/diag/state.go b/diag/state.go index 6b44883..b05a4aa 100644 --- a/diag/state.go +++ b/diag/state.go @@ -2,6 +2,8 @@ package diag import ( "fmt" + "github.com/reallyliri/kubescout/dedup" + "github.com/reallyliri/kubescout/store" log "github.com/sirupsen/logrus" v12 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -13,10 +15,10 @@ import ( ) func (context *diagContext) getOrAddState(namespace, kind, name string, createdTimestamp time.Time) *entityState { - eName := entityName{ - namespace: namespace, - kind: kind, - name: name, + eName := store.EntityName{ + Namespace: namespace, + Kind: kind, + Name: name, } state, found := context.statesByName[eName] if !found { @@ -26,7 +28,7 @@ func (context *diagContext) getOrAddState(namespace, kind, name string, createdT return state } -func (context *diagContext) addEventState(eName entityName) *eventState { +func (context *diagContext) addEventState(eName store.EntityName) *eventState { eventsOfEntity, exists := context.eventsByName[eName] if !exists { eventsOfEntity = []*eventState{} @@ -92,10 +94,10 @@ func (state *entityState) checkContainerStatuses(pod *v1.Pod, context *diagConte sort.Strings(*pending) state.appendMessage( "%v still %v [ %v ] (since %v)", - wrapTemporal(formatPlural(len(*pending), "One container is", "containers are")), + dedup.WrapTemporal(formatPlural(len(*pending), "One container is", "containers are")), pendingVerb, strings.Join(*pending, ", "), - wrapTemporal(formatDuration(pod.CreationTimestamp.Time, context.now)), + dedup.WrapTemporal(formatDuration(pod.CreationTimestamp.Time, context.now)), ) } @@ -108,14 +110,14 @@ func (state *entityState) checkContainerStatuses(pod *v1.Pod, context *diagConte "%v: %v (last transition: %v)", splitToWords(condition.Reason), condition.Message, - wrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), + dedup.WrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), ) } } if !anyConditionMessage { sinceCreation := context.now.Sub(pod.CreationTimestamp.Time).Seconds() if pod.Status.Phase != v1.PodPending || sinceCreation >= context.config.PodStartingGracePeriodSeconds { - state.appendMessage("Pod is in %v phase (since %v)", pod.Status.Phase, wrapTemporal(formatDuration(pod.CreationTimestamp.Time, context.now))) + state.appendMessage("Pod is in %v phase (since %v)", pod.Status.Phase, dedup.WrapTemporal(formatDuration(pod.CreationTimestamp.Time, context.now))) } } } @@ -146,7 +148,7 @@ func (state *entityState) checkContainerStatus(pod *v1.Pod, containerStatus v1.C } else if stateWaiting.Reason == "PodInitializing" && startingGracePassed { waitingToInitialize = true } else if !ignoreWaitingReasons[stateWaiting.Reason] { - state.appendMessage("%v still waiting due to %v: %v", title, stateWaiting.Reason, wrapTemporal(stateWaiting.Message)) + state.appendMessage("%v still waiting due to %v: %v", title, stateWaiting.Reason, dedup.WrapTemporal(stateWaiting.Message)) shouldCollectLogs = true } } @@ -159,9 +161,9 @@ func (state *entityState) checkContainerStatus(pod *v1.Pod, containerStatus v1.C prefix = fmt.Sprintf("%v is in %v:", title, stateWaiting.Reason) } if stateTerminated != nil { - state.appendMessage("%v restarted %v times, last exit due to %v (exit code %v)", prefix, wrapTemporal(containerStatus.RestartCount), stateTerminated.Reason, stateTerminated.ExitCode) + state.appendMessage("%v restarted %v times, last exit due to %v (exit code %v)", prefix, dedup.WrapTemporal(containerStatus.RestartCount), stateTerminated.Reason, stateTerminated.ExitCode) } else { - state.appendMessage("%v restarted %v times", prefix, wrapTemporal(containerStatus.RestartCount)) + state.appendMessage("%v restarted %v times", prefix, dedup.WrapTemporal(containerStatus.RestartCount)) } shouldCollectLogs = true } @@ -203,7 +205,7 @@ func (context *diagContext) podState(pod *v1.Pod) (state *entityState, err error if statusReason != "" { statusMessage := strings.TrimSpace(pod.Status.Message) if statusReason == "Evicted" { - statusMessage = wrapTemporal(formatUnitsSize(statusMessage)) + statusMessage = dedup.WrapTemporal(formatUnitsSize(statusMessage)) } state.appendMessage("Pod is in %v phase due to %v: %v", podPhase, statusReason, statusMessage) } else if pod.DeletionTimestamp != nil { @@ -213,7 +215,7 @@ func (context *diagContext) podState(pod *v1.Pod) (state *entityState, err error if pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds != 0 { suffix = fmt.Sprintf(" (deletion grace is %v sec)", *pod.DeletionGracePeriodSeconds) } - state.appendMessage("Pod is Terminating since %v%v", wrapTemporal(formatDuration(deletionTime, context.now)), suffix) + state.appendMessage("Pod is Terminating since %v%v", dedup.WrapTemporal(formatDuration(deletionTime, context.now)), suffix) } } else if podPhase != v1.PodRunning && podPhase != v1.PodPending { state.appendMessage("Pod is in %v phase", podPhase) @@ -242,7 +244,7 @@ func (context *diagContext) nodeState(node *v1.Node, forceCheckResources bool) ( "%v: %v (last transition: %v)", splitToWords(condition.Reason), formatUnitsSize(condition.Message), - wrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), + dedup.WrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), ) } @@ -305,20 +307,20 @@ func (context *diagContext) replicaSetState(replicaSet *v12.ReplicaSet) (state * "%v: %v (last transition: %v)", splitToWords(condition.Reason), formatUnitsSize(condition.Message), - wrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), + dedup.WrapTemporal(formatDuration(condition.LastTransitionTime.Time, context.now)), ) } return } func (context *diagContext) eventState(event *v1.Event) (state *eventState, err error) { - var eName entityName + var eName store.EntityName if event.InvolvedObject.Name != "" { - eName.namespace = event.InvolvedObject.Namespace - eName.kind = event.InvolvedObject.Kind - eName.name = event.InvolvedObject.Name + eName.Namespace = event.InvolvedObject.Namespace + eName.Kind = event.InvolvedObject.Kind + eName.Name = event.InvolvedObject.Name } else { - eName.kind = "Cluster" + eName.Kind = "Cluster" } state = context.addEventState(eName) @@ -355,17 +357,17 @@ func (context *diagContext) eventState(event *v1.Event) (state *eventState, err event.Reason, )) if count > 1 { - builder.WriteString(fmt.Sprintf("x%v ", wrapTemporal(count))) + builder.WriteString(fmt.Sprintf("x%v ", dedup.WrapTemporal(count))) } builder.WriteString(fmt.Sprintf( "since %v, %v", - wrapTemporal(formatTime(firstTimestamp, context.config.TimeFormat, context.config.Locale)), - wrapTemporal(formatDuration(firstTimestamp, context.now)), + dedup.WrapTemporal(formatTime(firstTimestamp, context.config.TimeFormat, context.config.Locale)), + dedup.WrapTemporal(formatDuration(firstTimestamp, context.now)), )) if firstTimestamp != lastTimestamp && !lastTimestamp.IsZero() { - builder.WriteString(wrapTemporal(fmt.Sprintf( + builder.WriteString(dedup.WrapTemporal(fmt.Sprintf( " (last seen %v)", formatDuration(lastTimestamp, context.now), ))) diff --git a/diag/state_model.go b/diag/state_model.go index 204b22e..4ea6b75 100644 --- a/diag/state_model.go +++ b/diag/state_model.go @@ -3,19 +3,15 @@ package diag import ( "fmt" "github.com/goombaio/orderedmap" + "github.com/reallyliri/kubescout/dedup" "github.com/reallyliri/kubescout/internal" + "github.com/reallyliri/kubescout/store" "strings" "time" ) -type entityName struct { - namespace string - kind string - name string -} - type entityState struct { - name entityName + name store.EntityName messages []string node string createdTimestamp time.Time @@ -23,12 +19,12 @@ type entityState struct { } type eventState struct { - name entityName + name store.EntityName message string lastTimestamp time.Time } -func newState(entityName entityName, createdTimestamp time.Time) *entityState { +func newState(entityName store.EntityName, createdTimestamp time.Time) *entityState { return &entityState{ name: entityName, messages: []string{}, @@ -43,16 +39,16 @@ func (state *entityState) isHealthy() bool { func (state *entityState) String() string { if state.isHealthy() { - return fmt.Sprintf("%v is healthy\n", state.name.name) + return fmt.Sprintf("%v is healthy\n", state.name.Name) } - messages := append([]string{fmt.Sprintf("%v %v is un-healthy", state.name.kind, state.name.name)}, state.cleanMessages()...) + messages := append([]string{fmt.Sprintf("%v %v is un-healthy", state.name.Kind, state.name.Name)}, state.cleanMessages()...) return strings.Join(messages, "\n\t") } func (state *entityState) cleanMessages() []string { cleanMessages := orderedmap.NewOrderedMap() for _, message := range state.messages { - a := cleanMessage(message) + a := dedup.CleanTemporal(message) cleanMessages.Put(a, true) } return internal.CastToString(cleanMessages.Keys()) @@ -77,11 +73,11 @@ func (state *eventState) isHealthy() bool { func (state *eventState) String() string { if state.isHealthy() { - return fmt.Sprintf("%v has a healthy event\n", state.name.name) + return fmt.Sprintf("%v has a healthy event\n", state.name.Name) } - return fmt.Sprintf("Event on %v %v: %v", state.name.kind, state.name.name, cleanMessage(state.message)) + return fmt.Sprintf("Event on %v %v: %v", state.name.Kind, state.name.Name, dedup.CleanTemporal(state.message)) } func (state *eventState) cleanMessage() string { - return cleanMessage(state.message) + return dedup.CleanTemporal(state.message) } diff --git a/diag/tests_verify.go b/diag/tests_verify.go index 052f65b..ee37a46 100644 --- a/diag/tests_verify.go +++ b/diag/tests_verify.go @@ -23,7 +23,7 @@ func verifyEventHealthy(t *testing.T, event *v1.Event, now time.Time, index int) assert.Nil(t, err) log.Debugf("%v) %v", index, state) assert.True(t, state.isHealthy()) - assert.NotEmpty(t, state.name.kind) + assert.NotEmpty(t, state.name.Kind) assert.Empty(t, state.message) } diff --git a/diag/util.go b/diag/util.go index 02ba83b..4e47970 100644 --- a/diag/util.go +++ b/diag/util.go @@ -1,26 +1,19 @@ package diag import ( - "crypto/sha1" "fmt" - "github.com/adrg/strutil/metrics" "github.com/dustin/go-humanize" "github.com/fatih/camelcase" - log "github.com/sirupsen/logrus" "regexp" "strconv" "strings" "time" ) -const temporalStart = "" -const temporalEnd = "" - var kiRegex *regexp.Regexp var miRegex *regexp.Regexp var giRegex *regexp.Regexp var mRegex *regexp.Regexp -var levenshtein *metrics.Levenshtein func init() { var err error @@ -40,35 +33,6 @@ func init() { if err != nil { panic(fmt.Errorf("failed to compile regex: %v", err)) } - levenshtein = metrics.NewLevenshtein() - levenshtein.CaseSensitive = true - levenshtein.InsertCost = 3 - levenshtein.DeleteCost = 3 - levenshtein.ReplaceCost = 1 -} - -func normalizeMessage(message string) string { - for { - temporalStartIndex := strings.Index(message, temporalStart) - if temporalStartIndex == -1 { - break - } - temporalEndIndex := strings.Index(message, temporalEnd) - if temporalEndIndex == -1 || temporalEndIndex < temporalStartIndex { - log.Errorf("invalid temporal format for %v", message) - break - } - message = message[:temporalStartIndex] + message[(temporalEndIndex+len(temporalEnd)):] - } - return message -} - -func cleanMessage(message string) string { - return strings.ReplaceAll(strings.ReplaceAll(message, temporalStart, ""), temporalEnd, "") -} - -func wrapTemporal(item interface{}) string { - return fmt.Sprintf("%v%v%v", temporalStart, item, temporalEnd) } func splitToWords(value string) string { @@ -183,64 +147,3 @@ func formatPlural(count int, singular string, plural string) string { } return fmt.Sprintf("%v %v", count, plural) } - -func hash(entityName entityName, message string) string { - sha := sha1.New() - sha.Write([]byte(entityName.namespace)) - sha.Write([]byte(entityName.kind)) - sha.Write([]byte(entityName.name)) - sha.Write([]byte(message)) - asBytes := sha.Sum(nil) - return fmt.Sprintf("%x", asBytes) -} - -func forRange(from int, until int, action func(int) bool) { - i := from - for { - if i >= until { - return - } - shouldContinue := action(i) - if !shouldContinue { - return - } - i++ - } -} - -func max(a int, b int) int { - if a >= b { - return a - } - return b -} - -func dedup(items []interface{}, dedupOnValue func(interface{}) string, similarityThreshold float64) []int { - if len(items) == 0 { - return nil - } - values := make([]string, len(items)) - for i, item := range items { - values[i] = dedupOnValue(item) - } - - var indexes []int - forRange(0, len(values), func(i int) bool { - anySimilar := false - forRange(0, i, func(j int) bool { - distance := levenshtein.Distance(values[i], values[j]) - score := 1 - float64(distance)/float64(max(len(values[i]), len(values[j]))) - if score >= similarityThreshold { - anySimilar = true - return false - } - return true - }) - if !anySimilar { - indexes = append(indexes, i) - } - return true - }) - - return indexes -} diff --git a/diag/util_test.go b/diag/util_test.go index ef7c398..459d887 100644 --- a/diag/util_test.go +++ b/diag/util_test.go @@ -51,82 +51,3 @@ func Test_formatResourceUsage(t *testing.T) { assert.Equal(t, "", formatResourceUsage(48433408, 53485824, "Memory", 0.75)) assert.Equal(t, "Excessive usage of Memory: 48MB/54MB (90.6% usage)", formatResourceUsage(5052416, 53485824, "Memory", 0.75)) } - -func Test_normalizeMessage(t *testing.T) { - assert.Equal(t, "", normalizeMessage("")) - assert.Equal(t, "abc", normalizeMessage("abc")) - assert.Equal(t, "hello world", normalizeMessage("hello world")) - assert.Equal(t, "", normalizeMessage("hello world")) - assert.Equal(t, "", normalizeMessage("")) - assert.Equal(t, "The is here", normalizeMessage("The hello world is here")) - assert.Equal(t, "The brown jumps the dog", normalizeMessage("The quick brown fox jumps over the lazy dog")) - assert.Equal(t, "tt", normalizeMessage("tt")) - assert.Equal(t, "tat", normalizeMessage("tat")) - assert.Equal(t, "tt", normalizeMessage("ttt")) - assert.Equal(t, "tt", normalizeMessage("ttt")) - assert.Equal(t, "tt", normalizeMessage("ttt")) -} - -func Test_cleanMessage(t *testing.T) { - assert.Equal(t, "", cleanMessage("")) - assert.Equal(t, "abc", cleanMessage("abc")) - assert.Equal(t, "hello world", cleanMessage("hello world")) - assert.Equal(t, "hello world", cleanMessage("hello world")) - assert.Equal(t, "", cleanMessage("")) - assert.Equal(t, "The hello world is here", cleanMessage("The hello world is here")) - assert.Equal(t, "The quick brown fox jumps over the lazy dog", cleanMessage("The quick brown fox jumps over the lazy dog")) - assert.Equal(t, "tt", cleanMessage("tt")) - assert.Equal(t, "tat", cleanMessage("tat")) - assert.Equal(t, "ttt", cleanMessage("ttt")) - assert.Equal(t, "ttt", cleanMessage("ttt")) - assert.Equal(t, "ttt", cleanMessage("ttt")) -} - -func Test_dedup(t *testing.T) { - items := []interface{}{ - "Frodo Baggins", "Tom Sawyer", "Bilbo Baggin", "Samuel L. Jackson", "F. Baggins", "Frody Baggins", "Bilbo Baggins", - } - deduped := dedup(items, func(item interface{}) string { - return item.(string) - }, 0.7) - assert.Len(t, deduped, 5) - - deduped = dedup(items, func(item interface{}) string { - return item.(string) - }, 1) - assert.Len(t, deduped, 7) - - deduped = dedup(items, func(item interface{}) string { - return item.(string) - }, 0) - assert.Len(t, deduped, 1) - - items = []interface{}{ - "Tom", "Dick", "Harry", - } - deduped = dedup(items, func(item interface{}) string { - return item.(string) - }, 0.7) - assert.Len(t, deduped, 3) - - items = []interface{}{ - "Tom", "Dick", "Harry", "Harry", "Dick", "Tom", - } - deduped = dedup(items, func(item interface{}) string { - return item.(string) - }, 0.7) - assert.Len(t, deduped, 3) - - items = []interface{}{ - `Event by kubelet: Failed x since , : - Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown`, - `Event by kubelet: Failed x since , : - Error: ErrImagePull`, - `Event by kubelet: Failed x since , : - Error: ImagePullBackOff`, - } - deduped = dedup(items, func(item interface{}) string { - return item.(string) - }, 0.6) - assert.Len(t, deduped, 2) -} diff --git a/integration_test.go b/integration_test.go index 478b107..122b23e 100644 --- a/integration_test.go +++ b/integration_test.go @@ -229,7 +229,7 @@ Container test-2-broken-image still waiting due to * Event by kubelet: Failed x* since * (last seen * ago): Failed to pull image "nginx:l4t3st": rpc error: code = Unknown desc = Error response from daemon: manifest for nginx:l4t3st not found: manifest unknown: manifest unknown Event by kubelet: Failed x* since * (last seen * ago): - Error: ImagePullBackOff`, + Error: ErrImagePull`, `Pod default/test-3-excessive-resources-* is un-healthy: Unschedulable: 0/1 nodes are available: 1 Insufficient memory. (last transition: * ago) diff --git a/kubeclient/client.go b/kubeclient/client.go index abd33e1..d8f71a0 100644 --- a/kubeclient/client.go +++ b/kubeclient/client.go @@ -168,7 +168,8 @@ func (client *remoteKubernetesClient) GetPodLogs(namespace string, podName strin return "", fmt.Errorf("error in stream copy for %v/%v/%v : %v", namespace, podName, containerName, err) } logs = buf.String() - if strings.HasPrefix(logs, "unable to retrieve container logs for") { + if strings.HasPrefix(logs, "unable to retrieve container logs for") || + strings.HasPrefix(logs, "failed to try resolving symlinks in path") { return "", fmt.Errorf("failed to retrieve logs of %v/%v/%v : %v", namespace, podName, containerName, logs) } return logs, nil diff --git a/main.go b/main.go index 9daf512..f406814 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "os" ) -const VERSION = "0.1.8" +const VERSION = "0.1.9" func main() { diff --git a/store/entityName.go b/store/entityName.go new file mode 100644 index 0000000..dbfc43b --- /dev/null +++ b/store/entityName.go @@ -0,0 +1,16 @@ +package store + +import "fmt" + +type EntityName struct { + Namespace string `json:"namespace"` + Kind string `json:"kind"` + Name string `json:"name"` +} + +func (entityName *EntityName) String() string { + if entityName.Namespace == "" { + return fmt.Sprintf("%v/%v", entityName.Kind, entityName.Name) + } + return fmt.Sprintf("%v/%v/%v", entityName.Kind, entityName.Namespace, entityName.Name) +} diff --git a/store/store.go b/store/store.go index d2a9d35..b16f645 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,8 @@ import ( "fmt" "github.com/reallyliri/kubescout/alert" "github.com/reallyliri/kubescout/config" + "github.com/reallyliri/kubescout/dedup" + log "github.com/sirupsen/logrus" "io/fs" "io/ioutil" "time" @@ -13,16 +15,16 @@ import ( type Store struct { ClusterStoresByName map[string]*ClusterStore `json:"cluster_stores_by_name"` + LastRunAt time.Time `json:"last_run_at"` dedupDuration time.Duration filePath string - LastRunAt time.Time `json:"last_run_at"` } type ClusterStore struct { - parent *Store - Cluster string `json:"cluster"` - HashWithTimestamp map[string]time.Time `json:"hash_with_timestamp"` - Alerts alert.EntityAlerts `json:"-"` + parent *Store + Cluster string `json:"cluster"` + Alerts alert.EntityAlerts `json:"-"` + MessagesWithTimestampPerEntity map[string]map[string]time.Time `json:"messages_with_timestamp_per_entity"` } func LoadOrCreate(config *config.Config) (*Store, error) { @@ -57,34 +59,72 @@ func (store *Store) GetClusterStore(name string, now time.Time) *ClusterStore { clusterStore, exists := store.ClusterStoresByName[name] if !exists { clusterStore = &ClusterStore{ - Cluster: name, - HashWithTimestamp: make(map[string]time.Time), - Alerts: []*alert.EntityAlert{}, + Cluster: name, + MessagesWithTimestampPerEntity: make(map[string]map[string]time.Time), + Alerts: []*alert.EntityAlert{}, } store.ClusterStoresByName[name] = clusterStore } clusterStore.parent = store - for hash, timestamp := range clusterStore.HashWithTimestamp { - if store.dedupDuration > 0 && now.Sub(timestamp) > store.dedupDuration { - delete(clusterStore.HashWithTimestamp, hash) + for entityName, messagesByTimestamp := range clusterStore.MessagesWithTimestampPerEntity { + for message, timestamp := range messagesByTimestamp { + if store.dedupDuration > 0 && now.Sub(timestamp) > store.dedupDuration { + delete(messagesByTimestamp, message) + } + } + if len(messagesByTimestamp) == 0 { + delete(clusterStore.MessagesWithTimestampPerEntity, entityName) } } return clusterStore } -func (clusterStore *ClusterStore) ShouldAdd(hash string, now time.Time) bool { - timestamp, found := clusterStore.HashWithTimestamp[hash] - if !found || clusterStore.parent.dedupDuration == 0 || now.Sub(timestamp) > clusterStore.parent.dedupDuration { - return true +func tryMatch(messagesByTimestamp map[string]time.Time, candidate string) (match string) { + if _, found := messagesByTimestamp[candidate]; found { + return candidate } - return false + + const similarityThreshold = 0.85 + for stored := range messagesByTimestamp { + if dedup.AreSimilar(stored, candidate, similarityThreshold) { + return stored + } + } + return "" } -func (clusterStore *ClusterStore) Add(entityAlert *alert.EntityAlert, hashes []string, now time.Time) { - for _, hash := range hashes { - clusterStore.HashWithTimestamp[hash] = now +func (clusterStore *ClusterStore) TryAdd(entityName EntityName, message string, now time.Time) bool { + message = dedup.NormalizeTemporal(message) + truncMessage := message + if len(message) > 50 { + truncMessage = message[:50] + "..." + } + + messagesByTimestamp, found := clusterStore.MessagesWithTimestampPerEntity[entityName.String()] + if !found { + log.Tracef("no match was found for message '%v' for entity %v - adding it", truncMessage, entityName) + messagesByTimestamp = map[string]time.Time{ + message: now, + } + clusterStore.MessagesWithTimestampPerEntity[entityName.String()] = messagesByTimestamp + return true } - clusterStore.Alerts = append(clusterStore.Alerts, entityAlert) + + match := tryMatch(messagesByTimestamp, message) + if match != "" { + timestamp := messagesByTimestamp[match] + if clusterStore.parent.dedupDuration > 0 && now.Sub(timestamp) <= clusterStore.parent.dedupDuration { + log.Tracef("match was found for message '%v' for entity %v and its timestamp is in dedup grace time - skipping", truncMessage, entityName) + return false + } + log.Tracef("match was found for message '%v' for entity %v but its timestamp is out of dedup grace time - adding it", truncMessage, entityName) + messagesByTimestamp[message] = now + return true + } + + log.Tracef("no match was found for message '%v' for entity %v - adding it", truncMessage, entityName) + messagesByTimestamp[message] = now + return true } func (store *Store) Flush(now time.Time) error { diff --git a/store/store_test.go b/store/store_test.go index f1e9dff..38c3186 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,7 +1,6 @@ package store import ( - "github.com/reallyliri/kubescout/alert" "github.com/reallyliri/kubescout/config" "github.com/stretchr/testify/require" "io/ioutil" @@ -46,28 +45,22 @@ func TestStoreAddFlow(t *testing.T) { clusterStore := store.GetClusterStore("test", now) - require.Equal(t, 0, len(clusterStore.Alerts)) - require.True(t, clusterStore.ShouldAdd("hash1", now)) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message1"}}, []string{"hash1"}, now) - require.Equal(t, 1, len(clusterStore.Alerts)) - require.False(t, clusterStore.ShouldAdd("hash1", now)) - require.Equal(t, 1, len(clusterStore.Alerts)) + name := EntityName{Name: "ent1"} + + require.Equal(t, 0, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) + require.True(t, clusterStore.TryAdd(name, "m", now)) + require.Equal(t, 1, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) + require.False(t, clusterStore.TryAdd(name, "m", now)) + require.Equal(t, 1, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) nearFuture := now.Add(time.Second * time.Duration(50)) - require.False(t, clusterStore.ShouldAdd("hash1", nearFuture)) - require.Equal(t, 1, len(clusterStore.Alerts)) + require.False(t, clusterStore.TryAdd(name, "m", nearFuture)) + require.Equal(t, 1, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) farFuture := now.Add(time.Minute * time.Duration(2)) - require.True(t, clusterStore.ShouldAdd("hash1", farFuture)) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message1"}}, []string{"hash1"}, farFuture) - require.Equal(t, 2, len(clusterStore.Alerts)) - - require.True(t, clusterStore.ShouldAdd("hash2", now)) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message2"}}, []string{"hash2"}, now) - require.Equal(t, 3, len(clusterStore.Alerts)) - require.True(t, clusterStore.ShouldAdd("hash3", now)) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message2"}}, []string{"hash3"}, now) - require.Equal(t, 4, len(clusterStore.Alerts)) - require.False(t, clusterStore.ShouldAdd("hash3", now)) - require.Equal(t, 4, len(clusterStore.Alerts)) + require.True(t, clusterStore.TryAdd(name, "m", farFuture)) + require.Equal(t, 1, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) + + require.True(t, clusterStore.TryAdd(name, "message", farFuture)) + require.Equal(t, 2, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) } func TestLoadAfterFlush(t *testing.T) { @@ -84,15 +77,17 @@ func TestLoadAfterFlush(t *testing.T) { clusterStore := store.GetClusterStore("test", now) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message"}}, []string{"hash1", "hash2", "hash3"}, now) - require.Equal(t, 1, len(clusterStore.Alerts)) - require.Equal(t, 3, len(clusterStore.HashWithTimestamp)) + name := EntityName{Name: "ent1"} + + require.True(t, clusterStore.TryAdd(name, "a", now)) + require.True(t, clusterStore.TryAdd(name, "b", now)) + require.True(t, clusterStore.TryAdd(name, "c", now)) + require.Equal(t, 3, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) storeReloaded, err := LoadOrCreate(cfg) require.Nil(t, err) clusterStoreReloaded := storeReloaded.GetClusterStore("test", now) - require.Equal(t, 0, len(clusterStoreReloaded.Alerts)) - require.Equal(t, 0, len(clusterStoreReloaded.HashWithTimestamp)) + require.Equal(t, 0, len(clusterStoreReloaded.MessagesWithTimestampPerEntity[name.String()])) err = store.Flush(now) require.Nil(t, err) @@ -100,8 +95,7 @@ func TestLoadAfterFlush(t *testing.T) { storeReloaded, err = LoadOrCreate(cfg) require.Nil(t, err) clusterStoreReloaded = storeReloaded.GetClusterStore("test", now) - require.Equal(t, 0, len(clusterStoreReloaded.Alerts)) - require.Equal(t, 3, len(clusterStoreReloaded.HashWithTimestamp)) + require.Equal(t, 3, len(clusterStoreReloaded.MessagesWithTimestampPerEntity[name.String()])) } func TestLoadAfterLongTime(t *testing.T) { @@ -118,25 +112,27 @@ func TestLoadAfterLongTime(t *testing.T) { clusterStore := store.GetClusterStore("test", now) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message"}}, []string{"hash1", "hash2", "hash3"}, now) - require.Equal(t, 1, len(clusterStore.Alerts)) - require.Equal(t, 3, len(clusterStore.HashWithTimestamp)) + name := EntityName{Name: "ent1"} + + require.True(t, clusterStore.TryAdd(name, "a", now)) + require.True(t, clusterStore.TryAdd(name, "b", now)) + require.True(t, clusterStore.TryAdd(name, "c", now)) + require.Equal(t, 3, len(clusterStore.MessagesWithTimestampPerEntity[name.String()])) + err = store.Flush(now) require.Nil(t, err) storeReloaded, err := LoadOrCreate(cfg) require.Nil(t, err) clusterStoreReloaded := storeReloaded.GetClusterStore("test", now.Add(time.Second*time.Duration(50))) - require.Equal(t, 0, len(clusterStoreReloaded.Alerts)) - require.Equal(t, 3, len(clusterStoreReloaded.HashWithTimestamp)) + require.Equal(t, 3, len(clusterStoreReloaded.MessagesWithTimestampPerEntity[name.String()])) err = storeReloaded.Flush(now) require.Nil(t, err) storeReloaded, err = LoadOrCreate(cfg) require.Nil(t, err) clusterStoreReloaded = storeReloaded.GetClusterStore("test", now.Add(time.Minute*time.Duration(3))) - require.Equal(t, 0, len(clusterStoreReloaded.Alerts)) - require.Equal(t, 0, len(clusterStoreReloaded.HashWithTimestamp)) + require.Equal(t, 0, len(clusterStoreReloaded.MessagesWithTimestampPerEntity[name.String()])) err = storeReloaded.Flush(now) require.Nil(t, err) } @@ -154,34 +150,24 @@ func TestStoreForMultipleClusters(t *testing.T) { require.Nil(t, err) cluster1Store1 := store1.GetClusterStore("test-1", now) - cluster1Store1.Add(&alert.EntityAlert{Messages: []string{"message"}}, []string{"hash1", "hash2", "hash3"}, now) - require.Equal(t, 1, len(cluster1Store1.Alerts)) - require.Equal(t, 3, len(cluster1Store1.HashWithTimestamp)) + name := EntityName{Name: "ent1"} + + require.True(t, cluster1Store1.TryAdd(name, "a", now)) + require.True(t, cluster1Store1.TryAdd(name, "b", now)) + require.True(t, cluster1Store1.TryAdd(name, "c", now)) + require.Equal(t, 3, len(cluster1Store1.MessagesWithTimestampPerEntity[name.String()])) + err = store1.Flush(now) require.Nil(t, err) store2, err := LoadOrCreate(cfg) require.Nil(t, err) + cluster1Store2 := store2.GetClusterStore("test-1", now) + require.Equal(t, 3, len(cluster1Store2.MessagesWithTimestampPerEntity[name.String()])) cluster2Store2 := store2.GetClusterStore("test-2", now) - require.Equal(t, 0, len(cluster2Store2.Alerts)) - require.Equal(t, 0, len(cluster2Store2.HashWithTimestamp)) + require.Equal(t, 0, len(cluster2Store2.MessagesWithTimestampPerEntity[name.String()])) err = store2.Flush(now) require.Nil(t, err) - - require.Equal(t, 2, len(store2.ClusterStoresByName)) - - store3, err := LoadOrCreate(cfg) - require.Nil(t, err) - cluster3Store3 := store3.GetClusterStore("test-3", now) - require.Nil(t, err) - require.Equal(t, 0, len(cluster3Store3.Alerts)) - require.Equal(t, 0, len(cluster3Store3.HashWithTimestamp)) - err = store3.Flush(now) - require.Nil(t, err) - - require.Equal(t, 1, len(store1.ClusterStoresByName)) - require.Equal(t, 2, len(store2.ClusterStoresByName)) - require.Equal(t, 3, len(store3.ClusterStoresByName)) } func TestJsonContent(t *testing.T) { @@ -198,10 +184,26 @@ func TestJsonContent(t *testing.T) { } store, err := LoadOrCreate(cfg) require.Nil(t, err) + clusterStore := store.GetClusterStore("test-json", now) - clusterStore.Add(&alert.EntityAlert{Messages: []string{"message"}}, []string{"hash1", "hash2", "hash3"}, now) - require.Equal(t, 1, len(clusterStore.Alerts)) + name1 := EntityName{ + Name: "po1", + Kind: "Pod", + Namespace: "ns", + } + require.True(t, clusterStore.TryAdd(name1, "a", now)) + require.True(t, clusterStore.TryAdd(name1, "b", now)) + require.True(t, clusterStore.TryAdd(name1, "c", now)) + require.Equal(t, 3, len(clusterStore.MessagesWithTimestampPerEntity[name1.String()])) + + name2 := EntityName{ + Name: "ns", + Kind: "Namespace", + } + require.True(t, clusterStore.TryAdd(name2, "a", now)) + require.Equal(t, 1, len(clusterStore.MessagesWithTimestampPerEntity[name2.String()])) + err = store.Flush(now.Add(time.Minute)) require.Nil(t, err) @@ -212,10 +214,15 @@ func TestJsonContent(t *testing.T) { "cluster_stores_by_name": { "test-json": { "cluster": "test-json", - "hash_with_timestamp": { - "hash1": "2021-10-17T13:00:00Z", - "hash2": "2021-10-17T13:00:00Z", - "hash3": "2021-10-17T13:00:00Z" + "messages_with_timestamp_per_entity": { + "Namespace/ns": { + "a": "2021-10-17T13:00:00Z" + }, + "Pod/ns/po1": { + "a": "2021-10-17T13:00:00Z", + "b": "2021-10-17T13:00:00Z", + "c": "2021-10-17T13:00:00Z" + } } } },