Skip to content

Commit e115159

Browse files
authored
feat: add flagd-proxy HA configuration (#712)
Signed-off-by: Matthias Riegler <matthias.riegler@ankorstore.com>
1 parent 99b1cd4 commit e115159

File tree

12 files changed

+521
-301
lines changed

12 files changed

+521
-301
lines changed

common/flagdproxy/flagdproxy.go

+139-66
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,19 @@ import (
1111
"golang.org/x/exp/maps"
1212
appsV1 "k8s.io/api/apps/v1"
1313
corev1 "k8s.io/api/core/v1"
14+
policyv1 "k8s.io/api/policy/v1"
1415
"k8s.io/apimachinery/pkg/api/errors"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/util/intstr"
18+
"k8s.io/client-go/util/retry"
1719
"sigs.k8s.io/controller-runtime/pkg/client"
1820
)
1921

2022
const (
21-
FlagdProxyDeploymentName = "flagd-proxy"
22-
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
23-
FlagdProxyServiceName = "flagd-proxy-svc"
23+
FlagdProxyDeploymentName = "flagd-proxy"
24+
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
25+
FlagdProxyServiceName = "flagd-proxy-svc"
26+
FlagdProxyPodDisruptionBudgetName = "flagd-proxy-pdb"
2427
)
2528

2629
type FlagdProxyHandler struct {
@@ -37,6 +40,7 @@ type FlagdProxyConfiguration struct {
3740
DebugLogging bool
3841
Image string
3942
Tag string
43+
Replicas int
4044
Namespace string
4145
OperatorDeploymentName string
4246
ImagePullSecrets []string
@@ -53,6 +57,7 @@ func NewFlagdProxyConfiguration(env types.EnvConfig, imagePullSecrets []string,
5357
Port: env.FlagdProxyPort,
5458
ManagementPort: env.FlagdProxyManagementPort,
5559
DebugLogging: env.FlagdProxyDebugLogging,
60+
Replicas: env.FlagdProxyReplicaCount,
5661
ImagePullSecrets: imagePullSecrets,
5762
Labels: labels,
5863
Annotations: annotations,
@@ -71,58 +76,99 @@ func (f *FlagdProxyHandler) Config() *FlagdProxyConfiguration {
7176
return f.config
7277
}
7378

74-
func (f *FlagdProxyHandler) createObject(ctx context.Context, obj client.Object) error {
75-
return f.Client.Create(ctx, obj)
79+
func specDiffers(a, b client.Object) (bool, error) {
80+
if a == nil || b == nil {
81+
return false, fmt.Errorf("object is nil")
82+
}
83+
84+
// Compare only spec based on the object type
85+
switch a.(type) {
86+
case *corev1.Service:
87+
return !reflect.DeepEqual(a.(*corev1.Service).Spec, b.(*corev1.Service).Spec), nil
88+
case *appsV1.Deployment:
89+
return !reflect.DeepEqual(a.(*appsV1.Deployment).Spec, b.(*appsV1.Deployment).Spec), nil
90+
case *policyv1.PodDisruptionBudget:
91+
return !reflect.DeepEqual(a.(*policyv1.PodDisruptionBudget).Spec, b.(*policyv1.PodDisruptionBudget).Spec), nil
92+
default:
93+
return false, fmt.Errorf("unsupported object type")
94+
}
7695
}
7796

78-
func (f *FlagdProxyHandler) updateObject(ctx context.Context, obj client.Object) error {
79-
return f.Client.Update(ctx, obj)
97+
// ensureFlagdProxyResource ensures that the given object is reconciled in the cluster. If the object does not exist, it will be created.
98+
func (f *FlagdProxyHandler) ensureFlagdProxyResource(ctx context.Context, obj client.Object) error {
99+
if obj == nil {
100+
return fmt.Errorf("object is nil")
101+
}
102+
103+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
104+
var old = obj.DeepCopyObject().(client.Object)
105+
f.Log.Info("Ensuring object exists", "name", obj.GetName(), "namespace", obj.GetNamespace())
106+
107+
// Try to get the existing object
108+
err := f.Client.Get(ctx, client.ObjectKey{Name: old.GetName(), Namespace: old.GetNamespace()}, old)
109+
notFound := errors.IsNotFound(err)
110+
if err != nil && !notFound {
111+
return err
112+
}
113+
114+
// If the object is not found, we will create it
115+
if notFound {
116+
return f.Client.Create(ctx, obj)
117+
}
118+
// If the object exists but is not managed by OFO, return an error
119+
if !common.IsManagedByOFO(old) {
120+
return fmt.Errorf("%s not managed by OFO", obj.GetName())
121+
}
122+
123+
// If the object is found, update if necessary
124+
needsUpdate, err := specDiffers(obj, old)
125+
if err != nil {
126+
return err
127+
}
128+
129+
if needsUpdate {
130+
obj.SetResourceVersion(old.GetResourceVersion())
131+
return f.Client.Update(ctx, obj)
132+
}
133+
134+
return nil
135+
})
80136
}
81137

138+
// HandleFlagdProxy ensures flagd-proxy kubernetes components are configured properly
82139
func (f *FlagdProxyHandler) HandleFlagdProxy(ctx context.Context) error {
83-
exists, deployment, err := f.doesFlagdProxyExist(ctx)
84-
if err != nil {
85-
return err
86-
}
140+
var err error
87141

88-
ownerReference, err := f.getOwnerReference(ctx)
142+
ownerRef, err := f.getOwnerReference(ctx)
89143
if err != nil {
90144
return err
91145
}
92-
newDeployment := f.newFlagdProxyManifest(ownerReference)
93-
newService := f.newFlagdProxyServiceManifest(ownerReference)
94-
95-
if !exists {
96-
f.Log.Info("flagd-proxy Deployment does not exist, creating")
97-
return f.deployFlagdProxy(ctx, f.createObject, newDeployment, newService)
98-
}
99-
// flagd-proxy exists, need to check if we should update it
100-
if f.shouldUpdateFlagdProxy(deployment, newDeployment) {
101-
f.Log.Info("flagd-proxy Deployment out of sync, updating")
102-
return f.deployFlagdProxy(ctx, f.updateObject, newDeployment, newService)
103-
}
104-
f.Log.Info("flagd-proxy Deployment up-to-date")
105-
return nil
106-
}
107146

108-
func (f *FlagdProxyHandler) deployFlagdProxy(ctx context.Context, createUpdateFunc CreateUpdateFunc, deployment *appsV1.Deployment, service *corev1.Service) error {
109-
f.Log.Info("deploying the flagd-proxy")
110-
if err := createUpdateFunc(ctx, deployment); err != nil && !errors.IsAlreadyExists(err) {
147+
if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyDeployment(ownerRef)); err != nil {
111148
return err
112149
}
113-
f.Log.Info("deploying the flagd-proxy service")
114-
if err := createUpdateFunc(ctx, service); err != nil && !errors.IsAlreadyExists(err) {
150+
151+
if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyService(ownerRef)); err != nil {
115152
return err
116153
}
117-
return nil
154+
155+
err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyPodDisruptionBudget(ownerRef))
156+
return err
118157
}
119158

120-
func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.OwnerReference) *corev1.Service {
159+
func (f *FlagdProxyHandler) newFlagdProxyService(ownerReference *metav1.OwnerReference) *corev1.Service {
121160
return &corev1.Service{
161+
TypeMeta: metav1.TypeMeta{
162+
Kind: "Service",
163+
APIVersion: "v1",
164+
},
122165
ObjectMeta: metav1.ObjectMeta{
123166
Name: FlagdProxyServiceName,
124167
Namespace: f.config.Namespace,
125168
OwnerReferences: []metav1.OwnerReference{*ownerReference},
169+
Labels: map[string]string{
170+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
171+
},
126172
},
127173
Spec: corev1.ServiceSpec{
128174
Selector: map[string]string{
@@ -140,8 +186,41 @@ func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.
140186
}
141187
}
142188

143-
func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
144-
replicas := int32(1)
189+
func (f *FlagdProxyHandler) newFlagdProxyPodDisruptionBudget(ownerReference *metav1.OwnerReference) *policyv1.PodDisruptionBudget {
190+
191+
// Only require pods to be available if there is >1 replica configured (HA setup)
192+
minReplicas := intstr.FromInt(0)
193+
if f.config.Replicas > 1 {
194+
minReplicas = intstr.FromInt(f.config.Replicas / 2)
195+
}
196+
197+
return &policyv1.PodDisruptionBudget{
198+
TypeMeta: metav1.TypeMeta{
199+
Kind: "PodDisruptionBudget",
200+
APIVersion: "policy/v1",
201+
},
202+
ObjectMeta: metav1.ObjectMeta{
203+
Name: FlagdProxyPodDisruptionBudgetName,
204+
Namespace: f.config.Namespace,
205+
OwnerReferences: []metav1.OwnerReference{*ownerReference},
206+
Labels: map[string]string{
207+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
208+
},
209+
},
210+
Spec: policyv1.PodDisruptionBudgetSpec{
211+
MinAvailable: &minReplicas,
212+
Selector: &metav1.LabelSelector{
213+
MatchLabels: map[string]string{
214+
"app.kubernetes.io/name": FlagdProxyDeploymentName,
215+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
216+
},
217+
},
218+
},
219+
}
220+
}
221+
222+
func (f *FlagdProxyHandler) newFlagdProxyDeployment(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
223+
replicas := int32(f.config.Replicas)
145224
args := []string{
146225
"start",
147226
"--management-port",
@@ -157,10 +236,10 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
157236
})
158237
}
159238
flagdLabels := map[string]string{
160-
"app": FlagdProxyDeploymentName,
161-
"app.kubernetes.io/name": FlagdProxyDeploymentName,
162-
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
163-
"app.kubernetes.io/version": f.config.Tag,
239+
"app": FlagdProxyDeploymentName,
240+
"app.kubernetes.io/name": FlagdProxyDeploymentName,
241+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
242+
"app.kubernetes.io/version": f.config.Tag,
164243
}
165244
if len(f.config.Labels) > 0 {
166245
maps.Copy(flagdLabels, f.config.Labels)
@@ -173,13 +252,17 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
173252
}
174253

175254
return &appsV1.Deployment{
255+
TypeMeta: metav1.TypeMeta{
256+
Kind: "Deployment",
257+
APIVersion: "apps/v1",
258+
},
176259
ObjectMeta: metav1.ObjectMeta{
177260
Name: FlagdProxyDeploymentName,
178261
Namespace: f.config.Namespace,
179262
Labels: map[string]string{
180-
"app": FlagdProxyDeploymentName,
181-
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
182-
"app.kubernetes.io/version": f.config.Tag,
263+
"app": FlagdProxyDeploymentName,
264+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
265+
"app.kubernetes.io/version": f.config.Tag,
183266
},
184267
OwnerReferences: []metav1.OwnerReference{*ownerReference},
185268
},
@@ -215,41 +298,31 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
215298
Args: args,
216299
},
217300
},
301+
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
302+
{
303+
MaxSkew: 1,
304+
TopologyKey: "kubernetes.io/hostname",
305+
WhenUnsatisfiable: corev1.DoNotSchedule,
306+
LabelSelector: &metav1.LabelSelector{
307+
MatchLabels: map[string]string{
308+
"app.kubernetes.io/name": FlagdProxyDeploymentName,
309+
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
310+
},
311+
},
312+
},
313+
},
218314
},
219315
},
220316
},
221317
}
222318
}
223319

