...

Source file src/k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer/strategy.go

Documentation: k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package ensurer
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strconv"
    23  
    24  	"github.com/google/go-cmp/cmp"
    25  	flowcontrolv1 "k8s.io/api/flowcontrol/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/util/sets"
    31  	"k8s.io/klog/v2"
    32  )
    33  
const (
	// fieldManager is the field-manager name that this package passes in the
	// CreateOptions and UpdateOptions of the writes it issues.
	fieldManager = "api-priority-and-fairness-config-producer-v1"
)
    37  
// EnsureStrategy provides a maintenance strategy for APF configuration objects.
// We have two types of strategy, corresponding to the two types of config objects:
//
//   - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F
//     system itself won't crash; we have to be sure there's a 'catch-all' place for
//     everything to go. Any changes made by the cluster operators to these
//     configurationWrapper objects will be stomped by the apiserver.
//
//   - suggested: additional configurationWrapper objects for initial behavior.
//     The cluster operators have an option to edit or delete these configurationWrapper objects.
type EnsureStrategy[ObjectType configurationObjectType] interface {
	// Name returns the name of the strategy; for now we have two: 'mandatory' and 'suggested'.
	// This comes in handy in logging.
	Name() string

	// ReviseIfNeeded accepts a pair of the current and the bootstrap configuration, determines
	// whether an update is necessary, and returns a (revised if appropriate) copy of the object.
	// current is the existing in-cluster configuration object.
	// bootstrap is the configuration the kube-apiserver maintains in-memory.
	//
	// revised: the new object that represents the new configuration to be stored in-cluster.
	// ok: true if auto update is required, otherwise false.
	// err: set when the function runs into an error and cannot
	//      determine whether auto update is needed.
	ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (revised ObjectType, ok bool, err error)
}
    64  
// objectLocalOps is the set of needed operations on an individual configurationObject
// that do not involve a server or a cache.
type objectLocalOps[ObjectType configurationObject] interface {
	// DeepCopy returns a deep copy of the given object.
	DeepCopy(ObjectType) ObjectType

	// ReplaceSpec returns a deep copy of `into` except that the spec is a deep copy of `from`
	ReplaceSpec(into, from ObjectType) ObjectType

	// SpecEqualish says whether applying defaulting to `expected`
	// makes its spec more or less equal (as appropriate for the
	// object at hand) to that of `actual`.
	SpecEqualish(expected, actual ObjectType) bool
}
    77  
// ObjectOps is the full set of needed operations on configurationObjects.
// It combines the operations that go to a server (client), the reads from a
// local cache (cache), and the server-independent operations (objectLocalOps).
type ObjectOps[ObjectType configurationObject] interface {
	client[ObjectType]
	cache[ObjectType]
	objectLocalOps[ObjectType]
}
    84  
// client is the needed fragment of the typed generated client stubs for the given object type:
// creating, updating, and deleting (by name) an object.
type client[ObjectType configurationObject] interface {
	Create(ctx context.Context, obj ObjectType, opts metav1.CreateOptions) (ObjectType, error)
	Update(ctx context.Context, obj ObjectType, opts metav1.UpdateOptions) (ObjectType, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}
    91  
// cache is the needed fragment of the typed generated access ("lister") to an informer's local cache:
// listing the objects that match a label selector and fetching one object by name.
type cache[ObjectType configurationObject] interface {
	List(labels.Selector) ([]ObjectType, error)
	Get(name string) (ObjectType, error)
}
    97  
// configurationObject is the set of relevant interfaces that each API object type implements:
// metav1.Object for access to object metadata and runtime.Object for access to the object kind.
type configurationObject interface {
	metav1.Object
	runtime.Object
}
   103  
// configurationObjectType adds the type constraint `comparable` to
// configurationObject and is thus only usable as a type constraint.
// The `comparable` constraint is what allows a value of the type to be
// compared with `==` against the type's zero value.
type configurationObjectType interface {
	comparable
	configurationObject
}
   110  
// objectOps is the implementation of ObjectOps returned by NewObjectOps.
// The client and cache behavior comes from the embedded values; the local
// operations are delegated to the three stored functions.
type objectOps[ObjectType configurationObjectType] struct {
	client[ObjectType]
	cache[ObjectType]
	// deepCopy backs the DeepCopy method.
	deepCopy func(ObjectType) ObjectType
	// replaceSpec backs the ReplaceSpec method.
	replaceSpec func(ObjectType, ObjectType) ObjectType
	// specEqualish backs the SpecEqualish method.
	specEqualish func(expected, actual ObjectType) bool
}
   118  
   119  func NewObjectOps[ObjectType configurationObjectType](client client[ObjectType], cache cache[ObjectType],
   120  	deepCopy func(ObjectType) ObjectType,
   121  	replaceSpec func(ObjectType, ObjectType) ObjectType,
   122  	specEqualish func(expected, actual ObjectType) bool,
   123  ) ObjectOps[ObjectType] {
   124  	return objectOps[ObjectType]{client: client,
   125  		cache:        cache,
   126  		deepCopy:     deepCopy,
   127  		replaceSpec:  replaceSpec,
   128  		specEqualish: specEqualish}
   129  }
   130  
   131  func (oo objectOps[ObjectType]) DeepCopy(obj ObjectType) ObjectType { return oo.deepCopy(obj) }
   132  
   133  func (oo objectOps[ObjectType]) ReplaceSpec(into, from ObjectType) ObjectType {
   134  	return oo.replaceSpec(into, from)
   135  }
   136  
   137  func (oo objectOps[ObjectType]) SpecEqualish(expected, actual ObjectType) bool {
   138  	return oo.specEqualish(expected, actual)
   139  }
   140  
   141  // NewSuggestedEnsureStrategy returns an EnsureStrategy for suggested config objects
   142  func NewSuggestedEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
   143  	return &strategy[ObjectType]{
   144  		alwaysAutoUpdateSpec: false,
   145  		name:                 "suggested",
   146  	}
   147  }
   148  
   149  // NewMandatoryEnsureStrategy returns an EnsureStrategy for mandatory config objects
   150  func NewMandatoryEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
   151  	return &strategy[ObjectType]{
   152  		alwaysAutoUpdateSpec: true,
   153  		name:                 "mandatory",
   154  	}
   155  }
   156  
