1
16
17 package ensurer
18
19 import (
20 "context"
21 "fmt"
22 "strconv"
23
24 "github.com/google/go-cmp/cmp"
25 flowcontrolv1 "k8s.io/api/flowcontrol/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/labels"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/util/sets"
31 "k8s.io/klog/v2"
32 )
33
const (
	// fieldManager is the field-manager name this package puts in the
	// CreateOptions and UpdateOptions of the writes it issues.
	fieldManager = "api-priority-and-fairness-config-producer-v1"
)
37
38
39
40
41
42
43
44
45
46
47
// EnsureStrategy decides whether, and how, an existing configuration object
// should be revised with respect to its bootstrap counterpart.
// EnsureConfiguration consults it once the current object has been fetched.
type EnsureStrategy[ObjectType configurationObjectType] interface {
	// Name returns a label for the strategy. EnsureConfiguration reports it
	// as the "type" in its log and error messages.
	Name() string

	// ReviseIfNeeded compares the current object against the bootstrap object
	// using the type-specific helpers in objectOps.
	//
	// When ok is true, revised is the object the caller should write back.
	// When ok is false, no update is required and revised should be ignored.
	// A non-nil err means the decision could not be made.
	ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (revised ObjectType, ok bool, err error)
}
64
65
// objectLocalOps is the set of purely local (no client, no cache) operations
// on a configuration object type.
type objectLocalOps[ObjectType configurationObject] interface {
	// DeepCopy returns a copy of the given object.
	// NOTE(review): callers in this file mutate or submit the result, so it is
	// presumably expected to share no state with the argument — confirm.
	DeepCopy(ObjectType) ObjectType

	// ReplaceSpec returns an object built from `into` with its spec taken
	// from `from`.
	// NOTE(review): the strategy in this file sets an annotation on the
	// result, so it is presumably a fresh copy rather than `into` itself —
	// confirm against the implementations.
	ReplaceSpec(into, from ObjectType) ObjectType

	// SpecEqualish reports whether the spec of actual is considered
	// equivalent to the spec of expected. The exact notion of equivalence is
	// left to the implementation.
	SpecEqualish(expected, actual ObjectType) bool
}
77
78
// ObjectOps is everything the ensure/remove functions in this file need for
// one configuration object type: write access (client), read access (cache)
// and the local copy/replace/compare helpers (objectLocalOps).
type ObjectOps[ObjectType configurationObject] interface {
	client[ObjectType]
	cache[ObjectType]
	objectLocalOps[ObjectType]
}
84
85
// client is the write interface used to create, update and delete
// configuration objects of one type.
type client[ObjectType configurationObject] interface {
	// Create submits obj for creation and returns the resulting object.
	Create(ctx context.Context, obj ObjectType, opts metav1.CreateOptions) (ObjectType, error)
	// Update submits obj as an update and returns the resulting object.
	Update(ctx context.Context, obj ObjectType, opts metav1.UpdateOptions) (ObjectType, error)
	// Delete removes the object with the given name.
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}
91
92
// cache is the read interface used to look up configuration objects of one
// type. Neither method takes a context.
type cache[ObjectType configurationObject] interface {
	// List returns the objects matching the given label selector.
	List(labels.Selector) ([]ObjectType, error)
	// Get returns the object with the given name. EnsureConfiguration treats
	// an error for which apierrors.IsNotFound is true as "object absent".
	Get(name string) (ObjectType, error)
}
97
98
// configurationObject is the constraint satisfied by every configuration
// object handled here: it exposes object metadata (metav1.Object) and is a
// runtime.Object.
type configurationObject interface {
	metav1.Object
	runtime.Object
}
103
104
105
// configurationObjectType is configurationObject plus comparable. The
// comparable requirement lets generic code compare a value against the zero
// value of its type with ==, as ReviseIfNeeded does.
type configurationObjectType interface {
	comparable
	configurationObject
}
110
// objectOps is the ObjectOps implementation returned by NewObjectOps. The
// client and cache are embedded; the local operations are supplied as plain
// functions and exposed through the methods defined on this type.
type objectOps[ObjectType configurationObjectType] struct {
	client[ObjectType]
	cache[ObjectType]
	// deepCopy backs the DeepCopy method.
	deepCopy func(ObjectType) ObjectType
	// replaceSpec backs the ReplaceSpec method.
	replaceSpec func(ObjectType, ObjectType) ObjectType
	// specEqualish backs the SpecEqualish method.
	specEqualish func(expected, actual ObjectType) bool
}
118
119 func NewObjectOps[ObjectType configurationObjectType](client client[ObjectType], cache cache[ObjectType],
120 deepCopy func(ObjectType) ObjectType,
121 replaceSpec func(ObjectType, ObjectType) ObjectType,
122 specEqualish func(expected, actual ObjectType) bool,
123 ) ObjectOps[ObjectType] {
124 return objectOps[ObjectType]{client: client,
125 cache: cache,
126 deepCopy: deepCopy,
127 replaceSpec: replaceSpec,
128 specEqualish: specEqualish}
129 }
130
131 func (oo objectOps[ObjectType]) DeepCopy(obj ObjectType) ObjectType { return oo.deepCopy(obj) }
132
133 func (oo objectOps[ObjectType]) ReplaceSpec(into, from ObjectType) ObjectType {
134 return oo.replaceSpec(into, from)
135 }
136
137 func (oo objectOps[ObjectType]) SpecEqualish(expected, actual ObjectType) bool {
138 return oo.specEqualish(expected, actual)
139 }
140
141
142 func NewSuggestedEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
143 return &strategy[ObjectType]{
144 alwaysAutoUpdateSpec: false,
145 name: "suggested",
146 }
147 }
148
149
150 func NewMandatoryEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
151 return &strategy[ObjectType]{
152 alwaysAutoUpdateSpec: true,
153 name: "mandatory",
154 }
155 }
156
157
// strategy is the EnsureStrategy implementation returned by
// NewSuggestedEnsureStrategy and NewMandatoryEnsureStrategy.
type strategy[ObjectType configurationObjectType] struct {
	// alwaysAutoUpdateSpec, when true, makes ReviseIfNeeded treat the spec as
	// auto-updatable without consulting the object's annotation or
	// generation.
	alwaysAutoUpdateSpec bool
	// name is the value returned by Name.
	name string
}
162
// Name returns the name the strategy was constructed with.
func (s *strategy[ObjectType]) Name() string {
	return s.name
}
166
167 func (s *strategy[ObjectType]) ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (ObjectType, bool, error) {
168 var zero ObjectType
169 if current == zero {
170 return zero, false, nil
171 }
172
173 autoUpdateSpec := s.alwaysAutoUpdateSpec
174 if !autoUpdateSpec {
175 autoUpdateSpec = shouldUpdateSpec(current)
176 }
177 updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
178
179 specChanged := autoUpdateSpec && !objectOps.SpecEqualish(bootstrap, current)
180
181 if !(updateAnnotation || specChanged) {
182
183 return zero, false, nil
184 }
185
186 var revised ObjectType
187 if specChanged {
188 revised = objectOps.ReplaceSpec(current, bootstrap)
189 } else {
190 revised = objectOps.DeepCopy(current)
191 }
192 if updateAnnotation {
193 setAutoUpdateAnnotation(revised, autoUpdateSpec)
194 }
195
196 return revised, true, nil
197 }
198
199
200
201 func shouldUpdateSpec(accessor metav1.Object) bool {
202 value := accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]
203 if autoUpdate, err := strconv.ParseBool(value); err == nil {
204 return autoUpdate
205 }
206
207
208
209
210
211
212
213 if accessor.GetGeneration() == 1 {
214 return true
215 }
216 return false
217 }
218
219
220
221 func shouldUpdateAnnotation(accessor metav1.Object, desired bool) bool {
222 if value, ok := accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]; ok {
223 if current, err := strconv.ParseBool(value); err == nil && current == desired {
224 return false
225 }
226 }
227
228 return true
229 }
230
231
232 func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
233 if accessor.GetAnnotations() == nil {
234 accessor.SetAnnotations(map[string]string{})
235 }
236
237 accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate)
238 }
239
240
241
242 func EnsureConfigurations[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], boots []ObjectType, strategy EnsureStrategy[ObjectType]) error {
243 for _, bo := range boots {
244 err := EnsureConfiguration(ctx, ops, bo, strategy)
245 if err != nil {
246 return err
247 }
248 }
249 return nil
250 }
251
252
253 func EnsureConfiguration[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], bootstrap ObjectType, strategy EnsureStrategy[ObjectType]) error {
254 name := bootstrap.GetName()
255 configurationType := strategy.Name()
256
257 var current ObjectType
258 var err error
259 for {
260 current, err = ops.Get(name)
261 if err == nil {
262 break
263 }
264 if !apierrors.IsNotFound(err) {
265 return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
266 }
267
268
269 if _, err = ops.Create(ctx, ops.DeepCopy(bootstrap), metav1.CreateOptions{FieldManager: fieldManager}); err == nil {
270 klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
271 return nil
272 }
273
274 if !apierrors.IsAlreadyExists(err) {
275 return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
276 }
277 klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
278 }
279
280 klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
281 newObject, update, err := strategy.ReviseIfNeeded(ops, current, bootstrap)
282 if err != nil {
283 return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
284 }
285 if !update {
286 if klogV := klog.V(5); klogV.Enabled() {
287 klogV.InfoS("No update required", "wrapper", bootstrap.GetObjectKind().GroupVersionKind().Kind, "type", configurationType, "name", name,
288 "diff", cmp.Diff(current, bootstrap))
289 }
290 return nil
291 }
292
293 if _, err = ops.Update(ctx, newObject, metav1.UpdateOptions{FieldManager: fieldManager}); err == nil {
294 klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, cmp.Diff(current, bootstrap))
295 return nil
296 }
297
298 if apierrors.IsConflict(err) {
299 klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
300 return nil
301 }
302
303 return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
304 }
305
306
307
308
309
310 func RemoveUnwantedObjects[ObjectType configurationObjectType](ctx context.Context, objectOps ObjectOps[ObjectType], boots []ObjectType) error {
311 current, err := objectOps.List(labels.Everything())
312 if err != nil {
313 return err
314 }
315 wantedNames := namesOfBootstrapObjects(boots)
316 for _, object := range current {
317 name := object.GetName()
318 if wantedNames.Has(name) {
319 continue
320 }
321 var value string
322 var ok, autoUpdate bool
323 var err error
324 if value, ok = object.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]; !ok {
325
326
327
328 klog.V(5).InfoS("Skipping deletion of APF object with no "+flowcontrolv1.AutoUpdateAnnotationKey+" annotation", "name", name)
329 continue
330 }
331 autoUpdate, err = strconv.ParseBool(value)
332 if err != nil {
333
334 klog.V(4).InfoS("Skipping deletion of APF object with malformed "+flowcontrolv1.AutoUpdateAnnotationKey+" annotation", "name", name, "annotationValue", value, "parseError", err)
335 continue
336 }
337 if !autoUpdate {
338 klog.V(5).InfoS("Skipping deletion of APF object with "+flowcontrolv1.AutoUpdateAnnotationKey+"=false annotation", "name", name)
339 continue
340 }
341
342 err = objectOps.Delete(ctx, object.GetName(), metav1.DeleteOptions{ })
343 if err == nil {
344 klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", object.GetObjectKind().GroupVersionKind().Kind), "name", name)
345 continue
346 }
347 if apierrors.IsNotFound(err) {
348 klog.V(5).InfoS("Unwanted APF object was concurrently deleted", "name", name)
349 } else {
350 return fmt.Errorf("failed to delete unwatned APF object %q - %w", name, err)
351 }
352 }
353 return nil
354 }
355
356 func namesOfBootstrapObjects[ObjectType configurationObjectType](bos []ObjectType) sets.String {
357 names := sets.NewString()
358 for _, bo := range bos {
359 names.Insert(bo.GetName())
360 }
361 return names
362 }
363
View as plain text