From 3891b3c633dc3826c47c12112f852619a1c5dd8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Tue, 9 Jan 2024 18:25:56 +0100 Subject: [PATCH] fix(k8sprocessor): Pod Service cache invalidation --- .changelog/1425.fixed.txt | 1 + pkg/processor/k8sprocessor/go.mod | 1 + pkg/processor/k8sprocessor/go.sum | 5 +- pkg/processor/k8sprocessor/kube/owner.go | 95 +++++++++++++------ pkg/processor/k8sprocessor/kube/owner_test.go | 19 +++- 5 files changed, 86 insertions(+), 35 deletions(-) create mode 100644 .changelog/1425.fixed.txt diff --git a/.changelog/1425.fixed.txt b/.changelog/1425.fixed.txt new file mode 100644 index 0000000000..2a776a3bce --- /dev/null +++ b/.changelog/1425.fixed.txt @@ -0,0 +1 @@ +fix(k8sprocessor): Pod Service cache invalidation \ No newline at end of file diff --git a/pkg/processor/k8sprocessor/go.mod b/pkg/processor/k8sprocessor/go.mod index 8e43ee3fcc..035b9fccc8 100644 --- a/pkg/processor/k8sprocessor/go.mod +++ b/pkg/processor/k8sprocessor/go.mod @@ -9,6 +9,7 @@ require ( go.opentelemetry.io/collector v0.91.0 go.opentelemetry.io/collector/semconv v0.91.0 go.uber.org/zap v1.26.0 + golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc k8s.io/api v0.28.4 k8s.io/apimachinery v0.28.4 k8s.io/client-go v0.28.4 diff --git a/pkg/processor/k8sprocessor/go.sum b/pkg/processor/k8sprocessor/go.sum index 61a94b6f78..aa3afa1c2c 100644 --- a/pkg/processor/k8sprocessor/go.sum +++ b/pkg/processor/k8sprocessor/go.sum @@ -487,7 +487,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= +golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM= +golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -686,7 +687,7 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/processor/k8sprocessor/kube/owner.go b/pkg/processor/k8sprocessor/kube/owner.go index 3671d05c32..4ad4a2bcb2 100644 --- a/pkg/processor/k8sprocessor/kube/owner.go +++ b/pkg/processor/k8sprocessor/kube/owner.go @@ -20,6 +20,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/exp/slices" api_v1 "k8s.io/api/core/v1" discovery_v1 "k8s.io/api/discovery/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -139,6 +140,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -150,6 +152,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -161,6 +164,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -172,6 +176,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -182,6 +187,7 @@ func newOwnerProvider( factory.Discovery().V1().EndpointSlices().Informer(), ownerCache.cacheEndpointSlice, ownerCache.deleteEndpointSlice, + ownerCache.updateEndpointSlice, func(object interface{}) (interface{}, error) { originalES, success := object.(*discovery_v1.EndpointSlice) if !success { @@ -201,6 +207,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -223,6 +230,7 @@ func newOwnerProvider( ownerCache.cacheObject, ownerCache.deleteObject, nil, + nil, ) } @@ -344,17 +352,24 @@ func (op *OwnerCache) deferredDelete(evict func(obj any)) func(any) { func (op *OwnerCache) addOwnerInformer( kind string, informer cache.SharedIndexInformer, - cacheFunc func(kind string, obj interface{}), + addFunc func(kind string, obj interface{}), deleteFunc func(obj interface{}), + updateFunc func(oldObj, newObj interface{}), transformFunc cache.TransformFunc, ) { + // if updatefunc is not specified, use addFunc + if updateFunc == nil { + updateFunc = func(_, obj interface{}) { + addFunc(kind, obj) + } + } _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - cacheFunc(kind, obj) + addFunc(kind, obj) observability.RecordOtherAdded(kind) }, - UpdateFunc: func(_, obj interface{}) { - cacheFunc(kind, obj) + UpdateFunc: func(oldObj, newObj interface{}) { + updateFunc(oldObj, newObj) observability.RecordOtherUpdated(kind) }, DeleteFunc: op.deferredDelete(func(obj any) { @@ -440,10 +455,8 @@ func (op *OwnerCache) addServiceToPod(pod string, serviceName string) { return } - for _, it := range services { - if it == serviceName { - return - } + if idx := slices.Index(services, serviceName); idx >= 0 { + return } services = append(services, serviceName) @@ -460,24 +473,9 @@ func (op *OwnerCache) deleteServiceFromPod(pod string, serviceName string) { return } - for i := 0; len(services) > 0; { - service := services[i] - if service == serviceName { - // Remove the ith entry by... - l := len(services) - last := services[l-1] - // ...moving it at the very end (swapping it with the last entry)... - services[l-1], services[i] = service, last - // ... and by truncating the slice by one elem - services = services[:l-1] - } else { - i++ - } - - if i == len(services)-1 { - break - } - } + services = slices.DeleteFunc(services, func(s string) bool { + return s == serviceName + }) if len(services) == 0 { delete(op.podServices, pod) @@ -511,12 +509,43 @@ func (op *OwnerCache) genericEndpointSliceOp(obj interface{}, serviceFunc func(p return } - epLabels := endpointSlice.GetLabels() - serviceName := epLabels[endpointSliceServiceLabel] // see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership + serviceName := getServiceName(endpointSlice) for _, endpoint := range endpointSlice.Endpoints { if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" { - serviceFunc(endpoint.TargetRef.Name, serviceName) + podName := endpoint.TargetRef.Name + serviceFunc(podName, serviceName) + } + } +} + +func (op *OwnerCache) updateEndpointSlice(oldObj interface{}, newObj interface{}) { + // for updates, we're guaranteed the objects will be the right type + oldEndpointSlice := oldObj.(*discovery_v1.EndpointSlice) + newEndpointSlice := newObj.(*discovery_v1.EndpointSlice) + + // add the new endpointslice first, the logic is the same + op.cacheEndpointSlice("EndpointSlice", newObj) + + // we also need to remove the Service from Pods which were deleted from the endpointslice + serviceName := getServiceName(newEndpointSlice) + + newPodNames := []string{} + for _, endpoint := range newEndpointSlice.Endpoints { + if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" { + podName := endpoint.TargetRef.Name + newPodNames = append(newPodNames, podName) + } + } + + // for each Pod name which was in the old slice, but not in the new slice, schedule a delete + for _, endpoint := range oldEndpointSlice.Endpoints { + if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" { + podName := endpoint.TargetRef.Name + if slices.Index(newPodNames, podName) == -1 { + // not a deferred delete, as this is a dynamic property which can change often + op.deleteServiceFromPod(podName, serviceName) + } } } } @@ -658,3 +687,11 @@ func removeUnnecessaryEndpointSliceData(endpointSlice *discovery_v1.EndpointSlic return &transformedEndpointSlice } + +// Get the Service name from an EndpointSlice based on a standard label. +// see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership +func getServiceName(endpointSlice *discovery_v1.EndpointSlice) string { + epLabels := endpointSlice.GetLabels() + serviceName := epLabels[endpointSliceServiceLabel] + return serviceName +} diff --git a/pkg/processor/k8sprocessor/kube/owner_test.go b/pkg/processor/k8sprocessor/kube/owner_test.go index ffd7a032b7..9a64392b9c 100644 --- a/pkg/processor/k8sprocessor/kube/owner_test.go +++ b/pkg/processor/k8sprocessor/kube/owner_test.go @@ -37,7 +37,7 @@ func waitForWatchToBeEstablished(client *fake.Clientset, resource string) <-chan gvr := action.GetResource() ns := action.GetNamespace() - watch, err := client.Tracker().Watch(gvr, ns) + watcher, err := client.Tracker().Watch(gvr, ns) if err != nil { return false, nil, err } @@ -45,7 +45,7 @@ func waitForWatchToBeEstablished(client *fake.Clientset, resource string) <-chan if action.GetVerb() == "watch" { close(ch) } - return true, watch, nil + return true, watcher, nil }) return ch } @@ -569,7 +569,6 @@ func Test_OwnerProvider_GetServices(t *testing.T) { }) t.Run("updating endpoints", func(t *testing.T) { - t.Skip("Known bug, see https://github.com/SumoLogic/sumologic-otel-collector/issues/1414") _, err = c.DiscoveryV1().EndpointSlices(namespace). Update(context.Background(), endpointSlice2Updated, metav1.UpdateOptions{}) require.NoError(t, err) @@ -579,8 +578,20 @@ func Test_OwnerProvider_GetServices(t *testing.T) { t.Logf("services: %v", services) return false } + return assert.Equal(t, []string{"my-service"}, services) + }, 5*time.Second, 10*time.Millisecond) - return len(services) == 1 + // update back to the original value + _, err = c.DiscoveryV1().EndpointSlices(namespace). + Update(context.Background(), endpointSlice2, metav1.UpdateOptions{}) + require.NoError(t, err) + assert.Eventually(t, func() bool { + services := op.GetServices(pod.Name) + if len(services) != 2 { + t.Logf("services: %v", services) + return false + } + return assert.Equal(t, []string{"my-service", "my-service-2"}, services) }, 5*time.Second, 10*time.Millisecond) })