...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/policy/iampolicy_controller.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/policy

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package policy
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"time"
    22  
    23  	iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
    24  	condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
    25  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
    26  	kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
    29  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
    30  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
    31  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
    32  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
    33  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
    34  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
    35  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    36  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
    37  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
    38  
    39  	mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
    40  	tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
    41  	"golang.org/x/sync/semaphore"
    42  	corev1 "k8s.io/api/core/v1"
    43  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    44  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    45  	"k8s.io/apimachinery/pkg/runtime"
    46  	"k8s.io/apimachinery/pkg/types"
    47  	"k8s.io/client-go/rest"
    48  	"sigs.k8s.io/controller-runtime/pkg/builder"
    49  	"sigs.k8s.io/controller-runtime/pkg/client"
    50  	"sigs.k8s.io/controller-runtime/pkg/controller"
    51  	"sigs.k8s.io/controller-runtime/pkg/event"
    52  	"sigs.k8s.io/controller-runtime/pkg/handler"
    53  	klog "sigs.k8s.io/controller-runtime/pkg/log"
    54  	"sigs.k8s.io/controller-runtime/pkg/manager"
    55  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    56  	"sigs.k8s.io/controller-runtime/pkg/source"
    57  )
    58  
// controllerName is the name the controller is registered under. It is also
// used to name the package logger and the event recorder.
const controllerName = "iampolicy-controller"

// logger is the package-level logger, named after the controller.
var logger = klog.Log.WithName(controllerName)
    62  
    63  // Add creates a new IAM Policy Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
    64  // and start it when the Manager is started.
    65  func Add(mgr manager.Manager, tfProvider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
    66  	converter *conversion.Converter, dclConfig *mmdcl.Config) error {
    67  	immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
    68  	resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
    69  	reconciler, err := NewReconciler(mgr, tfProvider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
    70  	if err != nil {
    71  		return err
    72  	}
    73  	return add(mgr, reconciler)
    74  }
    75  
    76  // NewReconciler returns a new reconcile.Reconciler.
    77  func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*ReconcileIAMPolicy, error) {
    78  	r := ReconcileIAMPolicy{
    79  		LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
    80  			mgr.GetClient(),
    81  			mgr.GetEventRecorderFor(controllerName),
    82  		),
    83  		Client:                     mgr.GetClient(),
    84  		iamClient:                  iamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig),
    85  		config:                     mgr.GetConfig(),
    86  		immediateReconcileRequests: immediateReconcileRequests,
    87  		resourceWatcherRoutines:    resourceWatcherRoutines,
    88  		scheme:                     mgr.GetScheme(),
    89  		ReconcilerMetrics: metrics.ReconcilerMetrics{
    90  			ResourceNameLabel: metrics.ResourceNameLabel,
    91  		},
    92  	}
    93  	return &r, nil
    94  }
    95  
    96  // add adds a new Controller to mgr with r as the reconcile.Reconciler.
    97  func add(mgr manager.Manager, r *ReconcileIAMPolicy) error {
    98  	obj := &iamv1beta1.IAMPolicy{}
    99  	_, err := builder.
   100  		ControllerManagedBy(mgr).
   101  		Named(controllerName).
   102  		WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
   103  		Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
   104  		For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
   105  		Build(r)
   106  	if err != nil {
   107  		return fmt.Errorf("error creating new controller: %v", err)
   108  	}
   109  	return nil
   110  }
   111  
// Compile-time assertion that *ReconcileIAMPolicy satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileIAMPolicy{}

// ReconcileIAMPolicy is a reconciler for handling IAM policies.
type ReconcileIAMPolicy struct {
	// Embedded types; their methods are promoted onto the reconciler.
	lifecyclehandler.LifecycleHandler
	client.Client
	metrics.ReconcilerMetrics
	// iamClient is used to get, set and delete the underlying IAM policy.
	iamClient *kcciamclient.IAMClient
	scheme    *runtime.Scheme
	// config is passed to resourcewatcher.New when watching unready references.
	config    *rest.Config
	// Fields used for triggering reconciliations when dependencies are ready
	immediateReconcileRequests chan event.GenericEvent
	resourceWatcherRoutines    *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
}
   126  
