package clusterctl

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"github.com/hashicorp/go-version"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	kuberecorder "k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	"edge-infra.dev/pkg/edge/api/graph/model"
	"edge-infra.dev/pkg/edge/api/services"
	"edge-infra.dev/pkg/edge/api/services/artifacts"
	"edge-infra.dev/pkg/edge/api/services/edgenode"
	clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
	gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1"
	syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
	edgeCapabilities "edge-infra.dev/pkg/edge/capabilities"
	"edge-infra.dev/pkg/edge/compatibility"
	"edge-infra.dev/pkg/edge/constants"
	clusterConstant "edge-infra.dev/pkg/edge/constants/api/cluster"
	"edge-infra.dev/pkg/edge/constants/api/fleet"
	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins"
	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache"
	pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics"
	"edge-infra.dev/pkg/edge/controllers/dbmetrics"
	"edge-infra.dev/pkg/edge/controllers/util/edgedb"
	"edge-infra.dev/pkg/edge/k8objectsutils"
	"edge-infra.dev/pkg/edge/shipment/generator"
	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1"
	"edge-infra.dev/pkg/k8s/konfigkonnector/apis/meta"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/inventory"
	"edge-infra.dev/pkg/k8s/runtime/patch"
	"edge-infra.dev/pkg/k8s/runtime/sap"
	unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
	ff "edge-infra.dev/pkg/lib/featureflag"
)

const (
	ErrPluginFailed          = "plugin failed"
	ErrPluginFinalizerFailed = "plugin finalizer failed"
	ErrInvalidCluster        = "invalid Cluster spec"
	ErrToUnstructured        = "failed to convert %s/%s/%s to unstructured: %w"

	latestTag = "latest"
)

var (
	// OwnerGroupLabel is the owner label for resources owned and created by the clusterctl.
	OwnerGroupLabel = fmt.Sprintf("%s.%s", strings.ToLower(clusterApi.Kind), clusterApi.ClusterGVK.Group)
)

// clusterConditions is the reconcile summarization configuration for how
// various conditions should be taken into account when the final condition is
// summarized.
var clusterConditions = reconcile.Conditions{
	Target: status.ReadyCondition,
	Owned: []string{
		status.ReadyCondition,
		status.ReconcilingCondition,
		status.StalledCondition,
	},
	Summarize: []string{
		status.StalledCondition,
	},
	NegativePolarity: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
	},
}

// ClusterReconciler reconciles a Cluster object.
type ClusterReconciler struct {
	client.Client
	kuberecorder.EventRecorder

	manager manager.Manager
	Scheme  *runtime.Scheme
	Log     logr.Logger
	Metrics metrics.Metrics
	Config  *Config

	DefaultRequeue    time.Duration
	WaitForSetTimeout time.Duration

	// ResourceManager is a server-side apply client that can watch the resources
	// we are applying to the server.
	ResourceManager *sap.ResourceManager

	Name          string
	Conditions    reconcile.Conditions
	EdgeDB        *edgedb.EdgeDB
	Recorder      *dbmetrics.DBMetrics
	Concurrency   int
	WaitForSetMap *sync.Map
	c             chan int
	HelmCache     cache.Provider
}

func clusterReconcilerPredicate() predicate.Predicate {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			return !e.ObjectNew.GetDeletionTimestamp().IsZero()
		},
		CreateFunc: func(_ event.CreateEvent) bool {
			return true
		},
		DeleteFunc: func(_ event.DeleteEvent) bool {
			return false
		},
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&clusterApi.Cluster{}).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: r.Concurrency,
			RateLimiter:             workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 120*time.Second),
		}).
		WithEventFilter(clusterReconcilerPredicate()).
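		// Note on the options above: the item rate limiter retries a failing
		// Cluster with exponential backoff starting at 1ms and capped at 120s,
		// and the event filter drops delete events entirely while only letting
		// update events through once a deletion timestamp has been set.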
		Complete(r)
}

func (r *ClusterReconciler) PatchOpts() []patch.Option {
	return []patch.Option{
		patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
		patch.WithFieldOwner(r.Name),
	}
}
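// The sketch below is illustrative only and is not referenced elsewhere in the
// package: it shows how a ClusterReconciler is typically wired into a
// controller-runtime manager. The owner name, concurrency, requeue interval,
// and channel size are assumptions; the real values (and the Config, EdgeDB,
// Metrics, Recorder, and HelmCache dependencies, omitted here) come from the
// controller binary's startup code.
func exampleSetup(mgr ctrl.Manager) error {
	r := &ClusterReconciler{
		Client:         mgr.GetClient(),
		EventRecorder:  mgr.GetEventRecorderFor("clusterctl"),
		Scheme:         mgr.GetScheme(),
		Log:            ctrl.Log.WithName("clusterctl"),
		Name:           "clusterctl",
		Conditions:     clusterConditions,
		Concurrency:    2,
		DefaultRequeue: 30 * time.Second,
		WaitForSetMap:  &sync.Map{},
		c:              make(chan int, 2),
	}
	return r.SetupWithManager(mgr)
}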
// +kubebuilder:rbac:groups="",resources=namespaces,verbs=create;get;list;watch;update;patch;delete
// +kubebuilder:rbac:groups="",resources=namespaces/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=edge.ncr.com,resources=clusters,verbs=create;get;list;update;patch;watch
// +kubebuilder:rbac:groups=edge.ncr.com,resources=clusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects,verbs=create;get;list;update;patch;watch;delete
// +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects/status,verbs=get;watch
// +kubebuilder:rbac:groups="edge.ncr.com",resources=gkeclusters,verbs=create;get;list;watch;update;patch;delete
// +kubebuilder:rbac:groups="edge.ncr.com",resources=gkeclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="remoteaccess.edge.ncr.com",resources=vpnconfigs,verbs=create;get;list;watch;update;patch;delete
// +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containerclusters,verbs=create;get;list;update;patch;watch;delete
// +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containerclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containernodepools,verbs=create;get;list;update;patch;watch;delete
// +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containernodepools/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts;iamserviceaccountkeys,verbs=get;list;create;update;patch;watch;delete
// +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iampolicymembers,verbs=get;list;create;update;patch;watch;delete
// +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts/status;iamserviceaccountkeys/status;iampolicymembers/status,verbs=get;watch
// +kubebuilder:rbac:groups="core.cnrm.cloud.google.com",resources=configconnectors,verbs=create;update;patch;watch
// +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments,verbs=get;list;create;update;patch;delete;watch
// +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments/status,verbs=get;watch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get
// +kubebuilder:rbac:groups="cert-manager.io",resources=certificates,verbs=get;list;create;patch;watch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
//nolint:gocyclo
func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
	var (
		reconcileStart = time.Now()
		log            = ctrl.LoggerFrom(ctx)
		result         = reconcile.ResultEmpty
		cluster        = &clusterApi.Cluster{}
	)

	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	r.Metrics.RecordReconciling(ctx, cluster)
	updateReconcileMetadata(ctx, cluster, reconcileStart)

	oldStatus := cluster.Status.DeepCopy()
	if oldStatus.Inventory == nil {
		oldStatus.Inventory = &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}
	}
	newInv := &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}

	patcher := patch.NewSerialPatcher(cluster, r.Client)
	conditions.MarkReconciling(cluster, status.ReconcilingCondition, clusterApi.ReconcilingReason)

	defer func() {
		// By default, set the inventory to the inventory generated in this reconcile loop.
		cluster.Status.Inventory = newInv
		if recErr != nil {
			// On error, merge with the previous inventory since the new one may be
			// missing manifests that have not been pruned yet.
			cluster.Status.Inventory = inventory.Merge(oldStatus.Inventory, newInv)
			reconcileErr, ok := recErr.(recerr.Error)
			if !ok {
				reconcileErr = recerr.New(recErr, clusterApi.ReconcileFailedReason)
			}
			reconcileErr.ToCondition(cluster, status.ReadyCondition)
		}
		res, recErr = r.summarize(ctx, patcher, recErr, cluster, result)
		r.Metrics.RecordDuration(ctx, cluster, reconcileStart)
		r.Metrics.RecordReadiness(ctx, cluster)
		r.EdgeDB.RecordInfraStatus(ctx, cluster, *r.Recorder)
		pluginmetrics.New().RecordRegisteredPluginsCountMetric(plugins.Count())
		deleteResourceEntry(cluster)
	}()

	// Check if the finalizer exists.
	if !controllerutil.ContainsFinalizer(cluster, clusterApi.Finalizer) {
		controllerutil.AddFinalizer(cluster, clusterApi.Finalizer)
		// Return immediately so that we requeue and reconcile the object with the finalizer
		// added.
		result = reconcile.ResultRequeue
		return
	}

	log = log.WithValues(
		"name", cluster.Spec.Name,
		"spec", cluster.Spec,
		"cluster reconciler concurrency", r.Concurrency,
		"helm cache length", r.HelmCache.Len(),
	)
	ctx = logr.NewContext(ctx, log)

	// If a deletion timestamp has been set, execute finalizer logic.
	if !cluster.ObjectMeta.DeletionTimestamp.IsZero() { //nolint:nestif
		log.Info("deletion detected, executing finalizers")
		pluginResult, err := plugins.ExecuteFinalizers(ctx, r.Client, cluster, r.Config.PluginConcurrency)
		if err != nil {
			log.Error(err, ErrPluginFinalizerFailed)
			recErr = err
			return
		}
		if pluginResult.Requeue {
			result = reconcile.ResultRequeue
			return
		}
		if cluster.Status.Inventory != nil {
			inv, err := inventory.ListObjects(cluster.Status.Inventory)
			if err != nil {
				log.Error(err, "failed to get objects to cleanup from inventory", "cluster", cluster.Name)
				recErr = err
				return
			}
			_, err = r.ResourceManager.DeleteAll(ctx, inv, sap.DefaultDeleteOptions())
			if err != nil {
				log.Error(err, "failed to cleanup objects from inventory", "cluster", cluster.Name)
				recErr = err
				return
			}
		}
		controllerutil.RemoveFinalizer(cluster, clusterApi.Finalizer)
		log.Info("finalizer executed")
		return
	}

	conditions.Delete(cluster, status.DependenciesReadyCondition)
	log.Info("reconciling started for cluster")

	var unstructuredObjs []*unstructured.Unstructured

	// Check if the cluster spec is valid.
	if err := cluster.Spec.Valid(); err != nil {
		log.Error(err, ErrInvalidCluster)
		recErr = recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), clusterApi.InvalidSpecReason)
		return
	}

	if err := reconcile.Progressing(ctx, cluster, patcher, r.PatchOpts()...); err != nil {
		recErr = recerr.New(err, clusterApi.ReconcileFailedReason)
		return
	}

	ns := buildNamespace(cluster)
	uobj, err := unstructuredutil.ToUnstructured(ns)
	if err != nil {
		recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
		return
	}
	unstructuredObjs = append(unstructuredObjs, uobj)

	// Create the Namespace before applying the shipment so server-side apply is happy.
	err = client.IgnoreAlreadyExists(r.Create(ctx, ns))
	if err != nil {
		recErr = recerr.New(err, clusterApi.NamespaceCreationFailedReason)
		return
	}

	if shouldCreateGKECluster(cluster) {
		gkeCluster := gkeClusterApi.New(cluster.Spec.ProjectID, cluster.Spec.Banner, cluster.Spec.Organization,
			cluster.Spec.Name, cluster.Spec.Location, cluster.Spec.NodeVersion, cluster.Spec.NumNode,
			cluster.Spec.Fleet, cluster.Name)
		gkeCluster.Spec.Autoscale = cluster.Spec.Autoscale
		gkeCluster.Spec.MinNodes = cluster.Spec.MinNodes
		gkeCluster.Spec.MaxNodes = cluster.Spec.MaxNodes
		gkeCluster.ObjectMeta.OwnerReferences = cluster.NewOwnerReference()
		uobj, err := unstructuredutil.ToUnstructured(gkeCluster)
		if err != nil {
			recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
			return
		}
		unstructuredObjs = append(unstructuredObjs, uobj)
		log.WithValues(gkeClusterApi.Name, gkeCluster.Spec).Info("GKECluster created")
	}

	shipments, recErr := r.generateShipments(ctx, cluster)
	if recErr != nil {
		return
	}
	unstructuredObjs = append(unstructuredObjs, shipments...)
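	// At this point unstructuredObjs holds, in order: the cluster Namespace, an
	// optional GKECluster (legacy GKE store clusters only), and the shipment and
	// synced-object manifests returned by generateShipments. Everything in the
	// slice is applied together by the ApplyAll call below.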
	// Check the unstructured objects and ensure they set the kind.
	kindErr := fmt.Errorf("unstructured object must contain kind information")
	for i, obj := range unstructuredObjs {
		if obj.GetKind() == "" {
			uj, err := obj.MarshalJSON()
			labels := []interface{}{"index", i, "unstructured", string(uj)}
			if err != nil {
				labels = append(labels, "marshalError", err.Error(), "obj", obj)
			}
			log.Error(kindErr, "kind missing", labels...)
		}
	}

	r.ResourceManager.SetOwnerLabels(unstructuredObjs, r.Name, "")

	// When the "kind" is missing, the Apply below will fail. Use the logs above when encountering this error.
	changeSet, err := r.ResourceManager.ApplyAll(ctx, unstructuredObjs, sap.ApplyOptions{
		Force:       true,
		WaitTimeout: r.WaitForSetTimeout,
	})
	if err != nil {
		recErr = recerr.New(fmt.Errorf("failed to apply resources: %w", err), clusterApi.ApplyFailedReason)
		return
	}
	newInv = inventory.New(inventory.FromSapChangeSet(changeSet))

	pluginResult, pluginChangeSet, err := plugins.Execute(ctx, r.Name, r.ResourceManager, cluster, r.HelmCache, r.Config.PluginConcurrency)
	if pluginChangeSet != nil {
		changeSet.Append(pluginChangeSet.Entries)
		newInv = &inventory.ResourceInventory{Entries: newInv.Union(inventory.New(inventory.FromSapChangeSet(changeSet)))}
	}
	if err != nil {
		log.Error(err, ErrPluginFailed)
		waitErr := recerr.NewWait(err, clusterApi.PluginFailedReason, r.DefaultRequeue)
		waitErr.ToCondition(cluster, status.ReadyCondition)
		recErr = waitErr
		return
	}
	if pluginResult.Requeue {
		result = reconcile.ResultRequeue
		return
	}

	log.Info("applied objects", "changeset", changeSet.ToMap())

	prune, err := ff.FeatureEnabledForContext(ff.NewClusterContext(cluster.Name), ff.UseClusterCTLPruning, true)
	if err != nil {
		log.Error(err, "unable to get ld flag for pruning, defaulting to prune enabled")
	}
	if oldStatus.Inventory != nil && prune { //nolint:nestif
		diff, err := inventory.Diff(oldStatus.Inventory, newInv)
		if err != nil {
			recErr = recerr.New(err, clusterApi.PruneFailedReason)
			return
		}
		if len(diff) > 0 {
			opt := sap.DefaultDeleteOptions()
			opt.Exclusions = map[string]string{plugins.PruneLabel: plugins.PruneDisabled}
			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
			if err != nil {
				recErr = recerr.New(err, clusterApi.PruneFailedReason)
				return
			}
			if err := plugins.Prune(ctx, changeSet, r.Config.PluginConcurrency, cluster.Name); err != nil {
				log.Error(err, "failed to execute plugins prune side effect")
			}
			log.Info("pruned objects", "changeset", changeSet.ToMap())
		}
	}

	// Check if we are already waiting to update the wait-for-set status for this cluster.
	_, inProgressWaitForSet := r.WaitForSetMap.LoadOrStore(cluster.Name, true)
	if r.WaitForSetTimeout != 0 && !inProgressWaitForSet {
		go func() {
			r.c <- 1
			defer func() {
				r.WaitForSetMap.Delete(cluster.Name)
				<-r.c
			}()
			backgroundCtx := context.Background()
			err = r.ResourceManager.WaitForSet(backgroundCtx, changeSet.ToObjMetadataSet(), sap.WaitOptions{
				Timeout: r.WaitForSetTimeout,
			})
			if err != nil {
				log.Error(err, "timeout waiting for wait for set status", "cluster", cluster.Name)
				waitErr := recerr.NewWait(err, clusterApi.TimeOutReason, r.DefaultRequeue)
				waitErr.ToCondition(cluster, status.ReadyCondition)
				conditions.MarkStalled(cluster, clusterApi.TimeOutReason, "timeout waiting for wait for set status")
				res, recErr = r.summarize(backgroundCtx, patcher, waitErr, cluster, reconcile.ResultRequeue)
				return
			}
			log.Info("cluster reconciled successfully")
			conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ClusterReadyReason, "cluster reconciled successfully")
			conditions.Delete(cluster, status.StalledCondition)
			result = reconcile.ResultSuccess
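			// This summarize runs from the background goroutine once WaitForSet
			// finishes, patching the Cluster status outside the reconcile call;
			// the WaitForSetMap entry cleared in the defer above prevents
			// stacking multiple waiters for the same cluster.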
			res, recErr = r.summarize(backgroundCtx, patcher, recErr, cluster, result)
		}()
	}

	log.Info("cluster manifests applied successfully, waiting for status")
	result = reconcile.ResultSuccess

	// Set the status as ready once the manifests are first applied, while waiting for the wait-for-set status.
	if !oldStatus.HasReadyConditionWithReason(clusterApi.ClusterReadyReason) && !oldStatus.HasReadyConditionWithReason(clusterApi.TimeOutReason) {
		conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ManifestsAppliedReason, "cluster manifests applied successfully, waiting for status")
	}
	return
}

func (r *ClusterReconciler) summarize(ctx context.Context, patcher *patch.SerialPatcher, recErr error, cluster *clusterApi.Cluster, result reconcile.Result) (ctrl.Result, error) {
	summarizer := reconcile.NewSummarizer(patcher)
	res, recErr := summarizer.SummarizeAndPatch(ctx, cluster, []reconcile.SummarizeOption{
		reconcile.WithConditions(r.Conditions),
		reconcile.WithResult(result),
		reconcile.WithError(recErr),
		reconcile.WithIgnoreNotFound(),
		reconcile.WithProcessors(
			reconcile.RecordReconcileReq,
			reconcile.RecordResult,
		),
		reconcile.WithFieldOwner(r.Name),
		reconcile.WithEventRecorder(r.EventRecorder),
	}...)
	return res, recErr
}

//nolint
func (r *ClusterReconciler) generateShipments(ctx context.Context, cluster *clusterApi.Cluster) ([]*unstructured.Unstructured, recerr.Error) {
	var err error
	var unstructuredObjs []*unstructured.Unstructured
	var uobj *unstructured.Unstructured
	var sobj *syncedobjectApi.SyncedObject

	capabilities := generator.InfraCapabilities
	switch {
	case cluster.IsBasicStore():
		capabilities = generator.BasicStoreCapabilities
	case cluster.IsStore():
		capabilities = generator.StoreCapabilities
	}

	clusterShipmentInfo := generator.ClusterRenderParams{
		ClusterType:              cluster.Spec.Type.String(),
		UUID:                     cluster.Name,
		Region:                   r.Config.GCPRegion,
		Zone:                     r.Config.GCPZone,
		GCPProjectID:             cluster.Spec.ProjectID,
		BannerID:                 cluster.Spec.BannerEdgeID,
		ForemanGCPProjectID:      r.Config.TopLevelProjectID,
		Domain:                   r.Config.Domain,
		BSLEndpoint:              r.Config.BSLConfig.Endpoint,
		BSLEdgeEnvPrefix:         r.Config.BSLConfig.OrganizationPrefix,
		BSLRootOrg:               r.Config.BSLConfig.Root,
		DatasyncDNSZone:          r.Config.DatasyncDNSZone,
		DatasyncDNSName:          r.Config.DatasyncDNSName,
		DatabaseName:             r.Config.DatabaseName,
		EdgeSecMaxLeasePeriod:    r.Config.EdgeSecMaxLeasePeriod,
		EdgeSecMaxValidityPeriod: r.Config.EdgeSecMaxValidityPeriod,
		GCPForemanProjectNumber:  r.Config.GCPForemanProjectNumber,
	}

	terminalService := services.NewTerminalService(r.Config.DB, nil)
	activationCodeService := edgenode.NewActivationCodeService(r.Config.DB, nil, nil, nil, nil, nil, nil)
	terminals, err := terminalService.GetTerminalsByClusterID(ctx, cluster.Name)
	if err != nil {
		ctrl.LoggerFrom(ctx).Error(err, "unable to get terminals, defaulting to store pallet only")
	}
	registeredTerminals := []*model.Terminal{}
	for _, ts := range terminals {
		code, err := activationCodeService.Fetch(ctx, ts.TerminalID)
		if err != nil {
			ctrl.LoggerFrom(ctx).Error(err, "unable to get terminal activation code, not adding terminal to count", "terminal_id", ts.TerminalID)
			continue
		}
		// If the node is registered, add it to the list of registered nodes.
		// TODO: change this to check node status once it is added to the DB.
		if code == nil || len(*code) == 0 {
			registeredTerminals = append(registeredTerminals, ts)
		}
	}
	if len(registeredTerminals) != 0 {
		if err := r.manageRegisteredTerminalLabels(ctx, cluster, len(registeredTerminals)); err != nil {
			return nil, err
		}
	}
"cluster-infra") by default for non-store clusters version := latestTag pallets := []whv1.BaseArtifact{{Name: cluster.Spec.Fleet.String(), Tag: version}} if cluster.IsStore() { //nolint:nestif artifacts, err := r.EdgeDB.GetClusterArtifactVersions(ctx, cluster.Name) if err != nil { return nil, recerr.New(fmt.Errorf("failed to get pallets from Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason) } else if len(artifacts) == 0 { return nil, recerr.New(fmt.Errorf("no pallets found in Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason) } // set pallets to be scheduled to cluster pallets = []whv1.BaseArtifact{} for _, a := range artifacts { pallets = append(pallets, a.ToBaseArtifact()) version = a.Version } // In Order to make Datasync an optional feature, the couchdb replication external secret // was moved to its own pallet // We check to see if the banner in which the cluster resides has datasync enabled // if it does, we add the couchdb replication secret pallet. // This is added so the shipment ready state is not affected in banners that do not have datasync enabled. if !cluster.IsGKE() { supportsOptDatasync, err := compatibility.Compare(compatibility.GreaterThanOrEqual, version, "0.20") if err == nil { supportsOptDatasyncBetween, err := compatibility.Compare(compatibility.LessThan, version, "0.21") if err == nil { if supportsOptDatasync && supportsOptDatasyncBetween { capabilitySVC := services.NewCapabilityService(r.Config.DB, nil, nil, "") enablements, err := capabilitySVC.ListCapabilitiesByBanner(ctx, &cluster.Spec.BannerEdgeID) switch { case err != nil: ctrl.LoggerFrom(ctx).Error(err, "unable to get enablements for banner, defaulting to store pallet only") default: for _, enablement := range enablements { if enablement.Name == "couchdb" { ctrl.LoggerFrom(ctx).Info("couchdb-repl-secret added to store shipment", "tag", version) pallets = append(pallets, whv1.BaseArtifact{Name: "couchdb-repl-secret", Tag: version}) break } } } } } } else { ctrl.LoggerFrom(ctx).Error(err, "unable to compare fleet version against static supported optional datasync version of atleast 0.20", "tag", version) } } } // compare the version of the artifact with the static version 0.18.0 // if the version is less than the static version (0.18.0) -> the cluster_location param should be added // if the version is greater than or equal to static version (0.18.0) -> the gcp_zone and gcp_region should be added // version can be 0.18.0, 0.17.0, sha256:abc, latest // TODO(pa250194_ncrvoyix): support sha256:abc supportLegacy := false switch { case strings.Contains(version, "."): support, err := compareVersions(version) if err != nil { return nil, recerr.New(fmt.Errorf("unable to compare versions: %s and %s: %w", version, "0.18", err), clusterApi.ReconcileFailedReason) } supportLegacy = support case version == latestTag: supportLegacy = false } shipmentOpts := &generator.ShipmentOpts{ Prune: true, Force: true, Pallets: pallets, Repository: generator.GenerateShipmentRepo(r.Config.GCPRegion, r.Config.TopLevelProjectID), Capabilities: capabilities, SupportLegacyVersions: supportLegacy && fleet.IsStoreCluster(cluster.Spec.Fleet), } shipmentOpts.AddClusterRenderParams(clusterShipmentInfo) clusterShip, infraShip, err := shipmentOpts.BuildSplitShipments() if err != nil { return nil, recerr.NewStalled(fmt.Errorf("failed to create shipment for store %s: %w", cluster.Spec.Name, err), clusterApi.InvalidShipmentSpecReason) } infraShip.ObjectMeta.OwnerReferences = 
	uobj, err = unstructuredutil.ToUnstructured(infraShip)
	if err != nil {
		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
	}
	unstructuredObjs = append(unstructuredObjs, uobj)

	// add cluster shipment synced obj
	sobj, err = k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipment, constants.ShipmentKustomizationDir)
	if err != nil {
		return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
	}
	uobj, err = unstructuredutil.ToUnstructured(sobj)
	if err != nil {
		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
	}

	// add a second, temporary cluster shipment synced obj to fix issues caused by renaming the original one
	tempSO, err := k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipmentOld, constants.ShipmentKustomizationDir)
	if err != nil {
		return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
	}
	unstructuredTempSO, err := unstructuredutil.ToUnstructured(tempSO)
	if err != nil {
		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
	}
	unstructuredObjs = append(unstructuredObjs, uobj, unstructuredTempSO)

	return unstructuredObjs, nil
}

func buildNamespace(cluster *clusterApi.Cluster) *corev1.Namespace {
	return &corev1.Namespace{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Namespace",
			APIVersion: corev1.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: cluster.Name,
			Annotations: map[string]string{
				meta.ProjectAnnotation: cluster.Spec.ProjectID,
			},
			OwnerReferences: cluster.NewOwnerReference(),
		},
	}
}
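// For illustration (values assumed, not taken from real configuration): for a
// Cluster named "0a1b2c3d" with spec.projectID "example-project", buildNamespace
// returns the equivalent of
//
//	apiVersion: v1
//	kind: Namespace
//	metadata:
//	  name: 0a1b2c3d
//	  annotations:
//	    <meta.ProjectAnnotation>: example-project
//	  ownerReferences: [<owner reference from cluster.NewOwnerReference()>]
//
// where the actual annotation key is whatever meta.ProjectAnnotation resolves to.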
// shouldCreateGKECluster checks whether a GKECluster should be created for the
// cluster. If a fleet label exists, then it is a legacy-gke-store-cluster that
// needs a GKECluster.
func shouldCreateGKECluster(cluster *clusterApi.Cluster) bool {
	if cluster.Spec.Type == clusterConstant.GKE && !cluster.IsStore() {
		return true
	} else if _, ok := cluster.Labels[fleet.Label]; ok {
		return true
	}
	return false
}

func (r *ClusterReconciler) manageRegisteredTerminalLabels(ctx context.Context, cluster *clusterApi.Cluster, numRegisteredTerminals int) recerr.Error {
	// Get the descheduler label from the edge capability labels.
	labelService := services.NewLabelService(artifacts.NewArtifactsService(r.EdgeDB.DB, nil), r.EdgeDB.DB)
	existingLabels, err := labelService.GetLabels(ctx, &cluster.Spec.BannerEdgeID)
	if err != nil {
		return recerr.New(fmt.Errorf("unable to get banner labels: %w", err), clusterApi.ReconcileFailedReason)
	}
	// Get the descheduler edge capability label.
	edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DeschedulerPallet)
	if len(edgeCapabilityLabels) != 0 {
		if err := r.updateDeschedulerLabels(ctx, labelService, numRegisteredTerminals, cluster.Name, edgeCapabilityLabels[0].LabelEdgeID); err != nil {
			return recerr.New(fmt.Errorf("unable to update descheduler label: %w", err), clusterApi.ReconcileFailedReason)
		}
	} else {
		ctrl.LoggerFrom(ctx).Error(fmt.Errorf("no descheduler capability label found"), "descheduler pallet not part of edge capabilities")
	}
	return nil
}

func (r *ClusterReconciler) updateDeschedulerLabels(ctx context.Context, labelService services.LabelService, registeredTerminals int, clusterName, deschedulerLabelEdgeID string) error {
	if registeredTerminals <= 1 {
		return labelService.DeleteClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
	}
	label, err := labelService.GetClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
	if err != nil {
		return err
	}
	if label != nil {
		return nil
	}
	return labelService.CreateClusterLabel(ctx, clusterName, deschedulerLabelEdgeID)
}

func (r *ClusterReconciler) setResourceManager() error {
	if r.ResourceManager == nil {
		sapMngr, err := sap.NewResourceManagerFromConfig(r.manager.GetConfig(), client.Options{}, sap.Owner{Field: r.Name, Group: OwnerGroupLabel})
		if err != nil {
			return err
		}
		r.ResourceManager = sapMngr
	}
	return nil
}

// compareVersions reports whether the given fleet version is older than the
// statically verified version 0.18.
func compareVersions(versionValue string) (bool, error) {
	staticVerifiedVersion, err := version.NewVersion("0.18")
	if err != nil {
		return false, err
	}
	fleetVersion, err := version.NewVersion(versionValue)
	if err != nil {
		return false, err
	}
	return staticVerifiedVersion.GreaterThan(fleetVersion), nil
}
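// The sketch below is illustrative only and is not referenced elsewhere in the
// package; it spells out how compareVersions feeds the supportLegacy decision
// in generateShipments. The sample tags are assumptions.
func exampleCompareVersions() map[string]bool {
	legacyByTag := map[string]bool{}
	for _, tag := range []string{"0.17.0", "0.18.0", "0.19.2"} {
		legacy, err := compareVersions(tag)
		if err != nil {
			continue
		}
		legacyByTag[tag] = legacy
	}
	// Expected contents: {"0.17.0": true, "0.18.0": false, "0.19.2": false}.
	// Only tags older than 0.18 keep the legacy cluster_location render param;
	// newer tags use gcp_zone and gcp_region instead.
	return legacyByTag
}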