...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/auditconfig/iamauditconfig_controller.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/auditconfig

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package auditconfig
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"time"
    22  
    23  	iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
    24  	condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
    25  	kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
    26  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
    29  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
    30  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
    31  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
    32  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
    33  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
    34  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    35  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
    36  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
    37  
    38  	mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
    39  	tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
    40  	"golang.org/x/sync/semaphore"
    41  	corev1 "k8s.io/api/core/v1"
    42  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    43  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    44  	"k8s.io/apimachinery/pkg/runtime"
    45  	"k8s.io/apimachinery/pkg/types"
    46  	"k8s.io/client-go/rest"
    47  	"sigs.k8s.io/controller-runtime/pkg/builder"
    48  	"sigs.k8s.io/controller-runtime/pkg/client"
    49  	"sigs.k8s.io/controller-runtime/pkg/controller"
    50  	"sigs.k8s.io/controller-runtime/pkg/event"
    51  	"sigs.k8s.io/controller-runtime/pkg/handler"
    52  	klog "sigs.k8s.io/controller-runtime/pkg/log"
    53  	"sigs.k8s.io/controller-runtime/pkg/manager"
    54  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    55  	"sigs.k8s.io/controller-runtime/pkg/source"
    56  )
    57  
    58  const controllerName = "iamauditconfig-controller"
    59  
    60  var logger = klog.Log.WithName(controllerName)
    61  
    62  func Add(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
    63  	converter *conversion.Converter, dclConfig *mmdcl.Config) error {
    64  	immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
    65  	resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
    66  	reconciler, err := NewReconciler(mgr, provider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
    67  	if err != nil {
    68  		return err
    69  	}
    70  	return add(mgr, reconciler)
    71  }
    72  
    73  func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*Reconciler, error) {
    74  	r := Reconciler{
    75  		LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
    76  			mgr.GetClient(),
    77  			mgr.GetEventRecorderFor(controllerName),
    78  		),
    79  		Client:                     mgr.GetClient(),
    80  		iamClient:                  kcciamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig).TFIAMClient,
    81  		scheme:                     mgr.GetScheme(),
    82  		config:                     mgr.GetConfig(),
    83  		immediateReconcileRequests: immediateReconcileRequests,
    84  		resourceWatcherRoutines:    resourceWatcherRoutines,
    85  	}
    86  	return &r, nil
    87  }
    88  
    89  // add adds a new Controller to mgr with r as the reconcile.Reconciler.
    90  func add(mgr manager.Manager, r *Reconciler) error {
    91  	obj := &iamv1beta1.IAMAuditConfig{}
    92  	_, err := builder.
    93  		ControllerManagedBy(mgr).
    94  		Named(controllerName).
    95  		WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
    96  		Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
    97  		For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
    98  		Build(r)
    99  	if err != nil {
   100  		return fmt.Errorf("error creating new controller: %v", err)
   101  	}
   102  	return nil
   103  }
   104  
   105  var _ reconcile.Reconciler = &Reconciler{}
   106  
   107  type Reconciler struct {
   108  	lifecyclehandler.LifecycleHandler
   109  	client.Client
   110  	metrics.ReconcilerMetrics
   111  	iamClient *kcciamclient.TFIAMClient
   112  	scheme    *runtime.Scheme
   113  	config    *rest.Config
   114  	// Fields used for triggering reconciliations when dependencies are ready
   115  	immediateReconcileRequests chan event.GenericEvent
   116  	resourceWatcherRoutines    *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
   117  }
   118  
   119  type reconcileContext struct {
   120  	Reconciler     *Reconciler
   121  	Ctx            context.Context
   122  	NamespacedName types.NamespacedName
   123  }
   124  
   125  func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
   126  	logger.Info("Starting reconcile", "resource", request.NamespacedName)
   127  	startTime := time.Now()
   128  	ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
   129  	defer cancel()
   130  	r.RecordReconcileWorkers(ctx, iamv1beta1.IAMAuditConfigGVK)
   131  	defer r.AfterReconcile()
   132  	defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMAuditConfigGVK, request.Namespace, request.Name, startTime, &err)
   133  
   134  	var auditConfig iamv1beta1.IAMAuditConfig
   135  	if err := r.Get(context.TODO(), request.NamespacedName, &auditConfig); err != nil {
   136  		if apierrors.IsNotFound(err) {
   137  			logger.Info("resource not found in API server; finishing reconcile", "resource", request.NamespacedName)
   138  			return reconcile.Result{}, nil
   139  		}
   140  		return reconcile.Result{}, err
   141  	}
   142  	reconcileContext := &reconcileContext{
   143  		Reconciler:     r,
   144  		Ctx:            ctx,
   145  		NamespacedName: request.NamespacedName,
   146  	}
   147  	requeue, err := reconcileContext.doReconcile(&auditConfig)
   148  	if err != nil {
   149  		return reconcile.Result{}, err
   150  	}
   151  	if requeue {
   152  		return reconcile.Result{Requeue: true}, nil
   153  	}
   154  	jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMAuditConfigGVK, nil, nil, &auditConfig)
   155  	if err != nil {
   156  		return reconcile.Result{}, err
   157  	}
   158  	logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", jitteredPeriod)
   159  	return reconcile.Result{RequeueAfter: jitteredPeriod}, nil
   160  }
   161  
   162  func (r *reconcileContext) doReconcile(auditConfig *iamv1beta1.IAMAuditConfig) (requeue bool, err error) {
   163  	defer execution.RecoverWithInternalError(&err)
   164  	if !auditConfig.DeletionTimestamp.IsZero() {
   165  		if !k8s.HasFinalizer(auditConfig, k8s.ControllerFinalizerName) {
   166  			// Resource has no controller finalizer; no finalization necessary
   167  			return false, nil
   168  		}
   169  		if k8s.HasFinalizer(auditConfig, k8s.DeletionDefenderFinalizerName) {
   170  			// Deletion defender has not yet been finalized; requeuing
   171  			logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(auditConfig))
   172  			return true, nil
   173  		}
   174  		if !k8s.HasAbandonAnnotation(auditConfig) {
   175  			if err := r.Reconciler.iamClient.DeleteAuditConfig(r.Ctx, auditConfig); err != nil {
   176  				if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
   177  					if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   178  						logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
   179  						resource, err := ToK8sResource(auditConfig)
   180  						if err != nil {
   181  							return false, fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling unresolvable dependencies event: %w", err)
   182  						}
   183  						// Requeue resource for reconciliation with exponential backoff applied
   184  						return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
   185  					}
   186  					return false, r.handleDeleteFailed(auditConfig, err)
   187  				}
   188  			}
   189  		}
   190  		return false, r.handleDeleted(auditConfig)
   191  	}
   192  	if _, err := r.Reconciler.iamClient.GetAuditConfig(r.Ctx, auditConfig); err != nil {
   193  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   194  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
   195  			return r.handleUnresolvableDeps(auditConfig, unwrappedErr)
   196  		}
   197  		if !errors.Is(err, kcciamclient.NotFoundError) {
   198  			return false, r.handleUpdateFailed(auditConfig, err)
   199  		}
   200  	}
   201  	if !k8s.EnsureFinalizers(auditConfig, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName) {
   202  		if err := r.update(auditConfig); err != nil {
   203  			return false, r.handleUpdateFailed(auditConfig, err)
   204  		}
   205  	}
   206  	if _, err := r.Reconciler.iamClient.SetAuditConfig(r.Ctx, auditConfig); err != nil {
   207  		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
   208  			logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
   209  			return r.handleUnresolvableDeps(auditConfig, unwrappedErr)
   210  		}
   211  		return false, r.handleUpdateFailed(auditConfig, fmt.Errorf("error setting audit config: %w", err))
   212  	}
   213  	if isAPIServerUpdateRequired(auditConfig) {
   214  		return false, r.handleUpToDate(auditConfig)
   215  	}
   216  	return false, nil
   217  }
   218  
   219  func (r *reconcileContext) update(auditConfig *iamv1beta1.IAMAuditConfig) error {
   220  	if err := r.Reconciler.Client.Update(r.Ctx, auditConfig); err != nil {
   221  		return fmt.Errorf("error updating '%v' in API server: %w", r.NamespacedName, err)
   222  	}
   223  	return nil
   224  }
   225  
   226  func (r *reconcileContext) handleUpToDate(auditConfig *iamv1beta1.IAMAuditConfig) error {
   227  	resource, err := ToK8sResource(auditConfig)
   228  	if err != nil {
   229  		return fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling %v event: %w", k8s.UpToDate, err)
   230  	}
   231  	return r.Reconciler.HandleUpToDate(r.Ctx, resource)
   232  }
   233  
   234  func (r *reconcileContext) handleUpdateFailed(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) error {
   235  	resource, err := ToK8sResource(auditConfig)
   236  	if err != nil {
   237  		logger.Error(err, "error converting IAMAuditConfig to k8s resource while handling event",
   238  			"resource", k8s.GetNamespacedName(auditConfig), "event", k8s.UpdateFailed)
   239  		return fmt.Errorf("Update call failed: %w", origErr)
   240  	}
   241  	return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
   242  }
   243  
   244  func (r *reconcileContext) handleDeleted(auditConfig *iamv1beta1.IAMAuditConfig) error {
   245  	resource, err := ToK8sResource(auditConfig)
   246  	if err != nil {
   247  		return fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling %v event: %w", k8s.Deleted, err)
   248  	}
   249  	return r.Reconciler.HandleDeleted(r.Ctx, resource)
   250  }
   251  
   252  func (r *reconcileContext) handleDeleteFailed(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) error {
   253  	resource, err := ToK8sResource(auditConfig)
   254  	if err != nil {
   255  		logger.Error(err, "error converting IAMAuditConfig to k8s resource while handling event",
   256  			"resource", k8s.GetNamespacedName(auditConfig), "event", k8s.DeleteFailed)
   257  		return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
   258  	}
   259  	return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
   260  }
   261  
   262  func (r *Reconciler) supportsImmediateReconciliations() bool {
   263  	return r.immediateReconcileRequests != nil
   264  }
   265  
   266  func (r *reconcileContext) handleUnresolvableDeps(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) (requeue bool, err error) {
   267  	resource, err := ToK8sResource(auditConfig)
   268  	if err != nil {
   269  		return false, fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling unresolvable dependencies event: %w", err)
   270  	}
   271  	refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
   272  	if !ok || !r.Reconciler.supportsImmediateReconciliations() {
   273  		// Requeue resource for reconciliation with exponential backoff applied
   274  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   275  	}
   276  	// Check that the number of active resource watches
   277  	// does not exceed the controller's cap. If the
   278  	// capacity is not exceeded, The number of active
   279  	// resource watches is incremented by one and a watch
   280  	// is started
   281  	if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
   282  		// Requeue resource for reconciliation with exponential backoff applied
   283  		return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   284  	}
   285  	// Create a logger for ResourceWatcher that contains info
   286  	// about the referencing resource. This is done since the
   287  	// messages logged by ResourceWatcher only include the
   288  	// information of the resource it is watching by default.
   289  	watcherLogger := logger.WithValues(
   290  		"referencingResource", resource.GetNamespacedName(),
   291  		"referencingResourceGVK", resource.GroupVersionKind())
   292  	watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
   293  	if err != nil {
   294  		return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
   295  	}
   296  
   297  	logger := logger.WithValues(
   298  		"resource", resource.GetNamespacedName(),
   299  		"resourceGVK", resource.GroupVersionKind(),
   300  		"reference", refNN,
   301  		"referenceGVK", refGVK)
   302  	go func() {
   303  		// Decrement the count of active resource watches after
   304  		// the watch finishes
   305  		defer r.Reconciler.resourceWatcherRoutines.Release(1)
   306  		timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
   307  		ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
   308  		defer cancel()
   309  		logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
   310  		if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
   311  			logger.Error(err, "error while waiting for resource's reference to be ready")
   312  			return
   313  		}
   314  		logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
   315  		r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
   316  	}()
   317  
   318  	// Do not requeue resource for immediate reconciliation. Wait for either
   319  	// the next periodic reconciliation or for the referenced resource to be ready (which
   320  	// triggers a reconciliation), whichever comes first.
   321  	return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
   322  }
   323  
   324  // enqueueForImmediateReconciliation enqueues the given resource for immediate
   325  // reconciliation. Note that this function only takes in the name and namespace
   326  // of the resource and not its GVK since the controller instance that this
   327  // reconcile instance belongs to can only reconcile resources of one GVK.
   328  func (r *Reconciler) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
   329  	genEvent := event.GenericEvent{}
   330  	genEvent.Object = &unstructured.Unstructured{}
   331  	genEvent.Object.SetNamespace(resourceNN.Namespace)
   332  	genEvent.Object.SetName(resourceNN.Name)
   333  	r.immediateReconcileRequests <- genEvent
   334  }
   335  
   336  func isAPIServerUpdateRequired(auditConfig *iamv1beta1.IAMAuditConfig) bool {
   337  	// TODO: even in the event of an actual update to GCP, this function will
   338  	// return false because the condition comparison doesn't account for time.
   339  	conditions := []condition.Condition{
   340  		k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
   341  	}
   342  	if !k8s.ConditionSlicesEqual(auditConfig.Status.Conditions, conditions) {
   343  		return true
   344  	}
   345  	if auditConfig.Status.ObservedGeneration != auditConfig.GetGeneration() {
   346  		return true
   347  	}
   348  	return false
   349  }
   350  
   351  func ToK8sResource(auditConfig *iamv1beta1.IAMAuditConfig) (*k8s.Resource, error) {
   352  	kcciamclient.SetGVK(auditConfig)
   353  	resource := k8s.Resource{}
   354  	if err := util.Marshal(auditConfig, &resource); err != nil {
   355  		return nil, fmt.Errorf("error marshalling IAMAuditConfig to k8s resource: %w", err)
   356  	}
   357  	return &resource, nil
   358  }
   359  

View as plain text