// reconcileContext bundles the state of a single Reconcile invocation so the
// per-request helper methods can share it.
type reconcileContext struct {
	// Reconciler is the controller-wide reconciler that owns this invocation.
	Reconciler     *ReconcileIAMPolicy
	// Ctx is the context of the current reconcile.
	Ctx            context.Context
	// NamespacedName identifies the resource named in the reconcile request.
	NamespacedName types.NamespacedName
}
   132  
   133  // Reconcile checks k8s for the current state of the resource.
   134  func (r *ReconcileIAMPolicy) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
   135  	logger.Info("Running reconcile", "resource", request.NamespacedName)
   136  	startTime := time.Now()
   137  	ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
   138  	defer cancel()
   139  	r.RecordReconcileWorkers(ctx, iamv1beta1.IAMPolicyGVK)
   140  	defer r.AfterReconcile()
   141  	defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMPolicyGVK, request.Namespace, request.Name, startTime, &err)
   142  
   143  	policy := &iamv1beta1.IAMPolicy{}
   144  	if err := r.Get(context.TODO(), request.NamespacedName, policy); err != nil {
   145  		if apierrors.IsNotFound(err) {
   146  			// Object not found, return.  Created objects are automatically garbage collected.
   147  			// For additional cleanup logic use finalizers.
   148  			return reconcile.Result{}, nil
   149  		}
   150  		// Error reading the object - requeue the request.
   151  		return reconcile.Result{}, err
   152  	}
   153  	runCtx := &reconcileContext{
   154  		Reconciler:     r,
   155  		Ctx:            ctx,
   156  		NamespacedName: request.NamespacedName,
   157  	}
   158  	requeue, err := runCtx.doReconcile(policy)
   159  	if err != nil {
   160  		return reconcile.Result{}, err
   161  	}
   162  	if requeue {
   163  		return reconcile.Result{Requeue: true}, nil
   164  	}
   165  	jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMPolicyGVK, nil, nil, policy)
   166  	if err != nil {
   167  		return reconcile.Result{}, err
   168  	}
   169  	logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", jitteredPeriod)
   170  	return reconcile.Result{RequeueAfter: jitteredPeriod}, nil
   171  }
   172  
   173  func (r *reconcileContext) doReconcile(policy *iamv1beta1.IAMPolicy) (requeue bool, err error) {
   174  	defer execution.RecoverWithInternalError(&err)
   175  	if !policy.DeletionTimestamp.IsZero() {
   176  		if !k8s.HasFinalizer(policy, k8s.ControllerFinalizerName) {
   177  			// Resource has no controller finalizer; no finalization necessary
   178  			return false, nil
   179  		}
   180  		if k8s.HasFinalizer(policy, k8s.DeletionDefenderFinalizerName) {
   181  			// deletion defender has not yet been finalized; requeuing
   182  			logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(policy))
   183  			return true, nil
   184  		}
   185  		if !k8s.HasAbandonAnnotation(policy) {
   186  			if err := r.Reconciler.iamClient.DeletePolicy(r.Ctx, policy); err != nil {
   187  				if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
   188  					if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   189  						logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
   190  						resource, err := toK8sResource(policy)
   191  						if err != nil {
   192  							return false, fmt.Errorf("error converting IAMPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
   193  						}
   194  						// Requeue resource for reconciliation with exponential backoff applied
   195  						return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
   196  					}
   197  					return false, r.handleDeleteFailed(policy, err)
   198  				}
   199  			}
   200  		}
   201  		return false, r.handleDeleted(policy)
   202  	}
   203  	if _, err := r.Reconciler.iamClient.GetPolicy(r.Ctx, policy); err != nil {
   204  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   205  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
   206  			return r.handleUnresolvableDeps(policy, unwrappedErr)
   207  		}
   208  		return false, r.handleUpdateFailed(policy, err)
   209  	}
   210  	k8s.EnsureFinalizers(policy, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName)
   211  	// set the etag to an empty string, since IAMPolicy is the authoritative intent, KCC wants to overwrite the underlying policy regardless
   212  	policy.Spec.Etag = ""
   213  	if _, err = r.Reconciler.iamClient.SetPolicy(r.Ctx, policy); err != nil {
   214  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   215  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
   216  			return r.handleUnresolvableDeps(policy, unwrappedErr)
   217  		}
   218  		return false, r.handleUpdateFailed(policy, fmt.Errorf("error setting policy: %w", err))
   219  	}
   220  	if isAPIServerUpdateRequired(policy) {
   221  		return false, r.handleUpToDate(policy)
   222  	}
   223  	return false, nil
   224  }
   225  
   226  func (r *reconcileContext) handleUpToDate(policy *iamv1beta1.IAMPolicy) error {
   227  	resource, err := toK8sResource(policy)
   228  	if err != nil {
   229  		return fmt.Errorf("error converting IAMPolicy to k8s resource while handling %v event: %w", k8s.UpToDate, err)
   230  	}
   231  	return r.Reconciler.HandleUpToDate(r.Ctx, resource)
   232  }
   233  
   234  func (r *reconcileContext) handleUpdateFailed(policy *iamv1beta1.IAMPolicy, origErr error) error {
   235  	resource, err := toK8sResource(policy)
   236  	if err != nil {
   237  		logger.Error(err, "error converting IAMPolicy to k8s resource while handling event",
   238  			"resource", k8s.GetNamespacedName(policy), "event", k8s.UpdateFailed)
   239  		return fmt.Errorf("Update call failed: %w", origErr)
   240  	}
   241  	return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
   242  }
   243  
   244  func (r *reconcileContext) handleDeleted(policy *iamv1beta1.IAMPolicy) error {
   245  	resource, err := toK8sResource(policy)
   246  	if err != nil {
   247  		return fmt.Errorf("error converting IAMPolicy to k8s resource while handling %v event: %w", k8s.Deleted, err)
   248  	}
   249  	return r.Reconciler.HandleDeleted(r.Ctx, resource)
   250  }
   251  
   252  func (r *reconcileContext) handleDeleteFailed(policy *iamv1beta1.IAMPolicy, origErr error) error {
   253  	resource, err := toK8sResource(policy)
   254  	if err != nil {
   255  		logger.Error(err, "error converting IAMPolicy to k8s resource while handling event",
   256  			"resource", k8s.GetNamespacedName(policy), "event", k8s.DeleteFailed)
   257  		return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
   258  	}
   259  	return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
   260  }
   261  
   262  func (r *ReconcileIAMPolicy) supportsImmediateReconciliations() bool {
   263  	return r.immediateReconcileRequests != nil
   264  }
   265  
   266  func (r *reconcileContext) handleUnresolvableDeps(policy *iamv1beta1.IAMPolicy, origErr error) (requeue bool, err error) {
   267  	resource, err := toK8sResource(policy)
   268  	if err != nil {
   269  		return false, fmt.Errorf("error converting IAMPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
   270  	}
   271  	refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
   272  	if !ok || !r.Reconciler.supportsImmediateReconciliations() {
   273  		// Requeue resource for reconciliation with exponential backoff applied
   274  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   275  	}
   276  	// Check that the number of active resource watches
   277  	// does not exceed the controller's cap. If the
   278  	// capacity is not exceeded, The number of active
   279  	// resource watches is incremented by one and a watch
   280  	// is started
   281  	if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
   282  		// Requeue resource for reconciliation with exponential backoff applied
   283  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   284  	}
   285  	// Create a logger for ResourceWatcher that contains info
   286  	// about the referencing resource. This is done since the
   287  	// messages logged by ResourceWatcher only include the
   288  	// information of the resource it is watching by default.
   289  	watcherLogger := logger.WithValues(
   290  		"referencingResource", resource.GetNamespacedName(),
   291  		"referencingResourceGVK", resource.GroupVersionKind())
   292  	watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
   293  	if err != nil {
   294  		return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
   295  	}
   296  
   297  	logger := logger.WithValues(
   298  		"resource", resource.GetNamespacedName(),
   299  		"resourceGVK", resource.GroupVersionKind(),
   300  		"reference", refNN,
   301  		"referenceGVK", refGVK)
   302  	go func() {
   303  		// Decrement the count of active resource watches after
   304  		// the watch finishes
   305  		defer r.Reconciler.resourceWatcherRoutines.Release(1)
   306  		timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
   307  		ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
   308  		defer cancel()
   309  		logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
   310  		if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
   311  			logger.Error(err, "error while waiting for resource's reference to be ready")
   312  			return
   313  		}
   314  		logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
   315  		r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
   316  	}()
   317  
   318  	// Do not requeue resource for immediate reconciliation. Wait for either
   319  	// the next periodic reconciliation or for the referenced resource to be ready (which
   320  	// triggers a reconciliation), whichever comes first.
   321  	return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   322  }
   323  
   324  // enqueueForImmediateReconciliation enqueues the given resource for immediate
   325  // reconciliation. Note that this function only takes in the name and namespace
   326  // of the resource and not its GVK since the controller instance that this
   327  // reconcile instance belongs to can only reconcile resources of one GVK.
   328  func (r *ReconcileIAMPolicy) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
   329  	genEvent := event.GenericEvent{}
   330  	genEvent.Object = &unstructured.Unstructured{}
   331  	genEvent.Object.SetNamespace(resourceNN.Namespace)
   332  	genEvent.Object.SetName(resourceNN.Name)
   333  	r.immediateReconcileRequests <- genEvent
   334  }
   335  
   336  func isAPIServerUpdateRequired(policy *iamv1beta1.IAMPolicy) bool {
   337  	// TODO: even in the event of an actual update to GCP, this function will
   338  	// return false because the condition comparison doesn't account for time.
   339  	conditions := []condition.Condition{
   340  		k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
   341  	}
   342  	if !k8s.ConditionSlicesEqual(policy.Status.Conditions, conditions) {
   343  		return true
   344  	}
   345  	if policy.Status.ObservedGeneration != policy.GetGeneration() {
   346  		return true
   347  	}
   348  	return false
   349  }
   350  
   351  func toK8sResource(policy *iamv1beta1.IAMPolicy) (*k8s.Resource, error) {
   352  	iamclient.SetGVK(policy)
   353  	resource := k8s.Resource{}
   354  	if err := util.Marshal(policy, &resource); err != nil {
   355  		return nil, fmt.Errorf("error marshalling IAMPolicy to k8s resource: %w", err)
   356  	}
   357  	return &resource, nil
   358  }
   359  

View as plain text