...

Source file src/edge-infra.dev/pkg/edge/controllers/syncedobject/controller.go

Documentation: edge-infra.dev/pkg/edge/controllers/syncedobject

     1  package syncedobject
     2  
     3  import (
     4  	"context"
     5  	"encoding/base64"
     6  	"encoding/json"
     7  	"fmt"
     8  	"regexp"
     9  	"time"
    10  
    11  	"cloud.google.com/go/pubsub"
    12  	certmgr "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
    13  	"github.com/go-logr/logr"
    14  	"github.com/google/uuid"
    15  	"k8s.io/apimachinery/pkg/runtime"
    16  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    17  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    18  	ctrl "sigs.k8s.io/controller-runtime"
    19  	"sigs.k8s.io/controller-runtime/pkg/builder"
    20  	"sigs.k8s.io/controller-runtime/pkg/client"
    21  	k8sctrl "sigs.k8s.io/controller-runtime/pkg/controller"
    22  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    23  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    24  
    25  	soapi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
    26  	"edge-infra.dev/pkg/edge/chariot"
    27  	chariotClientApi "edge-infra.dev/pkg/edge/chariot/client"
    28  	"edge-infra.dev/pkg/k8s/meta/status"
    29  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    30  	"edge-infra.dev/pkg/k8s/runtime/controller"
    31  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    32  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    33  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    34  	"edge-infra.dev/pkg/k8s/runtime/patch"
    35  	"edge-infra.dev/pkg/lib/fog"
    36  )
    37  
// SyncedObjectChariotOwner is the owner value set on the chariot messages that
// rideChariot publishes for SyncedObjects.
const SyncedObjectChariotOwner = "SyncedObjectController"
    39  
    40  // syncedObjectConditions is the reconcile summarization configuration for how
    41  // various conditions should be taken into account when the final condition is
    42  // summarized
    43  var syncedObjectConditions = reconcile.Conditions{
    44  	Target: status.ReadyCondition,
    45  	Owned: []string{
    46  		status.ReadyCondition,
    47  		status.ReconcilingCondition,
    48  		status.StalledCondition,
    49  	},
    50  	Summarize: []string{
    51  		status.StalledCondition,
    52  	},
    53  	NegativePolarity: []string{
    54  		status.ReconcilingCondition,
    55  		status.StalledCondition,
    56  	},
    57  }
    58  
    59  // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects,verbs=create;get;list;update;delete;patch;watch
    60  // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects/status,verbs=get;update;patch
    61  // +kubebuilder:rbac:groups="pubsub.cnrm.cloud.google.com",resources=pubsubtopics,verbs=get;create;list;watch;patch
    62  // +kubebuilder:rbac:groups="pubsub.cnrm.cloud.google.com",resources=pubsubtopics/status,verbs=get;watch
    63  