// strategy is the auto-update strategy for the configuration objects; it is
// the EnsureStrategy implementation behind both constructors in this file.
type strategy[ObjectType configurationObjectType] struct {
	// alwaysAutoUpdateSpec, when true, makes ReviseIfNeeded treat the spec as
	// auto-updatable without consulting the object's annotation or generation.
	alwaysAutoUpdateSpec bool
	// name is the value returned by Name.
	name string
}
   162  
   163  func (s *strategy[ObjectType]) Name() string {
   164  	return s.name
   165  }
   166  
   167  func (s *strategy[ObjectType]) ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (ObjectType, bool, error) {
   168  	var zero ObjectType
   169  	if current == zero {
   170  		return zero, false, nil
   171  	}
   172  
   173  	autoUpdateSpec := s.alwaysAutoUpdateSpec
   174  	if !autoUpdateSpec {
   175  		autoUpdateSpec = shouldUpdateSpec(current)
   176  	}
   177  	updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
   178  
   179  	specChanged := autoUpdateSpec && !objectOps.SpecEqualish(bootstrap, current)
   180  
   181  	if !(updateAnnotation || specChanged) {
   182  		// the annotation key is up to date and the spec has not changed, no update is necessary
   183  		return zero, false, nil
   184  	}
   185  
   186  	var revised ObjectType
   187  	if specChanged {
   188  		revised = objectOps.ReplaceSpec(current, bootstrap)
   189  	} else {
   190  		revised = objectOps.DeepCopy(current)
   191  	}
   192  	if updateAnnotation {
   193  		setAutoUpdateAnnotation(revised, autoUpdateSpec)
   194  	}
   195  
   196  	return revised, true, nil
   197  }
   198  
   199  // shouldUpdateSpec inspects the auto-update annotation key and generation field to determine
   200  // whether the config object should be auto-updated.
   201  func shouldUpdateSpec(accessor metav1.Object) bool {
   202  	value := accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]
   203  	if autoUpdate, err := strconv.ParseBool(value); err == nil {
   204  		return autoUpdate
   205  	}
   206  
   207  	// We are here because of either a or b:
   208  	// a. the annotation key is missing.
   209  	// b. the annotation key is present but the value does not represent a boolean.
   210  	// In either case, if the operator hasn't changed the spec, we can safely auto update.
   211  	// Please note that we can't protect the changes made by the operator in the following scenario:
   212  	// - The operator deletes and recreates the same object with a variant spec (generation resets to 1).
   213  	if accessor.GetGeneration() == 1 {
   214  		return true
   215  	}
   216  	return false
   217  }
   218  
   219  // shouldUpdateAnnotation determines whether the current value of the auto-update annotation
   220  // key matches the desired value.
   221  func shouldUpdateAnnotation(accessor metav1.Object, desired bool) bool {
   222  	if value, ok := accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]; ok {
   223  		if current, err := strconv.ParseBool(value); err == nil && current == desired {
   224  			return false
   225  		}
   226  	}
   227  
   228  	return true
   229  }
   230  
   231  // setAutoUpdateAnnotation sets the auto-update annotation key to the specified value.
   232  func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
   233  	if accessor.GetAnnotations() == nil {
   234  		accessor.SetAnnotations(map[string]string{})
   235  	}
   236  
   237  	accessor.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate)
   238  }
   239  
   240  // EnsureConfigurations applies the given maintenance strategy to the given objects.
   241  // At the first error, if any, it stops and returns that error.
   242  func EnsureConfigurations[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], boots []ObjectType, strategy EnsureStrategy[ObjectType]) error {
   243  	for _, bo := range boots {
   244  		err := EnsureConfiguration(ctx, ops, bo, strategy)
   245  		if err != nil {
   246  			return err
   247  		}
   248  	}
   249  	return nil
   250  }
   251  
   252  // EnsureConfiguration applies the given maintenance strategy to the given object.
   253  func EnsureConfiguration[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], bootstrap ObjectType, strategy EnsureStrategy[ObjectType]) error {
   254  	name := bootstrap.GetName()
   255  	configurationType := strategy.Name()
   256  
   257  	var current ObjectType
   258  	var err error
   259  	for {
   260  		current, err = ops.Get(name)
   261  		if err == nil {
   262  			break
   263  		}
   264  		if !apierrors.IsNotFound(err) {
   265  			return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
   266  		}
   267  
   268  		// we always re-create a missing configuration object
   269  		if _, err = ops.Create(ctx, ops.DeepCopy(bootstrap), metav1.CreateOptions{FieldManager: fieldManager}); err == nil {
   270  			klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
   271  			return nil
   272  		}
   273  
   274  		if !apierrors.IsAlreadyExists(err) {
   275  			return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
   276  		}
   277  		klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
   278  	}
   279  
   280  	klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
   281  	newObject, update, err := strategy.ReviseIfNeeded(ops, current, bootstrap)
   282  	if err != nil {
   283  		return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
   284  	}
   285  	if !update {
   286  		if klogV := klog.V(5); klogV.Enabled() {
   287  			klogV.InfoS("No update required", "wrapper", bootstrap.GetObjectKind().GroupVersionKind().Kind, "type", configurationType, "name", name,
   288  				"diff", cmp.Diff(current, bootstrap))
   289  		}
   290  		return nil
   291  	}
   292  
   293  	if _, err = ops.Update(ctx, newObject, metav1.UpdateOptions{FieldManager: fieldManager}); err == nil {
   294  		klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, cmp.Diff(current, bootstrap))
   295  		return nil
   296  	}
   297  
   298  	if apierrors.IsConflict(err) {
   299  		klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
   300  		return nil
   301  	}
   302  
   303  	return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
   304  }
   305  
   306  // RemoveUnwantedObjects attempts to delete the configuration objects
   307  // that exist, are annotated `apf.kubernetes.io/autoupdate-spec=true`, and do not
   308  // have a name in the given set.  A refusal due to concurrent update is logged
   309  // and not considered an error; the object will be reconsidered later.
   310  func RemoveUnwantedObjects[ObjectType configurationObjectType](ctx context.Context, objectOps ObjectOps[ObjectType], boots []ObjectType) error {
   311  	current, err := objectOps.List(labels.Everything())
   312  	if err != nil {
   313  		return err
   314  	}
   315  	wantedNames := namesOfBootstrapObjects(boots)
   316  	for _, object := range current {
   317  		name := object.GetName()
   318  		if wantedNames.Has(name) {
   319  			continue
   320  		}
   321  		var value string
   322  		var ok, autoUpdate bool
   323  		var err error
   324  		if value, ok = object.GetAnnotations()[flowcontrolv1.AutoUpdateAnnotationKey]; !ok {
   325  			// the configuration object does not have the annotation key,
   326  			// it's probably a user defined configuration object,
   327  			// so we can skip it.
   328  			klog.V(5).InfoS("Skipping deletion of APF object with no "+flowcontrolv1.AutoUpdateAnnotationKey+" annotation", "name", name)
   329  			continue
   330  		}
   331  		autoUpdate, err = strconv.ParseBool(value)
   332  		if err != nil {
   333  			// Log this because it is not an expected situation.
   334  			klog.V(4).InfoS("Skipping deletion of APF object with malformed "+flowcontrolv1.AutoUpdateAnnotationKey+" annotation", "name", name, "annotationValue", value, "parseError", err)
   335  			continue
   336  		}
   337  		if !autoUpdate {
   338  			klog.V(5).InfoS("Skipping deletion of APF object with "+flowcontrolv1.AutoUpdateAnnotationKey+"=false annotation", "name", name)
   339  			continue
   340  		}
   341  		// TODO: expectedResourceVersion := object.GetResourceVersion()
   342  		err = objectOps.Delete(ctx, object.GetName(), metav1.DeleteOptions{ /* TODO: expectedResourceVersion */ })
   343  		if err == nil {
   344  			klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", object.GetObjectKind().GroupVersionKind().Kind), "name", name)
   345  			continue
   346  		}
   347  		if apierrors.IsNotFound(err) {
   348  			klog.V(5).InfoS("Unwanted APF object was concurrently deleted", "name", name)
   349  		} else {
   350  			return fmt.Errorf("failed to delete unwatned APF object %q - %w", name, err)
   351  		}
   352  	}
   353  	return nil
   354  }
   355  
   356  func namesOfBootstrapObjects[ObjectType configurationObjectType](bos []ObjectType) sets.String {
   357  	names := sets.NewString()
   358  	for _, bo := range bos {
   359  		names.Insert(bo.GetName())
   360  	}
   361  	return names
   362  }
   363  

View as plain text