package syncedobject import ( "context" "encoding/base64" "encoding/json" "fmt" "regexp" "time" "cloud.google.com/go/pubsub" certmgr "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" "github.com/go-logr/logr" "github.com/google/uuid" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" k8sctrl "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" soapi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1" "edge-infra.dev/pkg/edge/chariot" chariotClientApi "edge-infra.dev/pkg/edge/chariot/client" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/lib/fog" ) const SyncedObjectChariotOwner = "SyncedObjectController" // syncedObjectConditions is the reconcile summarization configuration for how // various conditions should be taken into account when the final condition is // summarized var syncedObjectConditions = reconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ status.ReadyCondition, status.ReconcilingCondition, status.StalledCondition, }, Summarize: []string{ status.StalledCondition, }, NegativePolarity: []string{ status.ReconcilingCondition, status.StalledCondition, }, } // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects,verbs=create;get;list;update;delete;patch;watch // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects/status,verbs=get;update;patch // +kubebuilder:rbac:groups="pubsub.cnrm.cloud.google.com",resources=pubsubtopics,verbs=get;create;list;watch;patch // +kubebuilder:rbac:groups="pubsub.cnrm.cloud.google.com",resources=pubsubtopics/status,verbs=get;watch type Reconciler struct { client.Client Log logr.Logger Name string CrdMetrics metrics.Metrics Conditions reconcile.Conditions Cfg *Config } func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&soapi.SyncedObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). WithOptions(k8sctrl.Options{ MaxConcurrentReconciles: r.Cfg.Concurrency, }). Complete(r) } func (r *Reconciler) PatchOpts() []patch.Option { return []patch.Option{ patch.WithOwnedConditions{Conditions: r.Conditions.Owned}, patch.WithFieldOwner(r.Name), } } // Run creates the manager, sets up the controller, and then starts // everything. It returns the created manager for testing purposes func Run(ctx context.Context, cfg *Config) error { var log = fog.FromContext(ctx) mgr, err := cfg.CreateMgr() if err != nil { log.Error(err, "failed to create manager") return err } log.Info("starting manager") if err := mgr.Start(ctx); err != nil { log.Error(err, "problem running manager") return err } return nil } // CreateMgr wires up the reconciler(s) with a created manager and returns the manager + setup logger // // This method is directly passed into the `test/f2/x/ktest.WithCtrlManager` function. func (cfg *Config) CreateMgr(o ...controller.Option) (ctrl.Manager, error) { var log = fog.New() ctrl.SetLogger(log) o = append(o, controller.WithMetricsAddress(cfg.MetricsAddr)) ctlCfg, opts := controller.ProcessOptions(o...) opts.LeaderElectionID = "syncedobject.edge.ncr.com" opts.Scheme = createScheme() mgr, err := ctrl.NewManager(ctlCfg, opts) if err != nil { log.Error(err, "unable to create manager") return nil, err } var r = &Reconciler{ Client: mgr.GetClient(), Log: log.WithName("syncedobject-reconciler"), Name: "syncedobject-controller", CrdMetrics: metrics.New(mgr, "soctl"), Conditions: syncedObjectConditions, Cfg: cfg, } if err = r.SetupWithManager(mgr); err != nil { log.Error(err, "failed to setup controller with manager") return nil, err } return mgr, nil } func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(soapi.AddToScheme(scheme)) utilruntime.Must(certmgr.AddToScheme(scheme)) return scheme } func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { var ( reconcileStart = time.Now() log = ctrl.LoggerFrom(ctx) result = reconcile.ResultEmpty so = &soapi.SyncedObject{} ) if err := r.Get(ctx, req.NamespacedName, so); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } log = log.WithValues("info", so.SyncedObjectInfo()) ctx = logr.NewContext(ctx, log) r.CrdMetrics.RecordReconciling(ctx, so) patcher := patch.NewSerialPatcher(so, r.Client) defer func() { if recErr != nil { reconcileErr, ok := recErr.(recerr.Error) if !ok { reconcileErr = recerr.New(recErr, soapi.ReconcileFailedReason) } reconcileErr.ToCondition(so, status.ReadyCondition) } summarizer := reconcile.NewSummarizer(patcher) res, recErr = summarizer.SummarizeAndPatch(ctx, so, []reconcile.SummarizeOption{ reconcile.WithConditions(r.Conditions), reconcile.WithResult(result), reconcile.WithError(recErr), reconcile.WithIgnoreNotFound(), reconcile.WithProcessors( reconcile.RecordReconcileReq, reconcile.RecordResult, ), reconcile.WithFieldOwner(r.Name), }...) r.CrdMetrics.RecordDuration(ctx, so, reconcileStart) r.CrdMetrics.RecordReadiness(ctx, so) }() // Check if finalizer exists if !controllerutil.ContainsFinalizer(so, soapi.Finalizer) { if !controllerutil.AddFinalizer(so, soapi.Finalizer) { recErr = fmt.Errorf("finalizer not added") log.Error(recErr, "AddFinalizer returned false") } // Requeue immediately and reconcile the object with the finalizer added result = reconcile.ResultRequeue return } if err := ValidateSyncedObject(so); err != nil { recErr = recerr.NewStalled(err, soapi.InvalidSpecReason) log.Error(recErr, "failed to validate SyncedObject.Spec") return } // If a deletion timestamp has been set, execute finalizer logic if !so.ObjectMeta.DeletionTimestamp.IsZero() { // check for an abandon annotation annotations := so.GetAnnotations() if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon { log.Info("detected abandon deletion policy") } else if err := r.rideChariot(ctx, so, chariotClientApi.Delete); err != nil { recErr = recerr.New(err, soapi.PublishMessageFailedReason) log.Error(recErr, "failed to publish chariot delete message") // Requeue immediately and reconcile the deleted object result = reconcile.ResultRequeue return // Don't delete the finalizer if the chariot request fails. } else { log.Info("published chariot delete message") } if !controllerutil.RemoveFinalizer(so, soapi.Finalizer) { recErr = fmt.Errorf("finalizer not removed") log.Error(recErr, "RemoveFinalizer returned false") } return } // Expired SyncedObjects are marked deleted using the controller-runtime client if so.IsExpired() { log.Info("synced object expired") err := r.Client.Delete(ctx, so) if client.IgnoreNotFound(err) != nil { recErr = err } // Requeue immediately so the finalizer is executed. result = reconcile.ResultRequeue return } if recErr = reconcile.Progressing(ctx, so, patcher, r.PatchOpts()...); recErr != nil { return } if storageLocationOutdated, err := IsStorageLocationOutdated(so); err != nil { recErr = recerr.New(err, soapi.ReconcileFailedReason) log.Error(recErr, "failed to check storage location for updates") return } else if storageLocationOutdated { // The outdated object must be deleted in google cloud storage using a chariot DELETE message. var outdatedSo = &soapi.SyncedObject{ Spec: soapi.SyncedObjectSpec{ Object: so.Status.StoredObject, Banner: so.Status.Banner, Cluster: so.Status.Cluster, Directory: so.Status.Directory, }, } // check for an abandon annotation before deleting annotations := so.GetAnnotations() if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon { log.Info("detected abandon deletion policy for synced object with outdated storage location") } else if err := r.rideChariot(ctx, outdatedSo, chariotClientApi.Delete); err != nil { recErr = recerr.New(err, soapi.PublishMessageFailedReason) log.Error(recErr, "failed to publish chariot delete message") // Requeue immediately and delete the outdated object. result = reconcile.ResultRequeue return } else { log.Info("published chariot delete message for outdated storage location") } // Clear status until chariot successfully sends a CREATE message for the updated object so.ClearStatusDetails() } if err := r.rideChariot(ctx, so, chariotClientApi.Create); err != nil { recErr = recerr.New(err, soapi.PublishMessageFailedReason) log.Error(recErr, "failed to publish chariot create message") // Requeue immediately and re-publish the object. result = reconcile.ResultRequeue return } // Update the status details when the chariot CREATE message is successfully sent. if err := so.SetStatusDetails(); err != nil { // this should not happen since the object is validated at the beginning of the reconcile loop. recErr = recerr.New(err, soapi.ReconcileFailedReason) log.Error(recErr, "failed update status") return } conditions.MarkTrue(so, status.ReadyCondition, soapi.PublishMessageSucceededReason, "message published successfully") result = reconcile.ResultSuccess return } func (r *Reconciler) rideChariot(ctx context.Context, so *soapi.SyncedObject, operation chariotClientApi.Operation) error { var dir string if so.Spec.Directory != nil { dir = *so.Spec.Directory } re := chariotClientApi. NewChariotMessage(). SetOperation(operation). SetBanner(so.Spec.Banner). SetCluster(so.Spec.Cluster). SetOwner(SyncedObjectChariotOwner). SetDir(dir). SetObjects([]string{so.Spec.Object}). SetNotify(so.Spec.Notify) data, err := json.Marshal(re) if err != nil { return err } var msg = &pubsub.Message{Data: data} _, err = r.Cfg.PubsubTopic.Publish(ctx, msg).Get(ctx) return err } func CreateChariotID(base64EncodedObj string) (string, error) { obj, err := base64.StdEncoding.DecodeString(base64EncodedObj) if err != nil { return "", err } gvknn, err := chariot.ParseYamlGVKNN(obj) if err != nil { return "", err } return gvknn.Hash(), gvknn.Validate() } // IsStorageLocationOutdated checks the SyncedObject's storage location to see if an outdated object should be deleted. // // The storage location depends on the SyncedObject's Banner, Cluster, Directory, and its Object's gvknn. func IsStorageLocationOutdated(so *soapi.SyncedObject) (bool, error) { if so.Status.StorageLocation == "" { // the synced object was just created, or its storage location was cleared successfully. return false, nil } chariotID, err := CreateChariotID(so.Spec.Object) if err != nil { return false, err } var dir string if so.Spec.Directory != nil { dir = *so.Spec.Directory } currentStorageLocation := chariot.FmtStorageLocation(so.Spec.Banner, so.Spec.Cluster, dir, chariotID) if currentStorageLocation != so.Status.StorageLocation { return true, nil } return false, nil } // naming conventions for // - banner GCP project ID: https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin // - cluster gke names: https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1beta1/projects.zones.clusters var reValidBannerAndCluster = regexp.MustCompile("^[a-z]([-]?[a-z0-9])+$") func ValidateSyncedObject(so *soapi.SyncedObject) error { if so.Spec.Object == "" { return fmt.Errorf("missing object in spec") } else if _, err := CreateChariotID(so.Spec.Object); err != nil { return fmt.Errorf("invalid object in spec: %w", err) } if so.Spec.Banner == "" { return fmt.Errorf("missing banner in spec") } else if len(so.Spec.Banner) < 6 || len(so.Spec.Banner) > 30 { return fmt.Errorf("invalid banner project ID length") } else if !reValidBannerAndCluster.MatchString(so.Spec.Banner) { return fmt.Errorf("invalid banner project ID name") } // cluster can be blank, a valid GKE cluster name, or a cluster edge ID if so.Spec.Cluster == "" { return nil } else if len(so.Spec.Cluster) > 40 { return fmt.Errorf("invalid cluster name length") } else if reValidBannerAndCluster.MatchString(so.Spec.Cluster) { return nil } else if cuuid, err := uuid.Parse(so.Spec.Cluster); err != nil || cuuid == uuid.Nil { return fmt.Errorf("invalid cluster name") } return nil }