224-
func (f *FlagdProxyHandler) doesFlagdProxyExist(ctx context.Context) (bool, *appsV1.Deployment, error) {
225-
d := &appsV1.Deployment{}
226-
err := f.Client.Get(ctx, client.ObjectKey{Name: FlagdProxyDeploymentName, Namespace: f.config.Namespace}, d)
227-
if err != nil {
228-
if errors.IsNotFound(err) {
229-
// does not exist, is not ready, no error
230-
return false, nil, nil
231-
}
232-
// does not exist, is not ready, is in error
233-
return false, nil, err
234-
}
235-
return true, d, nil
236-
}
237-
238-
func (f *FlagdProxyHandler) shouldUpdateFlagdProxy(old, new *appsV1.Deployment) bool {
239-
if !common.IsManagedByOFO(old) {
240-
f.Log.Info("flagd-proxy Deployment not managed by OFO")
241-
return false
242-
}
243-
return !reflect.DeepEqual(old.Spec, new.Spec)
244-
}
245-
246320
func (f *FlagdProxyHandler) getOperatorDeployment(ctx context.Context) (*appsV1.Deployment, error) {
247321
d := &appsV1.Deployment{}
248322
if err := f.Client.Get(ctx, client.ObjectKey{Name: f.config.OperatorDeploymentName, Namespace: f.config.Namespace}, d); err != nil {
249323
return nil, fmt.Errorf("unable to fetch operator deployment: %w", err)
250324
}
251325
return d, nil
252-
253326
}
254327

255328
func (f *FlagdProxyHandler) getOwnerReference(ctx context.Context) (*metav1.OwnerReference, error) {

0 commit comments

Comments
 (0)