// Reconciler reconciles SyncedObject resources by publishing chariot messages
// for them over pubsub.
type Reconciler struct {
	// Client is the embedded Kubernetes client used to fetch, patch and delete
	// SyncedObjects.
	client.Client
	// Log is the named logger assigned when the reconciler is constructed in
	// CreateMgr.
	Log        logr.Logger
	// Name identifies the controller; it is used as the field owner for patches.
	Name       string
	// CrdMetrics records reconciling, duration and readiness metrics.
	CrdMetrics metrics.Metrics
	// Conditions is the summarization configuration handed to the summarizer and
	// the source of the owned conditions used when patching.
	Conditions reconcile.Conditions
	// Cfg supplies the reconcile concurrency and the pubsub topic messages are
	// published to.
	Cfg        *Config
}
    72  
    73  func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
    74  	return ctrl.NewControllerManagedBy(mgr).
    75  		For(&soapi.SyncedObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
    76  		WithOptions(k8sctrl.Options{
    77  			MaxConcurrentReconciles: r.Cfg.Concurrency,
    78  		}).
    79  		Complete(r)
    80  }
    81  
    82  func (r *Reconciler) PatchOpts() []patch.Option {
    83  	return []patch.Option{
    84  		patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
    85  		patch.WithFieldOwner(r.Name),
    86  	}
    87  }
    88  
    89  // Run creates the manager, sets up the controller, and then starts
    90  // everything.  It returns the created manager for testing purposes
    91  func Run(ctx context.Context, cfg *Config) error {
    92  	var log = fog.FromContext(ctx)
    93  
    94  	mgr, err := cfg.CreateMgr()
    95  	if err != nil {
    96  		log.Error(err, "failed to create manager")
    97  		return err
    98  	}
    99  
   100  	log.Info("starting manager")
   101  	if err := mgr.Start(ctx); err != nil {
   102  		log.Error(err, "problem running manager")
   103  		return err
   104  	}
   105  	return nil
   106  }
   107  
   108  // CreateMgr wires up the reconciler(s) with a created manager and returns the manager + setup logger
   109  //
   110  // This method is directly passed into the `test/f2/x/ktest.WithCtrlManager` function.
   111  func (cfg *Config) CreateMgr(o ...controller.Option) (ctrl.Manager, error) {
   112  	var log = fog.New()
   113  	ctrl.SetLogger(log)
   114  
   115  	o = append(o, controller.WithMetricsAddress(cfg.MetricsAddr))
   116  	ctlCfg, opts := controller.ProcessOptions(o...)
   117  	opts.LeaderElectionID = "syncedobject.edge.ncr.com"
   118  	opts.Scheme = createScheme()
   119  
   120  	mgr, err := ctrl.NewManager(ctlCfg, opts)
   121  	if err != nil {
   122  		log.Error(err, "unable to create manager")
   123  		return nil, err
   124  	}
   125  
   126  	var r = &Reconciler{
   127  		Client:     mgr.GetClient(),
   128  		Log:        log.WithName("syncedobject-reconciler"),
   129  		Name:       "syncedobject-controller",
   130  		CrdMetrics: metrics.New(mgr, "soctl"),
   131  		Conditions: syncedObjectConditions,
   132  		Cfg:        cfg,
   133  	}
   134  	if err = r.SetupWithManager(mgr); err != nil {
   135  		log.Error(err, "failed to setup controller with manager")
   136  		return nil, err
   137  	}
   138  	return mgr, nil
   139  }
   140  
   141  func createScheme() *runtime.Scheme {
   142  	scheme := runtime.NewScheme()
   143  
   144  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   145  	utilruntime.Must(soapi.AddToScheme(scheme))
   146  	utilruntime.Must(certmgr.AddToScheme(scheme))
   147  
   148  	return scheme
   149  }
   150  
   151  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   152  	var (
   153  		reconcileStart = time.Now()
   154  		log            = ctrl.LoggerFrom(ctx)
   155  		result         = reconcile.ResultEmpty
   156  		so             = &soapi.SyncedObject{}
   157  	)
   158  
   159  	if err := r.Get(ctx, req.NamespacedName, so); err != nil {
   160  		return ctrl.Result{}, client.IgnoreNotFound(err)
   161  	}
   162  	log = log.WithValues("info", so.SyncedObjectInfo())
   163  	ctx = logr.NewContext(ctx, log)
   164  	r.CrdMetrics.RecordReconciling(ctx, so)
   165  
   166  	patcher := patch.NewSerialPatcher(so, r.Client)
   167  
   168  	defer func() {
   169  		if recErr != nil {
   170  			reconcileErr, ok := recErr.(recerr.Error)
   171  			if !ok {
   172  				reconcileErr = recerr.New(recErr, soapi.ReconcileFailedReason)
   173  			}
   174  			reconcileErr.ToCondition(so, status.ReadyCondition)
   175  		}
   176  
   177  		summarizer := reconcile.NewSummarizer(patcher)
   178  		res, recErr = summarizer.SummarizeAndPatch(ctx, so, []reconcile.SummarizeOption{
   179  			reconcile.WithConditions(r.Conditions),
   180  			reconcile.WithResult(result),
   181  			reconcile.WithError(recErr),
   182  			reconcile.WithIgnoreNotFound(),
   183  			reconcile.WithProcessors(
   184  				reconcile.RecordReconcileReq,
   185  				reconcile.RecordResult,
   186  			),
   187  			reconcile.WithFieldOwner(r.Name),
   188  		}...)
   189  
   190  		r.CrdMetrics.RecordDuration(ctx, so, reconcileStart)
   191  		r.CrdMetrics.RecordReadiness(ctx, so)
   192  	}()
   193  
   194  	// Check if finalizer exists
   195  	if !controllerutil.ContainsFinalizer(so, soapi.Finalizer) {
   196  		if !controllerutil.AddFinalizer(so, soapi.Finalizer) {
   197  			recErr = fmt.Errorf("finalizer not added")
   198  			log.Error(recErr, "AddFinalizer returned false")
   199  		}
   200  		// Requeue immediately and reconcile the object with the finalizer added
   201  		result = reconcile.ResultRequeue
   202  		return
   203  	}
   204  
   205  	if err := ValidateSyncedObject(so); err != nil {
   206  		recErr = recerr.NewStalled(err, soapi.InvalidSpecReason)
   207  		log.Error(recErr, "failed to validate SyncedObject.Spec")
   208  		return
   209  	}
   210  
   211  	// If a deletion timestamp has been set, execute finalizer logic
   212  	if !so.ObjectMeta.DeletionTimestamp.IsZero() {
   213  		// check for an abandon annotation
   214  		annotations := so.GetAnnotations()
   215  		if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon {
   216  			log.Info("detected abandon deletion policy")
   217  		} else if err := r.rideChariot(ctx, so, chariotClientApi.Delete); err != nil {
   218  			recErr = recerr.New(err, soapi.PublishMessageFailedReason)
   219  			log.Error(recErr, "failed to publish chariot delete message")
   220  			// Requeue immediately and reconcile the deleted object
   221  			result = reconcile.ResultRequeue
   222  			return // Don't delete the finalizer if the chariot request fails.
   223  		} else {
   224  			log.Info("published chariot delete message")
   225  		}
   226  
   227  		if !controllerutil.RemoveFinalizer(so, soapi.Finalizer) {
   228  			recErr = fmt.Errorf("finalizer not removed")
   229  			log.Error(recErr, "RemoveFinalizer returned false")
   230  		}
   231  		return
   232  	}
   233  
   234  	// Expired SyncedObjects are marked deleted using the controller-runtime client
   235  	if so.IsExpired() {
   236  		log.Info("synced object expired")
   237  		err := r.Client.Delete(ctx, so)
   238  		if client.IgnoreNotFound(err) != nil {
   239  			recErr = err
   240  		}
   241  		// Requeue immediately so the finalizer is executed.
   242  		result = reconcile.ResultRequeue
   243  		return
   244  	}
   245  
   246  	if recErr = reconcile.Progressing(ctx, so, patcher, r.PatchOpts()...); recErr != nil {
   247  		return
   248  	}
   249  
   250  	if storageLocationOutdated, err := IsStorageLocationOutdated(so); err != nil {
   251  		recErr = recerr.New(err, soapi.ReconcileFailedReason)
   252  		log.Error(recErr, "failed to check storage location for updates")
   253  		return
   254  	} else if storageLocationOutdated {
   255  		// The outdated object must be deleted in google cloud storage using a chariot DELETE message.
   256  		var outdatedSo = &soapi.SyncedObject{
   257  			Spec: soapi.SyncedObjectSpec{
   258  				Object:    so.Status.StoredObject,
   259  				Banner:    so.Status.Banner,
   260  				Cluster:   so.Status.Cluster,
   261  				Directory: so.Status.Directory,
   262  			},
   263  		}
   264  
   265  		// check for an abandon annotation before deleting
   266  		annotations := so.GetAnnotations()
   267  		if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon {
   268  			log.Info("detected abandon deletion policy for synced object with outdated storage location")
   269  		} else if err := r.rideChariot(ctx, outdatedSo, chariotClientApi.Delete); err != nil {
   270  			recErr = recerr.New(err, soapi.PublishMessageFailedReason)
   271  			log.Error(recErr, "failed to publish chariot delete message")
   272  			// Requeue immediately and delete the outdated object.
   273  			result = reconcile.ResultRequeue
   274  			return
   275  		} else {
   276  			log.Info("published chariot delete message for outdated storage location")
   277  		}
   278  		// Clear status until chariot successfully sends a CREATE message for the updated object
   279  		so.ClearStatusDetails()
   280  	}
   281  
   282  	if err := r.rideChariot(ctx, so, chariotClientApi.Create); err != nil {
   283  		recErr = recerr.New(err, soapi.PublishMessageFailedReason)
   284  		log.Error(recErr, "failed to publish chariot create message")
   285  		// Requeue immediately and re-publish the object.
   286  		result = reconcile.ResultRequeue
   287  		return
   288  	}
   289  
   290  	// Update the status details when the chariot CREATE message is successfully sent.
   291  	if err := so.SetStatusDetails(); err != nil {
   292  		// this should not happen since the object is validated at the beginning of the reconcile loop.
   293  		recErr = recerr.New(err, soapi.ReconcileFailedReason)
   294  		log.Error(recErr, "failed update status")
   295  		return
   296  	}
   297  
   298  	conditions.MarkTrue(so, status.ReadyCondition, soapi.PublishMessageSucceededReason, "message published successfully")
   299  	result = reconcile.ResultSuccess
   300  	return
   301  }
   302  
   303  func (r *Reconciler) rideChariot(ctx context.Context, so *soapi.SyncedObject, operation chariotClientApi.Operation) error {
   304  	var dir string
   305  	if so.Spec.Directory != nil {
   306  		dir = *so.Spec.Directory
   307  	}
   308  
   309  	re := chariotClientApi.
   310  		NewChariotMessage().
   311  		SetOperation(operation).
   312  		SetBanner(so.Spec.Banner).
   313  		SetCluster(so.Spec.Cluster).
   314  		SetOwner(SyncedObjectChariotOwner).
   315  		SetDir(dir).
   316  		SetObjects([]string{so.Spec.Object}).
   317  		SetNotify(so.Spec.Notify)
   318  
   319  	data, err := json.Marshal(re)
   320  	if err != nil {
   321  		return err
   322  	}
   323  
   324  	var msg = &pubsub.Message{Data: data}
   325  
   326  	_, err = r.Cfg.PubsubTopic.Publish(ctx, msg).Get(ctx)
   327  	return err
   328  }
   329  
   330  func CreateChariotID(base64EncodedObj string) (string, error) {
   331  	obj, err := base64.StdEncoding.DecodeString(base64EncodedObj)
   332  	if err != nil {
   333  		return "", err
   334  	}
   335  	gvknn, err := chariot.ParseYamlGVKNN(obj)
   336  	if err != nil {
   337  		return "", err
   338  	}
   339  	return gvknn.Hash(), gvknn.Validate()
   340  }
   341  
   342  // IsStorageLocationOutdated checks the SyncedObject's storage location to see if an outdated object should be deleted.
   343  //
   344  // The storage location depends on the SyncedObject's Banner, Cluster, Directory, and its Object's gvknn.
   345  func IsStorageLocationOutdated(so *soapi.SyncedObject) (bool, error) {
   346  	if so.Status.StorageLocation == "" {
   347  		// the synced object was just created, or its storage location was cleared successfully.
   348  		return false, nil
   349  	}
   350  
   351  	chariotID, err := CreateChariotID(so.Spec.Object)
   352  	if err != nil {
   353  		return false, err
   354  	}
   355  	var dir string
   356  	if so.Spec.Directory != nil {
   357  		dir = *so.Spec.Directory
   358  	}
   359  	currentStorageLocation := chariot.FmtStorageLocation(so.Spec.Banner, so.Spec.Cluster, dir, chariotID)
   360  	if currentStorageLocation != so.Status.StorageLocation {
   361  		return true, nil
   362  	}
   363  
   364  	return false, nil
   365  }
   366  
   367  // naming conventions for
   368  // - banner GCP project ID: https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin
   369  // - cluster gke names: https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1beta1/projects.zones.clusters
   370  var reValidBannerAndCluster = regexp.MustCompile("^[a-z]([-]?[a-z0-9])+$")
   371  
   372  func ValidateSyncedObject(so *soapi.SyncedObject) error {
   373  	if so.Spec.Object == "" {
   374  		return fmt.Errorf("missing object in spec")
   375  	} else if _, err := CreateChariotID(so.Spec.Object); err != nil {
   376  		return fmt.Errorf("invalid object in spec: %w", err)
   377  	}
   378  
   379  	if so.Spec.Banner == "" {
   380  		return fmt.Errorf("missing banner in spec")
   381  	} else if len(so.Spec.Banner) < 6 || len(so.Spec.Banner) > 30 {
   382  		return fmt.Errorf("invalid banner project ID length")
   383  	} else if !reValidBannerAndCluster.MatchString(so.Spec.Banner) {
   384  		return fmt.Errorf("invalid banner project ID name")
   385  	}
   386  
   387  	// cluster can be blank, a valid GKE cluster name, or a cluster edge ID
   388  	if so.Spec.Cluster == "" {
   389  		return nil
   390  	} else if len(so.Spec.Cluster) > 40 {
   391  		return fmt.Errorf("invalid cluster name length")
   392  	} else if reValidBannerAndCluster.MatchString(so.Spec.Cluster) {
   393  		return nil
   394  	} else if cuuid, err := uuid.Parse(so.Spec.Cluster); err != nil || cuuid == uuid.Nil {
   395  		return fmt.Errorf("invalid cluster name")
   396  	}
   397  	return nil
   398  }
   399  

View as plain text