
Source file src/edge-infra.dev/pkg/edge/controllers/clusterctl/cluster_controller.go

Documentation: edge-infra.dev/pkg/edge/controllers/clusterctl

     1  package clusterctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  	"sync"
     8  	"time"
     9  
    10  	"github.com/go-logr/logr"
    11  	"github.com/hashicorp/go-version"
    12  	corev1 "k8s.io/api/core/v1"
    13  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    14  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    15  	"k8s.io/apimachinery/pkg/runtime"
    16  	kuberecorder "k8s.io/client-go/tools/record"
    17  	"k8s.io/client-go/util/workqueue"
    18  	ctrl "sigs.k8s.io/controller-runtime"
    19  	"sigs.k8s.io/controller-runtime/pkg/client"
    20  	"sigs.k8s.io/controller-runtime/pkg/controller"
    21  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    22  	"sigs.k8s.io/controller-runtime/pkg/event"
    23  	"sigs.k8s.io/controller-runtime/pkg/manager"
    24  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    25  
    26  	"edge-infra.dev/pkg/edge/api/graph/model"
    27  	"edge-infra.dev/pkg/edge/api/services"
    28  	"edge-infra.dev/pkg/edge/api/services/artifacts"
    29  	"edge-infra.dev/pkg/edge/api/services/edgenode"
    30  	clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
    31  	gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1"
    32  	syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
    33  	edgeCapabilities "edge-infra.dev/pkg/edge/capabilities"
    34  	"edge-infra.dev/pkg/edge/compatibility"
    35  	"edge-infra.dev/pkg/edge/constants"
    36  	clusterConstant "edge-infra.dev/pkg/edge/constants/api/cluster"
    37  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    38  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins"
    39  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache"
    40  	pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics"
    41  	"edge-infra.dev/pkg/edge/controllers/dbmetrics"
    42  	"edge-infra.dev/pkg/edge/controllers/util/edgedb"
    43  	"edge-infra.dev/pkg/edge/k8objectsutils"
    44  	"edge-infra.dev/pkg/edge/shipment/generator"
    45  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1"
    46  	"edge-infra.dev/pkg/k8s/konfigkonnector/apis/meta"
    47  	"edge-infra.dev/pkg/k8s/meta/status"
    48  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    49  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    50  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    51  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    52  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    53  	"edge-infra.dev/pkg/k8s/runtime/patch"
    54  	"edge-infra.dev/pkg/k8s/runtime/sap"
    55  	unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
    56  	ff "edge-infra.dev/pkg/lib/featureflag"
    57  )
    58  
    59  const (
    60  	ErrPluginFailed          = "plugin failed"
    61  	ErrPluginFinalizerFailed = "plugin finalizer failed"
    62  	ErrInvalidCluster        = "invalid Cluster spec"
    63  	ErrToUnstructured        = "failed to convert %s/%s/%s to unstructured: %w"
    64  	latestTag                = "latest"
    65  )
    66  
    67  var (
    68  	// OwnerGroupLabel is the owner label for resources owned and created by the clusterctl.
    69  	OwnerGroupLabel = fmt.Sprintf("%s.%s", strings.ToLower(clusterApi.Kind), clusterApi.ClusterGVK.Group)
    70  )
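// For illustration (a sketch, not part of the package API): with the Cluster
// kind lowercased and the edge.ncr.com API group used in the RBAC markers
// below, OwnerGroupLabel evaluates to roughly
//
//	"cluster.edge.ncr.com"
//
// The exact value depends on clusterApi.Kind and clusterApi.ClusterGVK.Group.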
    71  
    72  // clusterConditions is the reconcile summarization configuration for how
    73  // various conditions should be taken into account when the final condition is
    74  // summarized
    75  var clusterConditions = reconcile.Conditions{
    76  	Target: status.ReadyCondition,
    77  	Owned: []string{
    78  		status.ReadyCondition,
    79  		status.ReconcilingCondition,
    80  		status.StalledCondition,
    81  	},
    82  	Summarize: []string{
    83  		status.StalledCondition,
    84  	},
    85  	NegativePolarity: []string{
    86  		status.ReconcilingCondition,
    87  		status.StalledCondition,
    88  	},
    89  }
    90  
    91  // ClusterReconciler reconciles a Cluster object
    92  type ClusterReconciler struct {
    93  	client.Client
    94  	kuberecorder.EventRecorder
    95  	manager           manager.Manager
    96  	Scheme            *runtime.Scheme
    97  	Log               logr.Logger
    98  	Metrics           metrics.Metrics
    99  	Config            *Config
   100  	DefaultRequeue    time.Duration
   101  	WaitForSetTimeout time.Duration
   102  	// ResourceManager is a server-side apply client that can watch the resources
   103  	// we are applying to the server
   104  	ResourceManager *sap.ResourceManager
   105  	Name            string
   106  	Conditions      reconcile.Conditions
   107  	EdgeDB          *edgedb.EdgeDB
   108  	Recorder        *dbmetrics.DBMetrics
   109  	Concurrency     int
   110  	WaitForSetMap   *sync.Map
   111  	c               chan int
   112  	HelmCache       cache.Provider
   113  }
   114  
   115  func clusterReconcilerPredicate() predicate.Predicate {
   116  	return predicate.Funcs{
   117  		UpdateFunc: func(e event.UpdateEvent) bool {
   118  			return !e.ObjectNew.GetDeletionTimestamp().IsZero()
   119  		},
   120  		CreateFunc: func(_ event.CreateEvent) bool {
   121  			return true
   122  		},
   123  		DeleteFunc: func(_ event.DeleteEvent) bool {
   124  			return false
   125  		},
   126  	}
   127  }
   128  
   129  // SetupWithManager sets up the controller with the Manager.
   130  func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
   131  	return ctrl.NewControllerManagedBy(mgr).
   132  		For(&clusterApi.Cluster{}).
   133  		WithOptions(controller.Options{
   134  			MaxConcurrentReconciles: r.Concurrency,
   135  			RateLimiter:             workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 120*time.Second),
   136  		}).
   137  		WithEventFilter(clusterReconcilerPredicate()).
   138  		Complete(r)
   139  }
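// The event filter above admits creates, drops deletes, and only admits updates
// once a deletion timestamp is set, so steady-state spec updates do not trigger
// reconciles. The rate limiter backs off failed items exponentially from 1ms up
// to a 120s cap. The function below is a hypothetical wiring sketch (the field
// values and the "clusterctl" name are assumptions, not part of this package);
// the real controller is constructed elsewhere.
func newExampleClusterReconciler(mgr ctrl.Manager) error {
	r := &ClusterReconciler{
		Client:         mgr.GetClient(),
		Scheme:         mgr.GetScheme(),
		Log:            ctrl.Log.WithName("clusterctl"),
		Name:           "clusterctl",
		Conditions:     clusterConditions,
		Concurrency:    2,
		DefaultRequeue: 30 * time.Second,
		WaitForSetMap:  &sync.Map{},
	}
	return r.SetupWithManager(mgr)
}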
   140  
   141  func (r *ClusterReconciler) PatchOpts() []patch.Option {
   142  	return []patch.Option{
   143  		patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
   144  		patch.WithFieldOwner(r.Name),
   145  	}
   146  }
   147  
   148  // +kubebuilder:rbac:groups="",resources=namespaces,verbs=create;get;list;watch;update;patch;delete
   149  // +kubebuilder:rbac:groups="",resources=namespaces/status,verbs=get;update;patch
   150  // +kubebuilder:rbac:groups=edge.ncr.com,resources=clusters,verbs=create;get;list;update;patch;watch
   151  // +kubebuilder:rbac:groups=edge.ncr.com,resources=clusters/status,verbs=get;update;patch
   152  // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects,verbs=create;get;list;update;patch;watch;delete
   153  // +kubebuilder:rbac:groups=edge.ncr.com,resources=syncedobjects/status,verbs=get;watch
   154  // +kubebuilder:rbac:groups="edge.ncr.com",resources=gkeclusters,verbs=create;get;list;watch;create;update;patch;delete
   155  // +kubebuilder:rbac:groups="edge.ncr.com",resources=gkeclusters/status,verbs=get;update;patch
   156  // +kubebuilder:rbac:groups="remoteaccess.edge.ncr.com",resources=vpnconfigs,verbs=create;get;list;watch;create;update;patch;delete
   157  // +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containerclusters,verbs=create;get;list;update;patch;watch;delete
   158  // +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containerclusters/status,verbs=get;update;patch
   159  // +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containernodepools,verbs=create;get;list;update;patch;watch;delete
   160  // +kubebuilder:rbac:groups="container.cnrm.cloud.google.com",resources=containernodepools/status,verbs=get;update;patch
   161  // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts;iamserviceaccountkeys,verbs=get;list;create;update;patch;watch;delete
   162  // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iampolicymembers,verbs=get;list;create;update;patch;watch;delete
   163  // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts/status;iamservicecaccountkeys/status;iampolicymembers/status,verbs=get;watch
   164  // +kubebuilder:rbac:groups="core.cnrm.cloud.google.com",resources=configconnectors,verbs=create;update;patch;watch
   165  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments,verbs=get;list;create;update;patch;delete;watch
   166  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments/status,verbs=get;watch
   167  // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
   168  // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
   169  // +kubebuilder:rbac:groups="",resources=secrets,verbs=get
   170  // +kubebuilder:rbac:groups="cert-manager.io",resources=certificates,verbs=get;list;create;patch;watch;delete
   171  
   172  // Reconcile is part of the main kubernetes reconciliation loop which aims to
   173  // move the current state of the cluster closer to the desired state.
   174  //
   175  //nolint:gocyclo
   176  func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   177  	var (
   178  		reconcileStart = time.Now()
   179  		log            = ctrl.LoggerFrom(ctx)
   180  		result         = reconcile.ResultEmpty
   181  		cluster        = &clusterApi.Cluster{}
   182  	)
   183  
   184  	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
   185  		return ctrl.Result{}, client.IgnoreNotFound(err)
   186  	}
   187  	r.Metrics.RecordReconciling(ctx, cluster)
   188  	updateReconcileMetadata(ctx, cluster, reconcileStart)
   189  
   190  	oldStatus := cluster.Status.DeepCopy()
   191  	if oldStatus.Inventory == nil {
   192  		oldStatus.Inventory = &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}
   193  	}
   194  	newInv := &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}
   195  	patcher := patch.NewSerialPatcher(cluster, r.Client)
   196  
   197  	conditions.MarkReconciling(cluster, status.ReconcilingCondition, clusterApi.ReconcilingReason)
   198  
   199  	defer func() {
    200  		// by default, set the inventory to the inventory generated in this reconcile loop
   201  		cluster.Status.Inventory = newInv
   202  		if recErr != nil {
    203  			// if there is an error, merge with the previous inventory, since the new one may be missing manifests that have not been pruned.
   204  			cluster.Status.Inventory = inventory.Merge(oldStatus.Inventory, newInv)
   205  			reconcileErr, ok := recErr.(recerr.Error)
   206  			if !ok {
   207  				reconcileErr = recerr.New(recErr, clusterApi.ReconcileFailedReason)
   208  			}
   209  			reconcileErr.ToCondition(cluster, status.ReadyCondition)
   210  		}
   211  
   212  		res, recErr = r.summarize(ctx, patcher, recErr, cluster, result)
   213  		r.Metrics.RecordDuration(ctx, cluster, reconcileStart)
   214  		r.Metrics.RecordReadiness(ctx, cluster)
   215  		r.EdgeDB.RecordInfraStatus(ctx, cluster, *r.Recorder)
   216  		pluginmetrics.New().RecordRegisteredPluginsCountMetric(plugins.Count())
   217  		deleteResourceEntry(cluster)
   218  	}()
   219  
   220  	// Check if finalizer exists
   221  	if !controllerutil.ContainsFinalizer(cluster, clusterApi.Finalizer) {
   222  		controllerutil.AddFinalizer(cluster, clusterApi.Finalizer)
   223  
   224  		// Return immediately so that we requeue and reconcile object with finalizer
   225  		// added.
   226  		result = reconcile.ResultRequeue
   227  		return
   228  	}
   229  
   230  	log = log.WithValues("name", cluster.Spec.Name, "spec", cluster.Spec, "cluster reconciler concurrency", r.Concurrency, "helm cache length", r.HelmCache.Len())
   231  	ctx = logr.NewContext(ctx, log)
   232  
   233  	// If a deletion timestamp has been set, execute finalizer logic
   234  	if !cluster.ObjectMeta.DeletionTimestamp.IsZero() { //nolint:nestif
   235  		log.Info("deletion detected, executing finalizers")
   236  		pluginResult, err := plugins.ExecuteFinalizers(ctx, r.Client, cluster, r.Config.PluginConcurrency)
   237  		if err != nil {
   238  			log.Error(err, ErrPluginFinalizerFailed)
   239  			recErr = err
   240  			return
   241  		}
   242  		if pluginResult.Requeue {
   243  			result = reconcile.ResultRequeue
   244  			return
   245  		}
   246  		if cluster.Status.Inventory != nil {
   247  			inv, err := inventory.ListObjects(cluster.Status.Inventory)
   248  			if err != nil {
   249  				log.Error(err, "failed to get objects to cleanup from inventory", "cluster", cluster.Name)
   250  				recErr = err
   251  				return
   252  			}
   253  			_, err = r.ResourceManager.DeleteAll(ctx, inv, sap.DefaultDeleteOptions())
   254  			if err != nil {
   255  				log.Error(err, "failed to cleanup objects from inventory", "cluster", cluster.Name)
   256  				recErr = err
   257  				return
   258  			}
   259  		}
   260  		controllerutil.RemoveFinalizer(cluster, clusterApi.Finalizer)
   261  		log.Info("finalizer executed")
   262  		return
   263  	}
   264  
   265  	conditions.Delete(cluster, status.DependenciesReadyCondition)
   266  
   267  	log.Info("reconciling started for cluster")
   268  	var unstructuredObjs []*unstructured.Unstructured
   269  
   270  	// Check if cluster spec is valid
   271  	if err := cluster.Spec.Valid(); err != nil {
   272  		log.Error(err, ErrInvalidCluster)
   273  		recErr = recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), clusterApi.InvalidSpecReason)
   274  		return
   275  	}
   276  
   277  	if err := reconcile.Progressing(ctx, cluster, patcher, r.PatchOpts()...); err != nil {
   278  		recErr = recerr.New(err, clusterApi.ReconcileFailedReason)
   279  		return
   280  	}
   281  
   282  	ns := buildNamespace(cluster)
   283  	uobj, err := unstructuredutil.ToUnstructured(ns)
   284  	if err != nil {
   285  		recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err),
   286  			clusterApi.ApplyFailedReason)
   287  		return
   288  	}
   289  	unstructuredObjs = append(unstructuredObjs, uobj)
   290  
    291  	// Create the Namespace before applying the shipment so server-side apply succeeds
   292  	err = client.IgnoreAlreadyExists(r.Create(ctx, ns))
   293  	if err != nil {
   294  		recErr = recerr.New(err, clusterApi.NamespaceCreationFailedReason)
   295  		return
   296  	}
   297  
   298  	if shouldCreateGKECluster(cluster) {
   299  		gkeCluster := gkeClusterApi.New(cluster.Spec.ProjectID,
   300  			cluster.Spec.Banner,
   301  			cluster.Spec.Organization,
   302  			cluster.Spec.Name,
   303  			cluster.Spec.Location,
   304  			cluster.Spec.NodeVersion,
   305  			cluster.Spec.NumNode,
   306  			cluster.Spec.Fleet,
   307  			cluster.Name)
   308  		gkeCluster.Spec.Autoscale = cluster.Spec.Autoscale
   309  		gkeCluster.Spec.MinNodes = cluster.Spec.MinNodes
   310  		gkeCluster.Spec.MaxNodes = cluster.Spec.MaxNodes
   311  		gkeCluster.ObjectMeta.OwnerReferences = cluster.NewOwnerReference()
   312  		uobj, err := unstructuredutil.ToUnstructured(gkeCluster)
   313  		if err != nil {
   314  			recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err),
   315  				clusterApi.ApplyFailedReason)
   316  			return
   317  		}
   318  		unstructuredObjs = append(unstructuredObjs, uobj)
   319  		log.WithValues(gkeClusterApi.Name, gkeCluster.Spec).Info("GKECluster created")
   320  	}
   321  
   322  	shipments, recErr := r.generateShipments(ctx, cluster)
   323  	if recErr != nil {
   324  		return
   325  	}
   326  	unstructuredObjs = append(unstructuredObjs, shipments...)
   327  
   328  	// check the unstructured objects and ensure they set the kind.
   329  	var kindErr = fmt.Errorf("unstructured object must contain kind information")
   330  	for i, obj := range unstructuredObjs {
   331  		if "" == obj.GetKind() {
   332  			var uj, err = obj.MarshalJSON()
   333  			var labels = []interface{}{"index", i, "unstructured", string(uj)}
   334  			if err != nil {
   335  				labels = append(labels, "marshalError", err.Error(), "obj", obj)
   336  			}
   337  			log.Error(kindErr, "kind missing", labels...)
   338  		}
   339  	}
   340  
   341  	r.ResourceManager.SetOwnerLabels(unstructuredObjs, r.Name, "")
   342  
   343  	// When the "kind" is missing, the Apply below will fail. Use the logs above when encountering this error.
   344  	changeSet, err := r.ResourceManager.ApplyAll(ctx, unstructuredObjs, sap.ApplyOptions{
   345  		Force:       true,
   346  		WaitTimeout: r.WaitForSetTimeout,
   347  	})
   348  	if err != nil {
   349  		recErr = recerr.New(fmt.Errorf("failed to apply resources: %w", err), clusterApi.ApplyFailedReason)
   350  		return
   351  	}
   352  
   353  	newInv = inventory.New(inventory.FromSapChangeSet(changeSet))
   354  
   355  	pluginResult, pluginChangeSet, err := plugins.Execute(ctx, r.Name, r.ResourceManager, cluster, r.HelmCache, r.Config.PluginConcurrency)
   356  	if pluginChangeSet != nil {
   357  		changeSet.Append(pluginChangeSet.Entries)
   358  		newInv = &inventory.ResourceInventory{Entries: newInv.Union(inventory.New(inventory.FromSapChangeSet(changeSet)))}
   359  	}
   360  	if err != nil {
   361  		log.Error(err, ErrPluginFailed)
   362  		waitErr := recerr.NewWait(err, clusterApi.PluginFailedReason, r.DefaultRequeue)
   363  		waitErr.ToCondition(cluster, status.ReadyCondition)
   364  		recErr = waitErr
   365  		return
   366  	}
   367  	if pluginResult.Requeue {
   368  		result = reconcile.ResultRequeue
   369  		return
   370  	}
   371  	log.Info("applied objects", "changeset", changeSet.ToMap())
   372  
   373  	prune, err := ff.FeatureEnabledForContext(ff.NewClusterContext(cluster.Name), ff.UseClusterCTLPruning, true)
   374  	if err != nil {
   375  		log.Error(err, "unable to get ld flag for prunning, defaulting to prune enabled")
   376  	}
   377  	if oldStatus.Inventory != nil && prune { //nolint:nestif
   378  		diff, err := inventory.Diff(oldStatus.Inventory, newInv)
   379  		if err != nil {
   380  			recErr = recerr.New(err, clusterApi.PruneFailedReason)
   381  			return
   382  		}
   383  		if len(diff) > 0 {
   384  			opt := sap.DefaultDeleteOptions()
   385  			opt.Exclusions = map[string]string{plugins.PruneLabel: plugins.PruneDisabled}
   386  			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
   387  			if err != nil {
   388  				recErr = recerr.New(err, clusterApi.PruneFailedReason)
   389  				return
   390  			}
   391  			if err := plugins.Prune(ctx, changeSet, r.Config.PluginConcurrency, cluster.Name); err != nil {
   392  				log.Error(err, "failed to execute plugins prune side effect")
   393  			}
   394  			log.Info("pruned objects", "changeset", changeSet.ToMap())
   395  		}
   396  	}
   397  
    398  	// check whether a WaitForSet status update is already in progress for this cluster
   399  	_, inProgressWaitForSet := r.WaitForSetMap.LoadOrStore(cluster.Name, true)
   400  	if r.WaitForSetTimeout != 0 && !inProgressWaitForSet {
   401  		go func() {
   402  			r.c <- 1
   403  			defer func() { r.WaitForSetMap.Delete(cluster.Name); <-r.c }()
   404  			backgroundCtx := context.Background()
   405  			err = r.ResourceManager.WaitForSet(backgroundCtx, changeSet.ToObjMetadataSet(), sap.WaitOptions{
   406  				Timeout: r.WaitForSetTimeout,
   407  			})
   408  			if err != nil {
   409  				log.Error(err, "timeout waiting for wait for set status", "cluster", cluster.Name)
   410  				waitErr := recerr.NewWait(err, clusterApi.TimeOutReason, r.DefaultRequeue)
   411  				waitErr.ToCondition(cluster, status.ReadyCondition)
    412  				conditions.MarkStalled(cluster, clusterApi.TimeOutReason, "timed out waiting for WaitForSet status")
   413  				res, recErr = r.summarize(backgroundCtx, patcher, waitErr, cluster, reconcile.ResultRequeue)
   414  				return
   415  			}
   416  			log.Info("cluster reconciled successfully")
   417  			conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ClusterReadyReason, "cluster reconciled successfully")
   418  			conditions.Delete(cluster, status.StalledCondition)
   419  			result = reconcile.ResultSuccess
   420  			res, recErr = r.summarize(backgroundCtx, patcher, recErr, cluster, result)
   421  		}()
   422  	}
   423  
   424  	log.Info("cluster manifests applied successfully, waiting for status")
   425  
   426  	result = reconcile.ResultSuccess
   427  
    428  	// set status to ready once the manifests have first been applied, while the background WaitForSet check is still running
   429  	if !oldStatus.HasReadyConditionWithReason(clusterApi.ClusterReadyReason) && !oldStatus.HasReadyConditionWithReason(clusterApi.TimeOutReason) {
   430  		conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ManifestsAppliedReason, "cluster manifests applied successfully, waiting for status")
   431  	}
   432  	return
   433  }
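// A small illustrative helper (a sketch, not used by the controller) showing how
// a caller could distinguish the intermediate Ready state set above:
// ManifestsAppliedReason means the manifests were applied but the background
// WaitForSet check has not yet confirmed the full set, whereas ClusterReadyReason
// means the set reconciled successfully.
func clusterStillWaitingForSet(cluster *clusterApi.Cluster) bool {
	return cluster.Status.HasReadyConditionWithReason(clusterApi.ManifestsAppliedReason)
}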
   434  
   435  func (r *ClusterReconciler) summarize(ctx context.Context, patcher *patch.SerialPatcher, recErr error, cluster *clusterApi.Cluster, result reconcile.Result) (ctrl.Result, error) {
   436  	summarizer := reconcile.NewSummarizer(patcher)
   437  	res, recErr := summarizer.SummarizeAndPatch(ctx, cluster, []reconcile.SummarizeOption{
   438  		reconcile.WithConditions(r.Conditions),
   439  		reconcile.WithResult(result),
   440  		reconcile.WithError(recErr),
   441  		reconcile.WithIgnoreNotFound(),
   442  		reconcile.WithProcessors(
   443  			reconcile.RecordReconcileReq,
   444  			reconcile.RecordResult,
   445  		),
   446  		reconcile.WithFieldOwner(r.Name),
   447  		reconcile.WithEventRecorder(r.EventRecorder),
   448  	}...)
   449  	return res, recErr
   450  }
   451  
   452  // nolint
   453  func (r *ClusterReconciler) generateShipments(ctx context.Context, cluster *clusterApi.Cluster) ([]*unstructured.Unstructured, recerr.Error) {
   454  	var err error
   455  	var unstructuredObjs []*unstructured.Unstructured
   456  	var uobj *unstructured.Unstructured
   457  	var sobj *syncedobjectApi.SyncedObject
   458  
   459  	capabilities := generator.InfraCapabilities
   460  
   461  	switch {
   462  	case cluster.IsBasicStore():
   463  		capabilities = generator.BasicStoreCapabilities
   464  	case cluster.IsStore():
   465  		capabilities = generator.StoreCapabilities
   466  	}
   467  
   468  	clusterShipmentInfo := generator.ClusterRenderParams{
   469  		ClusterType:              cluster.Spec.Type.String(),
   470  		UUID:                     cluster.Name,
   471  		Region:                   r.Config.GCPRegion,
   472  		Zone:                     r.Config.GCPZone,
   473  		GCPProjectID:             cluster.Spec.ProjectID,
   474  		BannerID:                 cluster.Spec.BannerEdgeID,
   475  		ForemanGCPProjectID:      r.Config.TopLevelProjectID,
   476  		Domain:                   r.Config.Domain,
   477  		BSLEndpoint:              r.Config.BSLConfig.Endpoint,
   478  		BSLEdgeEnvPrefix:         r.Config.BSLConfig.OrganizationPrefix,
   479  		BSLRootOrg:               r.Config.BSLConfig.Root,
   480  		DatasyncDNSZone:          r.Config.DatasyncDNSZone,
   481  		DatasyncDNSName:          r.Config.DatasyncDNSName,
   482  		DatabaseName:             r.Config.DatabaseName,
   483  		EdgeSecMaxLeasePeriod:    r.Config.EdgeSecMaxLeasePeriod,
   484  		EdgeSecMaxValidityPeriod: r.Config.EdgeSecMaxValidityPeriod,
   485  		GCPForemanProjectNumber:  r.Config.GCPForemanProjectNumber,
   486  	}
   487  
   488  	terminalService := services.NewTerminalService(r.Config.DB, nil)
   489  	activationCodeService := edgenode.NewActivationCodeService(r.Config.DB, nil, nil, nil, nil, nil, nil)
   490  
   491  	terminals, err := terminalService.GetTerminalsByClusterID(ctx, cluster.Name)
   492  	if err != nil {
   493  		ctrl.LoggerFrom(ctx).Error(err, "unable to get terminals, defaulting to store pallet only")
   494  	}
   495  	registeredTerminals := []*model.Terminal{}
   496  	for _, ts := range terminals {
   497  		code, err := activationCodeService.Fetch(ctx, ts.TerminalID)
   498  		if err != nil {
   499  			ctrl.LoggerFrom(ctx).Error(err, "unable to get terminal activation code, not adding terminal to count", "terminal_id", ts.TerminalID)
   500  			continue
   501  		}
    502  		// if the node is registered, add it to the list of registered nodes. TODO: check node status instead once it is added to the DB
   503  		if code == nil || len(*code) == 0 {
   504  			registeredTerminals = append(registeredTerminals, ts)
   505  		}
   506  	}
   507  	if len(registeredTerminals) != 0 {
   508  		if err := r.manageRegisteredTerminalLabels(ctx, cluster, len(registeredTerminals)); err != nil {
   509  			return nil, err
   510  		}
   511  	}
   512  
    513  	// default to the latest fleet package tag for non-store clusters (e.g. "cluster-infra")
   514  	version := latestTag
   515  	pallets := []whv1.BaseArtifact{{Name: cluster.Spec.Fleet.String(), Tag: version}}
   516  
   517  	if cluster.IsStore() { //nolint:nestif
   518  		artifacts, err := r.EdgeDB.GetClusterArtifactVersions(ctx, cluster.Name)
   519  		if err != nil {
   520  			return nil, recerr.New(fmt.Errorf("failed to get pallets from Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason)
   521  		} else if len(artifacts) == 0 {
   522  			return nil, recerr.New(fmt.Errorf("no pallets found in Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason)
   523  		}
   524  
   525  		// set pallets to be scheduled to cluster
   526  		pallets = []whv1.BaseArtifact{}
   527  		for _, a := range artifacts {
   528  			pallets = append(pallets, a.ToBaseArtifact())
   529  			version = a.Version
   530  		}
    531  		// To make Datasync an optional feature, the couchdb replication external secret
    532  		// was moved to its own pallet. We check whether the banner the cluster resides in
    533  		// has datasync enabled and, if it does, add the couchdb replication secret pallet.
    534  		// This keeps the shipment ready state unaffected in banners that do not have
    535  		// datasync enabled.
   536  		if !cluster.IsGKE() {
   537  			supportsOptDatasync, err := compatibility.Compare(compatibility.GreaterThanOrEqual, version, "0.20")
   538  			if err == nil {
   539  				supportsOptDatasyncBetween, err := compatibility.Compare(compatibility.LessThan, version, "0.21")
   540  				if err == nil {
   541  					if supportsOptDatasync && supportsOptDatasyncBetween {
   542  						capabilitySVC := services.NewCapabilityService(r.Config.DB, nil, nil, "")
   543  						enablements, err := capabilitySVC.ListCapabilitiesByBanner(ctx, &cluster.Spec.BannerEdgeID)
   544  						switch {
   545  						case err != nil:
   546  							ctrl.LoggerFrom(ctx).Error(err, "unable to get enablements for banner, defaulting to store pallet only")
   547  						default:
   548  							for _, enablement := range enablements {
   549  								if enablement.Name == "couchdb" {
   550  									ctrl.LoggerFrom(ctx).Info("couchdb-repl-secret added to store shipment", "tag", version)
   551  									pallets = append(pallets, whv1.BaseArtifact{Name: "couchdb-repl-secret", Tag: version})
   552  									break
   553  								}
   554  							}
   555  						}
   556  					}
   557  				}
   558  			} else {
   559  				ctrl.LoggerFrom(ctx).Error(err, "unable to compare fleet version against static supported optional datasync version of atleast 0.20", "tag", version)
   560  			}
   561  		}
   562  	}
   563  
   564  	// compare the version of the artifact with the static version 0.18.0
   565  	// if the version is less than the static version (0.18.0) -> the cluster_location param should be added
   566  	// if the version is greater than or equal to static version (0.18.0) -> the gcp_zone and gcp_region should be added
   567  	// version can be 0.18.0, 0.17.0, sha256:abc, latest
   568  	// TODO(pa250194_ncrvoyix): support sha256:abc
   569  	supportLegacy := false
   570  	switch {
   571  	case strings.Contains(version, "."):
   572  		support, err := compareVersions(version)
   573  		if err != nil {
   574  			return nil, recerr.New(fmt.Errorf("unable to compare versions: %s and %s: %w", version, "0.18", err), clusterApi.ReconcileFailedReason)
   575  		}
   576  		supportLegacy = support
   577  	case version == latestTag:
   578  		supportLegacy = false
   579  	}
   580  
   581  	shipmentOpts := &generator.ShipmentOpts{
   582  		Prune:                 true,
   583  		Force:                 true,
   584  		Pallets:               pallets,
   585  		Repository:            generator.GenerateShipmentRepo(r.Config.GCPRegion, r.Config.TopLevelProjectID),
   586  		Capabilities:          capabilities,
   587  		SupportLegacyVersions: supportLegacy && fleet.IsStoreCluster(cluster.Spec.Fleet),
   588  	}
   589  
   590  	shipmentOpts.AddClusterRenderParams(clusterShipmentInfo)
   591  
   592  	clusterShip, infraShip, err := shipmentOpts.BuildSplitShipments()
   593  	if err != nil {
   594  		return nil, recerr.NewStalled(fmt.Errorf("failed to create shipment for store %s: %w", cluster.Spec.Name, err), clusterApi.InvalidShipmentSpecReason)
   595  	}
   596  	infraShip.ObjectMeta.OwnerReferences = cluster.NewOwnerReference()
   597  	uobj, err = unstructuredutil.ToUnstructured(infraShip)
   598  	if err != nil {
   599  		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
   600  	}
   601  	unstructuredObjs = append(unstructuredObjs, uobj)
   602  	//add cluster shipment synced obj
   603  	sobj, err = k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipment, constants.ShipmentKustomizationDir)
   604  	if err != nil {
   605  		return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
   606  	}
   607  	uobj, err = unstructuredutil.ToUnstructured(sobj)
   608  	if err != nil {
   609  		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
   610  	}
   611  	// add a second, temporary cluster shipment synced obj to fix issues caused by renaming the original one
   612  	tempSO, err := k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipmentOld, constants.ShipmentKustomizationDir)
   613  	if err != nil {
   614  		return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
   615  	}
   616  	unstructuredTempSO, err := unstructuredutil.ToUnstructured(tempSO)
   617  	if err != nil {
    618  		return nil, recerr.New(fmt.Errorf(ErrToUnstructured, tempSO.GetObjectKind(), tempSO.GetNamespace(), tempSO.GetName(), err), clusterApi.ApplyFailedReason)
   619  	}
   620  	unstructuredObjs = append(unstructuredObjs, uobj, unstructuredTempSO)
   621  	return unstructuredObjs, nil
   622  }
   623  
   624  func buildNamespace(cluster *clusterApi.Cluster) *corev1.Namespace {
   625  	return &corev1.Namespace{
   626  		TypeMeta: metav1.TypeMeta{
   627  			Kind:       "Namespace",
   628  			APIVersion: corev1.SchemeGroupVersion.String(),
   629  		},
   630  		ObjectMeta: metav1.ObjectMeta{
   631  			Name: cluster.Name,
   632  			Annotations: map[string]string{
   633  				meta.ProjectAnnotation: cluster.Spec.ProjectID,
   634  			},
   635  			OwnerReferences: cluster.NewOwnerReference(),
   636  		},
   637  	}
   638  }
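// For a Cluster whose name is the cluster UUID, the generated Namespace looks
// roughly like the sketch below (hedged: the concrete annotation key is whatever
// meta.ProjectAnnotation resolves to in the konfigkonnector meta package):
//
//	apiVersion: v1
//	kind: Namespace
//	metadata:
//	  name: <cluster UUID>
//	  annotations:
//	    <meta.ProjectAnnotation>: <cluster .spec.projectID>
//	  ownerReferences:
//	    - <owner reference to the Cluster object>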
   639  
    640  // shouldCreateGKECluster reports whether a GKECluster should be created for the
    641  // cluster. If a fleet label exists, it is a legacy GKE store cluster that needs a GKECluster.
   642  func shouldCreateGKECluster(cluster *clusterApi.Cluster) bool {
   643  	if cluster.Spec.Type == clusterConstant.GKE && !cluster.IsStore() {
   644  		return true
   645  	} else if _, ok := cluster.Labels[fleet.Label]; ok {
   646  		return true
   647  	}
   648  	return false
   649  }
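// Illustrative outcomes of the check above:
//
//	GKE-type cluster that is not a store    -> GKECluster is created
//	cluster carrying the fleet.Label label  -> GKECluster is created (legacy GKE store cluster)
//	anything else                           -> no GKECluster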
   650  
   651  func (r *ClusterReconciler) manageRegisteredTerminalLabels(ctx context.Context, cluster *clusterApi.Cluster, numRegisteredTerminals int) recerr.Error {
   652  	// get descheduler label from edge capability labels
   653  	labelService := services.NewLabelService(artifacts.NewArtifactsService(r.EdgeDB.DB, nil), r.EdgeDB.DB)
   654  	existingLabels, err := labelService.GetLabels(ctx, &cluster.Spec.BannerEdgeID)
   655  	if err != nil {
   656  		return recerr.New(fmt.Errorf("unable to get banner labels: %w", err), clusterApi.ReconcileFailedReason)
   657  	}
   658  
   659  	// get descheduler edge capability label
   660  	edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DeschedulerPallet)
   661  
   662  	if len(edgeCapabilityLabels) != 0 {
   663  		if err := r.updateDeschedulerLabels(ctx, labelService, numRegisteredTerminals, cluster.Name, edgeCapabilityLabels[0].LabelEdgeID); err != nil {
   664  			return recerr.New(fmt.Errorf("unable to update descheduler label: %w", err), clusterApi.ReconcileFailedReason)
   665  		}
   666  	} else {
   667  		ctrl.LoggerFrom(ctx).Error(fmt.Errorf("no descheduler capability label found"), "descheduler pallet not part of edge capabilities")
   668  	}
   669  	return nil
   670  }
   671  
   672  func (r *ClusterReconciler) updateDeschedulerLabels(ctx context.Context, labelService services.LabelService, registeredTerminals int, clusterName, deschedulerLabelEdgeID string) error {
   673  	if registeredTerminals <= 1 {
   674  		return labelService.DeleteClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
   675  	}
   676  	label, err := labelService.GetClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
   677  	if err != nil {
   678  		return err
   679  	}
   680  	if label != nil {
   681  		return nil
   682  	}
   683  	return labelService.CreateClusterLabel(ctx, clusterName, deschedulerLabelEdgeID)
   684  }
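// Illustrative outcomes of updateDeschedulerLabels (a sketch of the logic above):
//
//	registeredTerminals <= 1                      -> descheduler cluster label deleted
//	registeredTerminals > 1 and label not present -> descheduler cluster label created
//	registeredTerminals > 1 and label present     -> no change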
   685  
   686  func (r *ClusterReconciler) setResourceManager() error {
   687  	if r.ResourceManager == nil {
   688  		sapMngr, err := sap.NewResourceManagerFromConfig(r.manager.GetConfig(),
   689  			client.Options{},
   690  			sap.Owner{Field: r.Name, Group: OwnerGroupLabel})
   691  		if err != nil {
   692  			return err
   693  		}
   694  		r.ResourceManager = sapMngr
   695  	}
   696  	return nil
   697  }
   698  
   699  func compareVersions(versionValue string) (bool, error) {
   700  	staticVerifiedVersion, err := version.NewVersion("0.18")
   701  	if err != nil {
   702  		return false, err
   703  	}
   704  	fleetVersion, err := version.NewVersion(versionValue)
   705  	if err != nil {
   706  		return false, err
   707  	}
   708  	return staticVerifiedVersion.GreaterThan(fleetVersion), nil
   709  }
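// Sketch of expected results (not a test in this package). The caller only
// reaches compareVersions for dotted versions; "latest" is handled by the switch
// above and container digests remain a TODO:
//
//	compareVersions("0.17.0") // true:  0.18 is greater, so legacy render params are used
//	compareVersions("0.18.0") // false: 0.18 is not greater than 0.18.0
//	compareVersions("0.21.3") // false: newer fleets use the gcp_zone/gcp_region params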
   710  
