...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/policymember/iampolicymember_controller.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/policymember

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package policymember
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
    24  	iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
    25  	condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
    26  	kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
    29  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
    30  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
    31  	kccratelimiter "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
    32  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
    33  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
    34  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
    35  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    36  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
    37  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
    38  
    39  	mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
    40  	tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
    41  	"golang.org/x/sync/semaphore"
    42  	corev1 "k8s.io/api/core/v1"
    43  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    44  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    45  	"k8s.io/apimachinery/pkg/runtime"
    46  	"k8s.io/apimachinery/pkg/types"
    47  	"k8s.io/client-go/rest"
    48  	"sigs.k8s.io/controller-runtime/pkg/builder"
    49  	"sigs.k8s.io/controller-runtime/pkg/client"
    50  	"sigs.k8s.io/controller-runtime/pkg/controller"
    51  	"sigs.k8s.io/controller-runtime/pkg/event"
    52  	"sigs.k8s.io/controller-runtime/pkg/handler"
    53  	klog "sigs.k8s.io/controller-runtime/pkg/log"
    54  	"sigs.k8s.io/controller-runtime/pkg/manager"
    55  	"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
    56  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    57  	"sigs.k8s.io/controller-runtime/pkg/source"
    58  )
    59  
    60  const controllerName = "iampolicymember-controller"
    61  
    62  var logger = klog.Log.WithName(controllerName)
    63  
    64  // Add creates a new IAM Policy Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
    65  // and start it when the Manager is started.
    66  func Add(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
    67  	converter *conversion.Converter, dclConfig *mmdcl.Config) error {
    68  	immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
    69  	resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
    70  	reconciler, err := NewReconciler(mgr, provider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
    71  	if err != nil {
    72  		return err
    73  	}
    74  	return add(mgr, reconciler)
    75  }
    76  
    77  // NewReconciler returns a new reconcile.Reconciler.
    78  func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*Reconciler, error) {
    79  	r := Reconciler{
    80  		LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
    81  			mgr.GetClient(),
    82  			mgr.GetEventRecorderFor(controllerName),
    83  		),
    84  		Client:                     mgr.GetClient(),
    85  		iamClient:                  kcciamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig),
    86  		scheme:                     mgr.GetScheme(),
    87  		config:                     mgr.GetConfig(),
    88  		immediateReconcileRequests: immediateReconcileRequests,
    89  		resourceWatcherRoutines:    resourceWatcherRoutines,
    90  		requeueRateLimiter:         kccratelimiter.RequeueRateLimiter(),
    91  	}
    92  
    93  	return &r, nil
    94  }
    95  
    96  // add adds a new Controller to mgr with r as the reconcile.Reconciler.
    97  func add(mgr manager.Manager, r *Reconciler) error {
    98  	obj := &iamv1beta1.IAMPolicyMember{}
    99  	_, err := builder.
   100  		ControllerManagedBy(mgr).
   101  		Named(controllerName).
   102  		WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: kccratelimiter.NewRateLimiter()}).
   103  		Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
   104  		For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
   105  		Build(r)
   106  	if err != nil {
   107  		return fmt.Errorf("error creating new controller: %v", err)
   108  	}
   109  	return nil
   110  }
   111  
   112  var _ reconcile.Reconciler = &Reconciler{}
   113  
   114  type Reconciler struct {
   115  	lifecyclehandler.LifecycleHandler
   116  	client.Client
   117  	metrics.ReconcilerMetrics
   118  	iamClient *kcciamclient.IAMClient
   119  	scheme    *runtime.Scheme
   120  	config    *rest.Config
   121  	// Fields used for triggering reconciliations when dependencies are ready
   122  	immediateReconcileRequests chan event.GenericEvent
   123  	resourceWatcherRoutines    *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
   124  
   125  	// rate limit requeues (periodic re-reconciliation), so we don't use the whole rate limit on re-reconciles
   126  	requeueRateLimiter ratelimiter.RateLimiter
   127  }
   128  
   129  type reconcileContext struct {
   130  	Reconciler     *Reconciler
   131  	Ctx            context.Context
   132  	NamespacedName types.NamespacedName
   133  }
   134  
   135  // Reconcile checks k8s for the current state of the resource.
   136  func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
   137  	logger.Info("Starting reconcile", "resource", request.NamespacedName)
   138  	startTime := time.Now()
   139  	ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
   140  	defer cancel()
   141  	r.RecordReconcileWorkers(ctx, iamv1beta1.IAMPolicyMemberGVK)
   142  	defer r.AfterReconcile()
   143  	defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMPolicyMemberGVK, request.Namespace, request.Name, startTime, &err)
   144  
   145  	var memberPolicy iamv1beta1.IAMPolicyMember
   146  	if err := r.Get(context.TODO(), request.NamespacedName, &memberPolicy); err != nil {
   147  		if apierrors.IsNotFound(err) {
   148  			return reconcile.Result{}, nil
   149  		}
   150  		return reconcile.Result{}, err
   151  	}
   152  	reconcileContext := &reconcileContext{
   153  		Reconciler:     r,
   154  		Ctx:            ctx,
   155  		NamespacedName: request.NamespacedName,
   156  	}
   157  	requeue, err := reconcileContext.doReconcile(&memberPolicy)
   158  	if err != nil {
   159  		return reconcile.Result{}, err
   160  	}
   161  	if requeue {
   162  		return reconcile.Result{Requeue: true}, nil
   163  	}
   164  
   165  	jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMPolicyMemberGVK, nil, nil, &memberPolicy)
   166  	if err != nil {
   167  		return reconcile.Result{}, err
   168  	}
   169  	requeueDelay := r.requeueRateLimiter.When(request)
   170  	requeueAfter := jitteredPeriod + requeueDelay
   171  	logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", requeueAfter)
   172  	return reconcile.Result{RequeueAfter: requeueAfter}, nil
   173  }
   174  
   175  func (r *reconcileContext) doReconcile(policyMember *iamv1beta1.IAMPolicyMember) (requeue bool, err error) {
   176  	defer execution.RecoverWithInternalError(&err)
   177  	if !policyMember.DeletionTimestamp.IsZero() {
   178  		if !k8s.HasFinalizer(policyMember, k8s.ControllerFinalizerName) {
   179  			// Resource has no controller finalizer; no finalization necessary
   180  			return false, nil
   181  		}
   182  		if k8s.HasFinalizer(policyMember, k8s.DeletionDefenderFinalizerName) {
   183  			// deletion defender has not yet been finalized; requeuing
   184  			logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(policyMember))
   185  			return true, nil
   186  		}
   187  		if !k8s.HasAbandonAnnotation(policyMember) {
   188  			if err := r.Reconciler.iamClient.DeletePolicyMember(r.Ctx, policyMember); err != nil {
   189  				if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
   190  					if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   191  						logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
   192  						resource, err := ToK8sResource(policyMember)
   193  						if err != nil {
   194  							return false, fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling unresolvable dependencies event: %w", err)
   195  						}
   196  						// Requeue resource for reconciliation with exponential backoff applied
   197  						return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
   198  					}
   199  					return false, r.handleDeleteFailed(policyMember, err)
   200  				}
   201  			}
   202  		}
   203  		return false, r.handleDeleted(policyMember)
   204  	}
   205  	if _, err := r.Reconciler.iamClient.GetPolicyMember(r.Ctx, policyMember); err != nil {
   206  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   207  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
   208  			return r.handleUnresolvableDeps(policyMember, unwrappedErr)
   209  		}
   210  		if !errors.Is(err, kcciamclient.NotFoundError) {
   211  			return false, r.handleUpdateFailed(policyMember, err)
   212  		}
   213  	}
   214  	if !k8s.EnsureFinalizers(policyMember, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName) {
   215  		if err := r.update(policyMember); err != nil {
   216  			return false, r.handleUpdateFailed(policyMember, err)
   217  		}
   218  	}
   219  	if _, err := r.Reconciler.iamClient.SetPolicyMember(r.Ctx, policyMember); err != nil {
   220  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   221  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
   222  			return r.handleUnresolvableDeps(policyMember, unwrappedErr)
   223  		}
   224  		return false, r.handleUpdateFailed(policyMember, fmt.Errorf("error setting policy member: %w", err))
   225  	}
   226  	if isAPIServerUpdateRequired(policyMember) {
   227  		return false, r.handleUpToDate(policyMember)
   228  	}
   229  	return false, nil
   230  }
   231  
   232  func (r *reconcileContext) update(policyMember *iamv1beta1.IAMPolicyMember) error {
   233  	if err := r.Reconciler.Client.Update(r.Ctx, policyMember); err != nil {
   234  		return fmt.Errorf("error updating '%v' in API server: %w", r.NamespacedName, err)
   235  	}
   236  	return nil
   237  }
   238  
   239  func (r *reconcileContext) handleUpToDate(policyMember *iamv1beta1.IAMPolicyMember) error {
   240  	resource, err := ToK8sResource(policyMember)
   241  	if err != nil {
   242  		return fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling %v event: %w", k8s.UpToDate, err)
   243  	}
   244  	return r.Reconciler.HandleUpToDate(r.Ctx, resource)
   245  }
   246  
   247  func (r *reconcileContext) handleUpdateFailed(policyMember *iamv1beta1.IAMPolicyMember, origErr error) error {
   248  	resource, err := ToK8sResource(policyMember)
   249  	if err != nil {
   250  		logger.Error(err, "error converting IAMPolicyMember to k8s resource while handling event",
   251  			"resource", k8s.GetNamespacedName(policyMember), "event", k8s.UpdateFailed)
   252  		return fmt.Errorf("Update call failed: %w", origErr)
   253  	}
   254  	return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
   255  }
   256  
   257  func (r *reconcileContext) handleDeleted(policyMember *iamv1beta1.IAMPolicyMember) error {
   258  	resource, err := ToK8sResource(policyMember)
   259  	if err != nil {
   260  		return fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling %v event: %w", k8s.Deleted, err)
   261  	}
   262  	return r.Reconciler.HandleDeleted(r.Ctx, resource)
   263  }
   264  
   265  func (r *reconcileContext) handleDeleteFailed(policyMember *iamv1beta1.IAMPolicyMember, origErr error) error {
   266  	resource, err := ToK8sResource(policyMember)
   267  	if err != nil {
   268  		logger.Error(err, "error converting IAMPolicyMember to k8s resource while handling event",
   269  			"resource", k8s.GetNamespacedName(policyMember), "event", k8s.DeleteFailed)
   270  		return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
   271  	}
   272  	return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
   273  }
   274  
   275  func (r *Reconciler) supportsImmediateReconciliations() bool {
   276  	return r.immediateReconcileRequests != nil
   277  }
   278  
   279  func (r *reconcileContext) handleUnresolvableDeps(policyMember *v1beta1.IAMPolicyMember, origErr error) (requeue bool, err error) {
   280  	resource, err := ToK8sResource(policyMember)
   281  	if err != nil {
   282  		return false, fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling unresolvable dependencies event: %w", err)
   283  	}
   284  	refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
   285  	if !ok || !r.Reconciler.supportsImmediateReconciliations() {
   286  		// Requeue resource for reconciliation with exponential backoff applied
   287  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   288  	}
   289  	// Check that the number of active resource watches
   290  	// does not exceed the controller's cap. If the
   291  	// capacity is not exceeded, The number of active
   292  	// resource watches is incremented by one and a watch
   293  	// is started
   294  	if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
   295  		// Requeue resource for reconciliation with exponential backoff applied
   296  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   297  	}
   298  	// Create a logger for ResourceWatcher that contains info
   299  	// about the referencing resource. This is done since the
   300  	// messages logged by ResourceWatcher only include the
   301  	// information of the resource it is watching by default.
   302  	watcherLogger := logger.WithValues(
   303  		"referencingResource", resource.GetNamespacedName(),
   304  		"referencingResourceGVK", resource.GroupVersionKind())
   305  	watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
   306  	if err != nil {
   307  		return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
   308  	}
   309  
   310  	logger := logger.WithValues(
   311  		"resource", resource.GetNamespacedName(),
   312  		"resourceGVK", resource.GroupVersionKind(),
   313  		"reference", refNN,
   314  		"referenceGVK", refGVK)
   315  	go func() {
   316  		// Decrement the count of active resource watches after
   317  		// the watch finishes
   318  		defer r.Reconciler.resourceWatcherRoutines.Release(1)
   319  		timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
   320  		ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
   321  		defer cancel()
   322  		logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
   323  		if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
   324  			logger.Error(err, "error while waiting for resource's reference to be ready")
   325  			return
   326  		}
   327  		logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
   328  		r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
   329  	}()
   330  
   331  	// Do not requeue resource for immediate reconciliation. Wait for either
   332  	// the next periodic reconciliation or for the referenced resource to be ready (which
   333  	// triggers a reconciliation), whichever comes first.
   334  	return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   335  }
   336  
   337  // enqueueForImmediateReconciliation enqueues the given resource for immediate
   338  // reconciliation. Note that this function only takes in the name and namespace
   339  // of the resource and not its GVK since the controller instance that this
   340  // reconcile instance belongs to can only reconcile resources of one GVK.
   341  func (r *Reconciler) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
   342  	genEvent := event.GenericEvent{}
   343  	genEvent.Object = &unstructured.Unstructured{}
   344  	genEvent.Object.SetNamespace(resourceNN.Namespace)
   345  	genEvent.Object.SetName(resourceNN.Name)
   346  	r.immediateReconcileRequests <- genEvent
   347  }
   348  
   349  func isAPIServerUpdateRequired(policyMember *iamv1beta1.IAMPolicyMember) bool {
   350  	// TODO: even in the event of an actual update to GCP, this function will
   351  	// return false because the condition comparison doesn't account for time.
   352  	conditions := []condition.Condition{
   353  		k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
   354  	}
   355  	if !k8s.ConditionSlicesEqual(policyMember.Status.Conditions, conditions) {
   356  		return true
   357  	}
   358  	if policyMember.Status.ObservedGeneration != policyMember.GetGeneration() {
   359  		return true
   360  	}
   361  	return false
   362  }
   363  
   364  func ToK8sResource(policyMember *iamv1beta1.IAMPolicyMember) (*k8s.Resource, error) {
   365  	kcciamclient.SetGVK(policyMember)
   366  	resource := k8s.Resource{}
   367  	if err := util.Marshal(policyMember, &resource); err != nil {
   368  		return nil, fmt.Errorf("error marshalling IAMPolicyMember to k8s resource: %w", err)
   369  	}
   370  	return &resource, nil
   371  }
   372  

View as plain text