...

Source file src/edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal/reconciler.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal

     1  package internal
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/google/go-containerregistry/pkg/authn"
     8  	"github.com/google/go-containerregistry/pkg/name"
     9  	"github.com/google/go-containerregistry/pkg/v1/google"
    10  	ctrl "sigs.k8s.io/controller-runtime"
    11  	"sigs.k8s.io/controller-runtime/pkg/client"
    12  
    13  	"edge-infra.dev/pkg/f8n/warehouse/cluster"
    14  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    15  	"edge-infra.dev/pkg/f8n/warehouse/k8s/kauth"
    16  	"edge-infra.dev/pkg/f8n/warehouse/oci"
    17  	"edge-infra.dev/pkg/f8n/warehouse/oci/cache"
    18  	"edge-infra.dev/pkg/f8n/warehouse/oci/remote"
    19  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    20  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    21  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    22  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    23  	"edge-infra.dev/pkg/k8s/runtime/patch"
    24  	"edge-infra.dev/pkg/k8s/runtime/sap"
    25  	"edge-infra.dev/pkg/k8s/runtime/statusreaders"
    26  )
    27  
    28  const (
    29  	// reconciler name for all Lumper reconcilers
    30  	controllerName = "lumperctl"
    31  )
    32  
// Reconciler implements the base reconcile functionality needed for Lumper
// controllers.
//
// Callers are expected to populate OwnerGroupLabel and Conditions before
// calling SetupWithManager, which rejects empty values for both. Client, Name
// and ResourceManager are filled in by SetupWithManager itself.
type Reconciler struct {
	// Client is the Kubernetes API client. SetupWithManager replaces it with the
	// manager's client.
	client.Client

	// Provider is the cluster provider that this controller is deployed to
	Provider cluster.Provider

	// Name is the controller's name, used to consistently represent the controller
	// in various cluster interactions, e.g., as field manager. SetupWithManager
	// always overwrites it with controllerName.
	Name string

	// ServiceAccount is the name of the service account used to deploy the
	// controller. Keychain passes it to the K8s registry auth lookup.
	ServiceAccount string

	// Namespace is the Namespace where the controller is deployed. Keychain
	// passes it to the K8s registry auth lookup alongside ServiceAccount.
	Namespace string

	// ResourceManager is a server-side apply client that can poll for the resources
	// we are applying to the server. It is created by SetupWithManager.
	ResourceManager *sap.ResourceManager

	// OwnerGroupLabel is the resource group used to build the full label applied
	// to owned resources applied via server-side apply. It must be non-empty
	// before SetupWithManager is called.
	OwnerGroupLabel string

	// Conditions is the reconcile conditions configuration that defines the
	// condition management behavior for this reconciler. It must be non-empty
	// before SetupWithManager is called; its Owned conditions are handed to the
	// patch helper by PatchOpts.
	Conditions reconcile.Conditions

	// Metrics records condition, duration, and suspension metrics by default
	Metrics metrics.Metrics

	// Cache is the OCI artifact pull-through cache that Fetch reads from.
	Cache cache.Cache

	// LivenessChecker holds liveness results.
	// NOTE(review): it is not referenced by any method visible in this file;
	// confirm how it is populated and consumed.
	LivenessChecker *LivenessChecker
}
    73  
    74  func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
    75  	r.Name = controllerName
    76  	r.Client = mgr.GetClient()
    77  
    78  	if r.OwnerGroupLabel == "" {
    79  		return fmt.Errorf("reconciler OwnerGroupLable must be set")
    80  	}
    81  	if r.Conditions.IsEmpty() {
    82  		return fmt.Errorf("non-empty target condition and conditions to summarize "+
    83  			"reconcile.Conditions is required. reconcile.Conditions: %v", r.Conditions)
    84  	}
    85  
    86  	var err error
    87  	r.ResourceManager, err = sap.NewResourceManagerWithStatusReader(
    88  		mgr.GetConfig(),
    89  		client.Options{},
    90  		sap.Owner{Field: r.Name, Group: r.OwnerGroupLabel},
    91  		&statusreaders.CalicoStatusReader{},
    92  	)
    93  	if err != nil {
    94  		return fmt.Errorf("failed to create K8s resource manager: %w", err)
    95  	}
    96  
    97  	return nil
    98  }
    99  
   100  func (r *Reconciler) PatchOpts() []patch.Option {
   101  	return []patch.Option{
   102  		patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
   103  		patch.WithFieldOwner(r.Name),
   104  	}
   105  }
   106  
   107  // Keychain creates an OCI registry auth keychain that handles, in order:
   108  //
   109  // - Google authentication (worload identity, default app creds, etc)
   110  // - K8s authentication (via image pull secrets based on the controller's ServiceAccount/Namespace)
   111  // - Default keychain (via docker creds helpers)
   112  func (r *Reconciler) Keychain(ctx context.Context, obj conditions.Setter) (authn.Keychain, recerr.Error) {
   113  	k8s, err := kauth.New(ctx, r.Client, kauth.Options{
   114  		ServiceAccountName: r.ServiceAccount,
   115  		Namespace:          r.Namespace,
   116  	})
   117  	if err != nil {
   118  		recErr := recerr.New(
   119  			fmt.Errorf("failed to build registry keychain: %w", err),
   120  			whv1.FetchFailedReason,
   121  		)
   122  		recErr.ToCondition(obj, whv1.FetchedArtifactCondition)
   123  		return nil, recErr
   124  	}
   125  
   126  	return authn.NewMultiKeychain(
   127  		google.Keychain,
   128  		k8s,
   129  		authn.DefaultKeychain,
   130  	), nil
   131  }
   132  
   133  // Fetch is a cache-aware a package puller based on the package pull policy that
   134  // returns the fetched artifact and a decorated logger with information about
   135  // the artifact. It also updates the FetchSucceededCondition for obj.
   136  func (r *Reconciler) Fetch(
   137  	ctx context.Context,
   138  	obj conditions.Setter,
   139  	ref name.Reference,
   140  	policy whv1.PullPolicy,
   141  	remoteOpts ...remote.Option,
   142  ) (oci.Artifact, recerr.Error) {
   143  	log := ctrl.LoggerFrom(ctx)
   144  
   145  	// Reset condition
   146  	conditions.Delete(obj, whv1.FetchedArtifactCondition)
   147  
   148  	opts := []cache.GetOption{
   149  		cache.WithRemoteOpts(remoteOpts...),
   150  	}
   151  	// Resolve the tag before hitting cache if we are always supposed to pull
   152  	// packages.
   153  	// Because we don't support Never, everything else is treated as IfNotPresent
   154  	if policy == whv1.Always {
   155  		opts = append(opts, cache.ResolveTag())
   156  	}
   157  
   158  	// TODO: http transport? handling authentication (see k8schain for workload identity,
   159  	//			 what about google app creds?)
   160  	// TODO: support u.Spec.Credentials for authentication
   161  	a, err := r.Cache.Get(ref, opts...)
   162  	if err != nil {
   163  		recErr := recerr.New(err, whv1.FetchFailedReason)
   164  		recErr.ToCondition(obj, whv1.FetchedArtifactCondition)
   165  		return nil, recErr
   166  	}
   167  	log.Info("fetched artifact")
   168  	// Reflect successful fetch
   169  	// TODO(aw18576): We should plumb more context through this function so that
   170  	// we know the artifact digest and can include it in the message, maybe
   171  	// information about whether or not we hit the cache.
   172  	conditions.MarkTrue(obj, whv1.FetchedArtifactCondition, whv1.FetchSucceededReason,
   173  		"Successfully retrieved artifact.")
   174  	return a, nil
   175  }
   176  

View as plain text