...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler/handler.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package lifecyclehandler
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  
    21  	corekccv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/core/v1alpha1"
    22  	k8sv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
    23  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/deepcopy"
    24  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    25  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/label"
    26  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/lease/leaser"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/resourceoverrides"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
    29  
    30  	corev1 "k8s.io/api/core/v1"
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	"k8s.io/client-go/tools/record"
    37  	"sigs.k8s.io/controller-runtime/pkg/client"
    38  )
    39  
    40  // The LifecycleHandler contains common methods to handle the lifecycle of the reconciliation
    41  type LifecycleHandler struct {
    42  	client.Client
    43  	Recorder   record.EventRecorder
    44  	fieldOwner string
    45  }
    46  
    47  func NewLifecycleHandler(c client.Client, r record.EventRecorder) LifecycleHandler {
    48  	return NewLifecycleHandlerWithFieldOwner(c, r, k8s.ControllerManagedFieldManager)
    49  }
    50  
    51  func NewLifecycleHandlerWithFieldOwner(c client.Client, r record.EventRecorder, fieldOwner string) LifecycleHandler {
    52  	return LifecycleHandler{
    53  		Client:     c,
    54  		Recorder:   r,
    55  		fieldOwner: fieldOwner,
    56  	}
    57  }
    58  
    59  func (r *LifecycleHandler) updateStatus(ctx context.Context, resource *k8s.Resource) error {
    60  	u, err := resource.MarshalAsUnstructured()
    61  	if err != nil {
    62  		return err
    63  	}
    64  	if err := r.Client.Status().Update(ctx, u, client.FieldOwner(r.fieldOwner)); err != nil {
    65  		if apierrors.IsConflict(err) {
    66  			return fmt.Errorf("couldn't update the API server due to conflict. Re-enqueue the request for another reconciliation attempt: %v", err)
    67  		}
    68  		return fmt.Errorf("error with status update call to API server: %v", err)
    69  	}
    70  	// rejections by some validating webhooks won't be returned as an error; instead, they will be
    71  	// objects of kind "Status" with a "Failure" status.
    72  	if isFailureStatus(u) {
    73  		return fmt.Errorf("error with status update call to API server: %v", u.Object["message"])
    74  	}
    75  	// sync the resource up with the updated metadata.
    76  	if err := util.Marshal(u, resource); err != nil {
    77  		return err
    78  	}
    79  	return resourceoverrides.Handler.PostUpdateStatusTransform(resource)
    80  }
    81  
    82  // WARNING: This function should NOT be exported and invoked directly outside the package.
    83  // Controllers are supposed to call exported functions to handle lifecycle transitions.
    84  func (r *LifecycleHandler) updateAPIServer(ctx context.Context, resource *k8s.Resource) error {
    85  	// Preserve the intended status, as the client.Update call will ignore the given status
    86  	// and return the stale existing status.
    87  	status := deepcopy.MapStringInterface(resource.Status)
    88  	// Get the current generation as the observed generation because the following client.Update
    89  	// might increase the generation. We want the next reconciliation to handle the new generation.
    90  	observedGeneration := resource.GetGeneration()
    91  	u, err := resource.MarshalAsUnstructured()
    92  	if err != nil {
    93  		return err
    94  	}
    95  	removeSystemLabels(u)
    96  	if err := r.Client.Update(ctx, u, client.FieldOwner(r.fieldOwner)); err != nil {
    97  		if apierrors.IsConflict(err) {
    98  			return fmt.Errorf("couldn't update the API server due to conflict. Re-enqueue the request for another reconciliation attempt: %v", err)
    99  		}
   100  		return fmt.Errorf("error with update call to API server: %v", err)
   101  	}
   102  	// rejections by validating webhooks won't be returned as an error; instead, they will be
   103  	// objects of kind "Status" with a "Failure" status.
   104  	if isFailureStatus(u) {
   105  		return fmt.Errorf("error with update call to API server: %v", u.Object["message"])
   106  	}
   107  	// sync the resource up with the updated metadata
   108  	if err := util.Marshal(u, resource); err != nil {
   109  		return fmt.Errorf("error syncing updated resource metadata: %w", err)
   110  	}
   111  	if !u.GetDeletionTimestamp().IsZero() && len(u.GetFinalizers()) == 0 {
   112  		// This resource is set for garbage collection and any status updates would be racey.
   113  		// Status updates for successful deletions must be handled independently.
   114  		return nil
   115  	}
   116  	resource.Status = status
   117  	setObservedGeneration(resource, observedGeneration)
   118  	return r.updateStatus(ctx, resource)
   119  }
   120  
   121  func isFailureStatus(u *unstructured.Unstructured) bool {
   122  	return u.GetKind() == "Status" && u.Object["status"] == metav1.StatusFailure
   123  }
   124  
   125  // The system sets various labels on the resource that are not user facing and should not be saved in the API server
   126  // this function removes any that may be present
   127  func removeSystemLabels(u *unstructured.Unstructured) {
   128  	labels := u.GetLabels()
   129  	if labels == nil {
   130  		return
   131  	}
   132  	keys := leaser.GetLabelKeys()
   133  	keys = append(keys, label.CnrmManagedKey)
   134  	for _, k := range keys {
   135  		delete(labels, k)
   136  	}
   137  	// GetLabels(...) returns a new copy of the labels map so we must overwrite that value with our local value
   138  	u.SetLabels(labels)
   139  }
   140  
   141  // CausedByUnreadyOrNonexistentResourceRefs checks to see if the input error
   142  // is related to an unready or non-existent resource reference. Note that
   143  // KeyInSecretNotFoundError is not included in this list.
   144  func CausedByUnreadyOrNonexistentResourceRefs(err error) (refGVK schema.GroupVersionKind, refNN types.NamespacedName, ok bool) {
   145  	if unwrappedErr, ok := k8s.AsReferenceNotReadyError(err); ok {
   146  		return unwrappedErr.RefResourceGVK, unwrappedErr.RefResource, true
   147  	}
   148  	if unwrappedErr, ok := k8s.AsReferenceNotFoundError(err); ok {
   149  		return unwrappedErr.RefResourceGVK, unwrappedErr.RefResource, true
   150  	}
   151  	if unwrappedErr, ok := k8s.AsTransitiveDependencyNotFoundError(err); ok {
   152  		return unwrappedErr.ResourceGVK, unwrappedErr.Resource, true
   153  	}
   154  	if unwrappedErr, ok := k8s.AsTransitiveDependencyNotReadyError(err); ok {
   155  		return unwrappedErr.ResourceGVK, unwrappedErr.Resource, true
   156  	}
   157  	if unwrappedErr, ok := k8s.AsSecretNotFoundError(err); ok {
   158  		return schema.GroupVersionKind{Version: "v1", Kind: "Secret"}, unwrappedErr.Secret, true
   159  	}
   160  	return schema.GroupVersionKind{}, types.NamespacedName{}, false
   161  }
   162  
   163  func CausedByUnresolvableDeps(err error) (unwrappedErr error, ok bool) {
   164  	if unwrappedErr, ok := k8s.AsReferenceNotReadyError(err); ok {
   165  		return unwrappedErr, true
   166  	}
   167  	if unwrappedErr, ok := k8s.AsReferenceNotFoundError(err); ok {
   168  		return unwrappedErr, true
   169  	}
   170  	if unwrappedErr, ok := k8s.AsSecretNotFoundError(err); ok {
   171  		return unwrappedErr, true
   172  	}
   173  	if unwrappedErr, ok := k8s.AsKeyInSecretNotFoundError(err); ok {
   174  		return unwrappedErr, true
   175  	}
   176  	if unwrappedErr, ok := k8s.AsTransitiveDependencyNotFoundError(err); ok {
   177  		return unwrappedErr, true
   178  	}
   179  	if unwrappedErr, ok := k8s.AsTransitiveDependencyNotReadyError(err); ok {
   180  		return unwrappedErr, true
   181  	}
   182  	return nil, false
   183  }
   184  
   185  func reasonForUnresolvableDeps(err error) (string, error) {
   186  	switch err.(type) {
   187  	case *k8s.ReferenceNotReadyError, *k8s.TransitiveDependencyNotReadyError:
   188  		return k8s.DependencyNotReady, nil
   189  	case *k8s.ReferenceNotFoundError, *k8s.SecretNotFoundError, *k8s.TransitiveDependencyNotFoundError:
   190  		return k8s.DependencyNotFound, nil
   191  	case *k8s.KeyInSecretNotFoundError:
   192  		return k8s.DependencyInvalid, nil
   193  	default:
   194  		return "", fmt.Errorf("unrecognized error caused by unresolvable dependencies: %v", err)
   195  	}
   196  }
   197  
   198  func (r *LifecycleHandler) EnsureFinalizers(ctx context.Context, original, resource *k8s.Resource, finalizers ...string) error {
   199  	if !k8s.EnsureFinalizers(resource, finalizers...) {
   200  		u, err := original.MarshalAsUnstructured()
   201  		if err != nil {
   202  			return err
   203  		}
   204  		copy, err := k8s.NewResource(u)
   205  		if err != nil {
   206  			return err
   207  		}
   208  		if !k8s.EnsureFinalizers(copy, finalizers...) {
   209  			if err := r.updateAPIServer(ctx, copy); err != nil {
   210  				return err
   211  			}
   212  			// sync the resource up with the updated metadata.
   213  			resource.ObjectMeta = copy.ObjectMeta
   214  		}
   215  	}
   216  	return nil
   217  }
   218  
   219  func (r *LifecycleHandler) HandleUpToDate(ctx context.Context, resource *k8s.Resource) error {
   220  	setCondition(resource, corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage)
   221  	if err := r.updateAPIServer(ctx, resource); err != nil {
   222  		return err
   223  	}
   224  	r.recordEvent(resource, corev1.EventTypeNormal, k8s.UpToDate, k8s.UpToDateMessage)
   225  	return nil
   226  }
   227  
   228  func (r *LifecycleHandler) HandleUnresolvableDeps(ctx context.Context, resource *k8s.Resource, originErr error) error {
   229  	reason, err := reasonForUnresolvableDeps(originErr)
   230  	if err != nil {
   231  		return r.HandleUpdateFailed(ctx, resource, err)
   232  	}
   233  	msg := originErr.Error()
   234  	// Only update the API server if there's new information
   235  	if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, reason, msg) {
   236  		setCondition(resource, corev1.ConditionFalse, reason, msg)
   237  		setObservedGeneration(resource, resource.GetGeneration())
   238  		if err := r.updateStatus(ctx, resource); err != nil {
   239  			return err
   240  		}
   241  	}
   242  	r.recordEvent(resource, corev1.EventTypeWarning, reason, msg)
   243  	return nil
   244  }
   245  
   246  func (r *LifecycleHandler) HandleObtainLeaseFailed(ctx context.Context, resource *k8s.Resource, err error) error {
   247  	msg := err.Error()
   248  	// Only update the API server if there's new information
   249  	if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.ManagementConflict, msg) {
   250  		setCondition(resource, corev1.ConditionFalse, k8s.ManagementConflict, msg)
   251  		setObservedGeneration(resource, resource.GetGeneration())
   252  		if err := r.updateStatus(ctx, resource); err != nil {
   253  			return err
   254  		}
   255  	}
   256  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.ManagementConflict, msg)
   257  	return err
   258  }
   259  
   260  func (r *LifecycleHandler) HandlePreActuationTransformFailed(ctx context.Context, resource *k8s.Resource, err error) error {
   261  	msg := err.Error()
   262  	// Only update the API server if there's new information
   263  	if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.PreActuationTransformFailed, msg) {
   264  		setCondition(resource, corev1.ConditionFalse, k8s.PreActuationTransformFailed, msg)
   265  		setObservedGeneration(resource, resource.GetGeneration())
   266  		if err := r.updateStatus(ctx, resource); err != nil {
   267  			return err
   268  		}
   269  	}
   270  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.PreActuationTransformFailed, msg)
   271  	return err
   272  }
   273  
   274  func (r *LifecycleHandler) HandlePostActuationTransformFailed(ctx context.Context, resource *k8s.Resource, err error) error {
   275  	msg := err.Error()
   276  	// Only update the API server if there's new information
   277  	if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.PostActuationTransformFailed, msg) {
   278  		setCondition(resource, corev1.ConditionFalse, k8s.PostActuationTransformFailed, msg)
   279  		setObservedGeneration(resource, resource.GetGeneration())
   280  		if err := r.updateStatus(ctx, resource); err != nil {
   281  			return err
   282  		}
   283  	}
   284  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.PostActuationTransformFailed, msg)
   285  	return err
   286  }
   287  
   288  func (r *LifecycleHandler) HandleUpdating(ctx context.Context, resource *k8s.Resource) error {
   289  	setCondition(resource, corev1.ConditionFalse, k8s.Updating, k8s.UpdatingMessage)
   290  	setObservedGeneration(resource, resource.GetGeneration())
   291  	if err := r.updateStatus(ctx, resource); err != nil {
   292  		return err
   293  	}
   294  	r.recordEvent(resource, corev1.EventTypeNormal, k8s.Updating, k8s.UpdatingMessage)
   295  	return nil
   296  }
   297  
   298  func (r *LifecycleHandler) HandleUpdateFailed(ctx context.Context, resource *k8s.Resource, err error) error {
   299  	msg := fmt.Sprintf("Update call failed: %v", err)
   300  	setCondition(resource, corev1.ConditionFalse, k8s.UpdateFailed, msg)
   301  	setObservedGeneration(resource, resource.GetGeneration())
   302  	if err := r.updateStatus(ctx, resource); err != nil {
   303  		return err
   304  	}
   305  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.UpdateFailed, msg)
   306  	return fmt.Errorf("Update call failed: %w", err)
   307  }
   308  
   309  func (r *LifecycleHandler) HandleDeleting(ctx context.Context, resource *k8s.Resource) error {
   310  	setCondition(resource, corev1.ConditionFalse, k8s.Deleting, k8s.DeletingMessage)
   311  	setObservedGeneration(resource, resource.GetGeneration())
   312  	if err := r.updateStatus(ctx, resource); err != nil {
   313  		return err
   314  	}
   315  	r.recordEvent(resource, corev1.EventTypeNormal, k8s.Deleting, k8s.DeletingMessage)
   316  	return nil
   317  }
   318  
   319  func (r *LifecycleHandler) HandleDeleted(ctx context.Context, resource *k8s.Resource) error {
   320  	setCondition(resource, corev1.ConditionFalse, k8s.Deleted, k8s.DeletedMessage)
   321  	setObservedGeneration(resource, resource.GetGeneration())
   322  	// Do an explicit status update first to prevent a race between the status update and the API
   323  	// server pruning the resource if there are no more finalizers present.
   324  	if err := r.updateStatus(ctx, resource); err != nil {
   325  		return fmt.Errorf("error updating status: %w", err)
   326  	}
   327  	r.recordEvent(resource, corev1.EventTypeNormal, k8s.Deleted, k8s.DeletedMessage)
   328  
   329  	k8s.RemoveFinalizer(resource, k8s.ControllerFinalizerName)
   330  	return r.updateAPIServer(ctx, resource)
   331  }
   332  
   333  func (r *LifecycleHandler) HandleDeleteFailed(ctx context.Context, resource *k8s.Resource, err error) error {
   334  	msg := fmt.Sprintf(k8s.DeleteFailedMessageTmpl, err)
   335  	setCondition(resource, corev1.ConditionFalse, k8s.DeleteFailed, msg)
   336  	setObservedGeneration(resource, resource.GetGeneration())
   337  	if err := r.updateStatus(ctx, resource); err != nil {
   338  		return err
   339  	}
   340  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.DeleteFailed, msg)
   341  	return fmt.Errorf("Delete call failed: %w", err)
   342  }
   343  
   344  func (r *LifecycleHandler) HandleUnmanaged(ctx context.Context, resource *k8s.Resource) error {
   345  	msg := fmt.Sprintf(k8s.UnmanagedMessageTmpl, resource.GetNamespace())
   346  	setCondition(resource, corev1.ConditionFalse, k8s.Unmanaged, msg)
   347  	setObservedGeneration(resource, resource.GetGeneration())
   348  	if err := r.updateStatus(ctx, resource); err != nil {
   349  		return err
   350  	}
   351  	r.recordEvent(resource, corev1.EventTypeWarning, k8s.Unmanaged, msg)
   352  	return nil
   353  }
   354  
   355  func setCondition(resource *k8s.Resource, status corev1.ConditionStatus, reason, msg string) {
   356  	if resource.Status == nil {
   357  		resource.Status = make(map[string]interface{})
   358  	}
   359  	newReadyCondition := k8s.NewCustomReadyCondition(status, reason, msg)
   360  	// We should only update the ready condition's last transition time if there was a transition
   361  	// since its last state. The function sets it to time.Now(), so let's replace it if there was
   362  	// no transition.
   363  	if currentReadyCondition, found := k8s.GetReadyCondition(resource); found {
   364  		if currentReadyCondition.Status == status {
   365  			newReadyCondition.LastTransitionTime = currentReadyCondition.LastTransitionTime
   366  		}
   367  	}
   368  	resource.Status["conditions"] = []k8sv1alpha1.Condition{newReadyCondition}
   369  }
   370  
   371  func setObservedGeneration(resource *k8s.Resource, observedGeneration int64) {
   372  	if resource.Status == nil {
   373  		resource.Status = make(map[string]interface{})
   374  	}
   375  	resource.Status["observedGeneration"] = observedGeneration
   376  }
   377  
   378  func (r *LifecycleHandler) recordEvent(resource *k8s.Resource, eventtype, reason, message string) error {
   379  	u, err := resource.MarshalAsUnstructured()
   380  	if err != nil {
   381  		return err
   382  	}
   383  	r.Recorder.Event(u, eventtype, reason, message)
   384  	return nil
   385  }
   386  
   387  func IsOrphaned(resource *k8s.Resource, parentReferenceConfigs []corekccv1alpha1.TypeConfig, kubeClient client.Client) (orphaned bool, parent *k8s.Resource, err error) {
   388  	if len(parentReferenceConfigs) == 0 {
   389  		return false, nil, nil
   390  	}
   391  	for _, refConfig := range parentReferenceConfigs {
   392  		resourceRefRaw, ok := resource.Spec[refConfig.Key]
   393  		if !ok {
   394  			// This parent type isn't present. Check if another parent type is.
   395  			continue
   396  		}
   397  		resourceRef := &corekccv1alpha1.ResourceReference{}
   398  		if err := util.Marshal(resourceRefRaw, resourceRef); err != nil {
   399  			return false, nil, fmt.Errorf("'spec.%v' is an unrecognized format", refConfig.Key)
   400  		}
   401  		if resourceRef.External != "" {
   402  			return false, nil, nil
   403  		}
   404  		parent, err := k8s.GetReferencedResource(resourceRef, refConfig.GVK, resource.GetNamespace(), kubeClient)
   405  		if err != nil {
   406  			if k8s.IsReferenceNotFoundError(err) {
   407  				return true, nil, nil
   408  			}
   409  			return false, nil, fmt.Errorf("error getting parent reference 'spec.%v': %v", refConfig.Key, err)
   410  		}
   411  		return false, parent, nil
   412  	}
   413  	return false, nil, fmt.Errorf("no parent reference found in resource")
   414  }
   415  

View as plain text