1 package clusterctl
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/go-logr/logr"
11 "github.com/hashicorp/go-version"
12 corev1 "k8s.io/api/core/v1"
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
15 "k8s.io/apimachinery/pkg/runtime"
16 kuberecorder "k8s.io/client-go/tools/record"
17 "k8s.io/client-go/util/workqueue"
18 ctrl "sigs.k8s.io/controller-runtime"
19 "sigs.k8s.io/controller-runtime/pkg/client"
20 "sigs.k8s.io/controller-runtime/pkg/controller"
21 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
22 "sigs.k8s.io/controller-runtime/pkg/event"
23 "sigs.k8s.io/controller-runtime/pkg/manager"
24 "sigs.k8s.io/controller-runtime/pkg/predicate"
25
26 "edge-infra.dev/pkg/edge/api/graph/model"
27 "edge-infra.dev/pkg/edge/api/services"
28 "edge-infra.dev/pkg/edge/api/services/artifacts"
29 "edge-infra.dev/pkg/edge/api/services/edgenode"
30 clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
31 gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1"
32 syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
33 edgeCapabilities "edge-infra.dev/pkg/edge/capabilities"
34 "edge-infra.dev/pkg/edge/compatibility"
35 "edge-infra.dev/pkg/edge/constants"
36 clusterConstant "edge-infra.dev/pkg/edge/constants/api/cluster"
37 "edge-infra.dev/pkg/edge/constants/api/fleet"
38 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins"
39 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache"
40 pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics"
41 "edge-infra.dev/pkg/edge/controllers/dbmetrics"
42 "edge-infra.dev/pkg/edge/controllers/util/edgedb"
43 "edge-infra.dev/pkg/edge/k8objectsutils"
44 "edge-infra.dev/pkg/edge/shipment/generator"
45 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1"
46 "edge-infra.dev/pkg/k8s/konfigkonnector/apis/meta"
47 "edge-infra.dev/pkg/k8s/meta/status"
48 "edge-infra.dev/pkg/k8s/runtime/conditions"
49 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
50 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
51 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
52 "edge-infra.dev/pkg/k8s/runtime/inventory"
53 "edge-infra.dev/pkg/k8s/runtime/patch"
54 "edge-infra.dev/pkg/k8s/runtime/sap"
55 unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
56 ff "edge-infra.dev/pkg/lib/featureflag"
57 )
58
59 const (
60 ErrPluginFailed = "plugin failed"
61 ErrPluginFinalizerFailed = "plugin finalizer failed"
62 ErrInvalidCluster = "invalid Cluster spec"
63 ErrToUnstructured = "failed to convert %s/%s/%s to unstructured: %w"
64 latestTag = "latest"
65 )
66
67 var (
68
69 OwnerGroupLabel = fmt.Sprintf("%s.%s", strings.ToLower(clusterApi.Kind), clusterApi.ClusterGVK.Group)
70 )
71
72
73
74
75 var clusterConditions = reconcile.Conditions{
76 Target: status.ReadyCondition,
77 Owned: []string{
78 status.ReadyCondition,
79 status.ReconcilingCondition,
80 status.StalledCondition,
81 },
82 Summarize: []string{
83 status.StalledCondition,
84 },
85 NegativePolarity: []string{
86 status.ReconcilingCondition,
87 status.StalledCondition,
88 },
89 }
90
91
92 type ClusterReconciler struct {
93 client.Client
94 kuberecorder.EventRecorder
95 manager manager.Manager
96 Scheme *runtime.Scheme
97 Log logr.Logger
98 Metrics metrics.Metrics
99 Config *Config
100 DefaultRequeue time.Duration
101 WaitForSetTimeout time.Duration
102
103
104 ResourceManager *sap.ResourceManager
105 Name string
106 Conditions reconcile.Conditions
107 EdgeDB *edgedb.EdgeDB
108 Recorder *dbmetrics.DBMetrics
109 Concurrency int
110 WaitForSetMap *sync.Map
111 c chan int
112 HelmCache cache.Provider
113 }
114
115 func clusterReconcilerPredicate() predicate.Predicate {
116 return predicate.Funcs{
117 UpdateFunc: func(e event.UpdateEvent) bool {
118 return !e.ObjectNew.GetDeletionTimestamp().IsZero()
119 },
120 CreateFunc: func(_ event.CreateEvent) bool {
121 return true
122 },
123 DeleteFunc: func(_ event.DeleteEvent) bool {
124 return false
125 },
126 }
127 }
128
129
130 func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
131 return ctrl.NewControllerManagedBy(mgr).
132 For(&clusterApi.Cluster{}).
133 WithOptions(controller.Options{
134 MaxConcurrentReconciles: r.Concurrency,
135 RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 120*time.Second),
136 }).
137 WithEventFilter(clusterReconcilerPredicate()).
138 Complete(r)
139 }
140
141 func (r *ClusterReconciler) PatchOpts() []patch.Option {
142 return []patch.Option{
143 patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
144 patch.WithFieldOwner(r.Name),
145 }
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
177 var (
178 reconcileStart = time.Now()
179 log = ctrl.LoggerFrom(ctx)
180 result = reconcile.ResultEmpty
181 cluster = &clusterApi.Cluster{}
182 )
183
184 if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
185 return ctrl.Result{}, client.IgnoreNotFound(err)
186 }
187 r.Metrics.RecordReconciling(ctx, cluster)
188 updateReconcileMetadata(ctx, cluster, reconcileStart)
189
190 oldStatus := cluster.Status.DeepCopy()
191 if oldStatus.Inventory == nil {
192 oldStatus.Inventory = &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}
193 }
194 newInv := &inventory.ResourceInventory{Entries: make([]inventory.ResourceRef, 0)}
195 patcher := patch.NewSerialPatcher(cluster, r.Client)
196
197 conditions.MarkReconciling(cluster, status.ReconcilingCondition, clusterApi.ReconcilingReason)
198
199 defer func() {
200
201 cluster.Status.Inventory = newInv
202 if recErr != nil {
203
204 cluster.Status.Inventory = inventory.Merge(oldStatus.Inventory, newInv)
205 reconcileErr, ok := recErr.(recerr.Error)
206 if !ok {
207 reconcileErr = recerr.New(recErr, clusterApi.ReconcileFailedReason)
208 }
209 reconcileErr.ToCondition(cluster, status.ReadyCondition)
210 }
211
212 res, recErr = r.summarize(ctx, patcher, recErr, cluster, result)
213 r.Metrics.RecordDuration(ctx, cluster, reconcileStart)
214 r.Metrics.RecordReadiness(ctx, cluster)
215 r.EdgeDB.RecordInfraStatus(ctx, cluster, *r.Recorder)
216 pluginmetrics.New().RecordRegisteredPluginsCountMetric(plugins.Count())
217 deleteResourceEntry(cluster)
218 }()
219
220
221 if !controllerutil.ContainsFinalizer(cluster, clusterApi.Finalizer) {
222 controllerutil.AddFinalizer(cluster, clusterApi.Finalizer)
223
224
225
226 result = reconcile.ResultRequeue
227 return
228 }
229
230 log = log.WithValues("name", cluster.Spec.Name, "spec", cluster.Spec, "cluster reconciler concurrency", r.Concurrency, "helm cache length", r.HelmCache.Len())
231 ctx = logr.NewContext(ctx, log)
232
233
234 if !cluster.ObjectMeta.DeletionTimestamp.IsZero() {
235 log.Info("deletion detected, executing finalizers")
236 pluginResult, err := plugins.ExecuteFinalizers(ctx, r.Client, cluster, r.Config.PluginConcurrency)
237 if err != nil {
238 log.Error(err, ErrPluginFinalizerFailed)
239 recErr = err
240 return
241 }
242 if pluginResult.Requeue {
243 result = reconcile.ResultRequeue
244 return
245 }
246 if cluster.Status.Inventory != nil {
247 inv, err := inventory.ListObjects(cluster.Status.Inventory)
248 if err != nil {
249 log.Error(err, "failed to get objects to cleanup from inventory", "cluster", cluster.Name)
250 recErr = err
251 return
252 }
253 _, err = r.ResourceManager.DeleteAll(ctx, inv, sap.DefaultDeleteOptions())
254 if err != nil {
255 log.Error(err, "failed to cleanup objects from inventory", "cluster", cluster.Name)
256 recErr = err
257 return
258 }
259 }
260 controllerutil.RemoveFinalizer(cluster, clusterApi.Finalizer)
261 log.Info("finalizer executed")
262 return
263 }
264
265 conditions.Delete(cluster, status.DependenciesReadyCondition)
266
267 log.Info("reconciling started for cluster")
268 var unstructuredObjs []*unstructured.Unstructured
269
270
271 if err := cluster.Spec.Valid(); err != nil {
272 log.Error(err, ErrInvalidCluster)
273 recErr = recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), clusterApi.InvalidSpecReason)
274 return
275 }
276
277 if err := reconcile.Progressing(ctx, cluster, patcher, r.PatchOpts()...); err != nil {
278 recErr = recerr.New(err, clusterApi.ReconcileFailedReason)
279 return
280 }
281
282 ns := buildNamespace(cluster)
283 uobj, err := unstructuredutil.ToUnstructured(ns)
284 if err != nil {
285 recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err),
286 clusterApi.ApplyFailedReason)
287 return
288 }
289 unstructuredObjs = append(unstructuredObjs, uobj)
290
291
292 err = client.IgnoreAlreadyExists(r.Create(ctx, ns))
293 if err != nil {
294 recErr = recerr.New(err, clusterApi.NamespaceCreationFailedReason)
295 return
296 }
297
298 if shouldCreateGKECluster(cluster) {
299 gkeCluster := gkeClusterApi.New(cluster.Spec.ProjectID,
300 cluster.Spec.Banner,
301 cluster.Spec.Organization,
302 cluster.Spec.Name,
303 cluster.Spec.Location,
304 cluster.Spec.NodeVersion,
305 cluster.Spec.NumNode,
306 cluster.Spec.Fleet,
307 cluster.Name)
308 gkeCluster.Spec.Autoscale = cluster.Spec.Autoscale
309 gkeCluster.Spec.MinNodes = cluster.Spec.MinNodes
310 gkeCluster.Spec.MaxNodes = cluster.Spec.MaxNodes
311 gkeCluster.ObjectMeta.OwnerReferences = cluster.NewOwnerReference()
312 uobj, err := unstructuredutil.ToUnstructured(gkeCluster)
313 if err != nil {
314 recErr = recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err),
315 clusterApi.ApplyFailedReason)
316 return
317 }
318 unstructuredObjs = append(unstructuredObjs, uobj)
319 log.WithValues(gkeClusterApi.Name, gkeCluster.Spec).Info("GKECluster created")
320 }
321
322 shipments, recErr := r.generateShipments(ctx, cluster)
323 if recErr != nil {
324 return
325 }
326 unstructuredObjs = append(unstructuredObjs, shipments...)
327
328
329 var kindErr = fmt.Errorf("unstructured object must contain kind information")
330 for i, obj := range unstructuredObjs {
331 if "" == obj.GetKind() {
332 var uj, err = obj.MarshalJSON()
333 var labels = []interface{}{"index", i, "unstructured", string(uj)}
334 if err != nil {
335 labels = append(labels, "marshalError", err.Error(), "obj", obj)
336 }
337 log.Error(kindErr, "kind missing", labels...)
338 }
339 }
340
341 r.ResourceManager.SetOwnerLabels(unstructuredObjs, r.Name, "")
342
343
344 changeSet, err := r.ResourceManager.ApplyAll(ctx, unstructuredObjs, sap.ApplyOptions{
345 Force: true,
346 WaitTimeout: r.WaitForSetTimeout,
347 })
348 if err != nil {
349 recErr = recerr.New(fmt.Errorf("failed to apply resources: %w", err), clusterApi.ApplyFailedReason)
350 return
351 }
352
353 newInv = inventory.New(inventory.FromSapChangeSet(changeSet))
354
355 pluginResult, pluginChangeSet, err := plugins.Execute(ctx, r.Name, r.ResourceManager, cluster, r.HelmCache, r.Config.PluginConcurrency)
356 if pluginChangeSet != nil {
357 changeSet.Append(pluginChangeSet.Entries)
358 newInv = &inventory.ResourceInventory{Entries: newInv.Union(inventory.New(inventory.FromSapChangeSet(changeSet)))}
359 }
360 if err != nil {
361 log.Error(err, ErrPluginFailed)
362 waitErr := recerr.NewWait(err, clusterApi.PluginFailedReason, r.DefaultRequeue)
363 waitErr.ToCondition(cluster, status.ReadyCondition)
364 recErr = waitErr
365 return
366 }
367 if pluginResult.Requeue {
368 result = reconcile.ResultRequeue
369 return
370 }
371 log.Info("applied objects", "changeset", changeSet.ToMap())
372
373 prune, err := ff.FeatureEnabledForContext(ff.NewClusterContext(cluster.Name), ff.UseClusterCTLPruning, true)
374 if err != nil {
375 log.Error(err, "unable to get ld flag for prunning, defaulting to prune enabled")
376 }
377 if oldStatus.Inventory != nil && prune {
378 diff, err := inventory.Diff(oldStatus.Inventory, newInv)
379 if err != nil {
380 recErr = recerr.New(err, clusterApi.PruneFailedReason)
381 return
382 }
383 if len(diff) > 0 {
384 opt := sap.DefaultDeleteOptions()
385 opt.Exclusions = map[string]string{plugins.PruneLabel: plugins.PruneDisabled}
386 changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
387 if err != nil {
388 recErr = recerr.New(err, clusterApi.PruneFailedReason)
389 return
390 }
391 if err := plugins.Prune(ctx, changeSet, r.Config.PluginConcurrency, cluster.Name); err != nil {
392 log.Error(err, "failed to execute plugins prune side effect")
393 }
394 log.Info("pruned objects", "changeset", changeSet.ToMap())
395 }
396 }
397
398
399 _, inProgressWaitForSet := r.WaitForSetMap.LoadOrStore(cluster.Name, true)
400 if r.WaitForSetTimeout != 0 && !inProgressWaitForSet {
401 go func() {
402 r.c <- 1
403 defer func() { r.WaitForSetMap.Delete(cluster.Name); <-r.c }()
404 backgroundCtx := context.Background()
405 err = r.ResourceManager.WaitForSet(backgroundCtx, changeSet.ToObjMetadataSet(), sap.WaitOptions{
406 Timeout: r.WaitForSetTimeout,
407 })
408 if err != nil {
409 log.Error(err, "timeout waiting for wait for set status", "cluster", cluster.Name)
410 waitErr := recerr.NewWait(err, clusterApi.TimeOutReason, r.DefaultRequeue)
411 waitErr.ToCondition(cluster, status.ReadyCondition)
412 conditions.MarkStalled(cluster, clusterApi.TimeOutReason, "timeout waiting for wait for set status")
413 res, recErr = r.summarize(backgroundCtx, patcher, waitErr, cluster, reconcile.ResultRequeue)
414 return
415 }
416 log.Info("cluster reconciled successfully")
417 conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ClusterReadyReason, "cluster reconciled successfully")
418 conditions.Delete(cluster, status.StalledCondition)
419 result = reconcile.ResultSuccess
420 res, recErr = r.summarize(backgroundCtx, patcher, recErr, cluster, result)
421 }()
422 }
423
424 log.Info("cluster manifests applied successfully, waiting for status")
425
426 result = reconcile.ResultSuccess
427
428
429 if !oldStatus.HasReadyConditionWithReason(clusterApi.ClusterReadyReason) && !oldStatus.HasReadyConditionWithReason(clusterApi.TimeOutReason) {
430 conditions.MarkTrue(cluster, status.ReadyCondition, clusterApi.ManifestsAppliedReason, "cluster manifests applied successfully, waiting for status")
431 }
432 return
433 }
434
435 func (r *ClusterReconciler) summarize(ctx context.Context, patcher *patch.SerialPatcher, recErr error, cluster *clusterApi.Cluster, result reconcile.Result) (ctrl.Result, error) {
436 summarizer := reconcile.NewSummarizer(patcher)
437 res, recErr := summarizer.SummarizeAndPatch(ctx, cluster, []reconcile.SummarizeOption{
438 reconcile.WithConditions(r.Conditions),
439 reconcile.WithResult(result),
440 reconcile.WithError(recErr),
441 reconcile.WithIgnoreNotFound(),
442 reconcile.WithProcessors(
443 reconcile.RecordReconcileReq,
444 reconcile.RecordResult,
445 ),
446 reconcile.WithFieldOwner(r.Name),
447 reconcile.WithEventRecorder(r.EventRecorder),
448 }...)
449 return res, recErr
450 }
451
452
453 func (r *ClusterReconciler) generateShipments(ctx context.Context, cluster *clusterApi.Cluster) ([]*unstructured.Unstructured, recerr.Error) {
454 var err error
455 var unstructuredObjs []*unstructured.Unstructured
456 var uobj *unstructured.Unstructured
457 var sobj *syncedobjectApi.SyncedObject
458
459 capabilities := generator.InfraCapabilities
460
461 switch {
462 case cluster.IsBasicStore():
463 capabilities = generator.BasicStoreCapabilities
464 case cluster.IsStore():
465 capabilities = generator.StoreCapabilities
466 }
467
468 clusterShipmentInfo := generator.ClusterRenderParams{
469 ClusterType: cluster.Spec.Type.String(),
470 UUID: cluster.Name,
471 Region: r.Config.GCPRegion,
472 Zone: r.Config.GCPZone,
473 GCPProjectID: cluster.Spec.ProjectID,
474 BannerID: cluster.Spec.BannerEdgeID,
475 ForemanGCPProjectID: r.Config.TopLevelProjectID,
476 Domain: r.Config.Domain,
477 BSLEndpoint: r.Config.BSLConfig.Endpoint,
478 BSLEdgeEnvPrefix: r.Config.BSLConfig.OrganizationPrefix,
479 BSLRootOrg: r.Config.BSLConfig.Root,
480 DatasyncDNSZone: r.Config.DatasyncDNSZone,
481 DatasyncDNSName: r.Config.DatasyncDNSName,
482 DatabaseName: r.Config.DatabaseName,
483 EdgeSecMaxLeasePeriod: r.Config.EdgeSecMaxLeasePeriod,
484 EdgeSecMaxValidityPeriod: r.Config.EdgeSecMaxValidityPeriod,
485 GCPForemanProjectNumber: r.Config.GCPForemanProjectNumber,
486 }
487
488 terminalService := services.NewTerminalService(r.Config.DB, nil)
489 activationCodeService := edgenode.NewActivationCodeService(r.Config.DB, nil, nil, nil, nil, nil, nil)
490
491 terminals, err := terminalService.GetTerminalsByClusterID(ctx, cluster.Name)
492 if err != nil {
493 ctrl.LoggerFrom(ctx).Error(err, "unable to get terminals, defaulting to store pallet only")
494 }
495 registeredTerminals := []*model.Terminal{}
496 for _, ts := range terminals {
497 code, err := activationCodeService.Fetch(ctx, ts.TerminalID)
498 if err != nil {
499 ctrl.LoggerFrom(ctx).Error(err, "unable to get terminal activation code, not adding terminal to count", "terminal_id", ts.TerminalID)
500 continue
501 }
502
503 if code == nil || len(*code) == 0 {
504 registeredTerminals = append(registeredTerminals, ts)
505 }
506 }
507 if len(registeredTerminals) != 0 {
508 if err := r.manageRegisteredTerminalLabels(ctx, cluster, len(registeredTerminals)); err != nil {
509 return nil, err
510 }
511 }
512
513
514 version := latestTag
515 pallets := []whv1.BaseArtifact{{Name: cluster.Spec.Fleet.String(), Tag: version}}
516
517 if cluster.IsStore() {
518 artifacts, err := r.EdgeDB.GetClusterArtifactVersions(ctx, cluster.Name)
519 if err != nil {
520 return nil, recerr.New(fmt.Errorf("failed to get pallets from Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason)
521 } else if len(artifacts) == 0 {
522 return nil, recerr.New(fmt.Errorf("no pallets found in Edge DB for store %s: %w", cluster.Name, err), clusterApi.ReconcileFailedReason)
523 }
524
525
526 pallets = []whv1.BaseArtifact{}
527 for _, a := range artifacts {
528 pallets = append(pallets, a.ToBaseArtifact())
529 version = a.Version
530 }
531
532
533
534
535
536 if !cluster.IsGKE() {
537 supportsOptDatasync, err := compatibility.Compare(compatibility.GreaterThanOrEqual, version, "0.20")
538 if err == nil {
539 supportsOptDatasyncBetween, err := compatibility.Compare(compatibility.LessThan, version, "0.21")
540 if err == nil {
541 if supportsOptDatasync && supportsOptDatasyncBetween {
542 capabilitySVC := services.NewCapabilityService(r.Config.DB, nil, nil, "")
543 enablements, err := capabilitySVC.ListCapabilitiesByBanner(ctx, &cluster.Spec.BannerEdgeID)
544 switch {
545 case err != nil:
546 ctrl.LoggerFrom(ctx).Error(err, "unable to get enablements for banner, defaulting to store pallet only")
547 default:
548 for _, enablement := range enablements {
549 if enablement.Name == "couchdb" {
550 ctrl.LoggerFrom(ctx).Info("couchdb-repl-secret added to store shipment", "tag", version)
551 pallets = append(pallets, whv1.BaseArtifact{Name: "couchdb-repl-secret", Tag: version})
552 break
553 }
554 }
555 }
556 }
557 }
558 } else {
559 ctrl.LoggerFrom(ctx).Error(err, "unable to compare fleet version against static supported optional datasync version of atleast 0.20", "tag", version)
560 }
561 }
562 }
563
564
565
566
567
568
569 supportLegacy := false
570 switch {
571 case strings.Contains(version, "."):
572 support, err := compareVersions(version)
573 if err != nil {
574 return nil, recerr.New(fmt.Errorf("unable to compare versions: %s and %s: %w", version, "0.18", err), clusterApi.ReconcileFailedReason)
575 }
576 supportLegacy = support
577 case version == latestTag:
578 supportLegacy = false
579 }
580
581 shipmentOpts := &generator.ShipmentOpts{
582 Prune: true,
583 Force: true,
584 Pallets: pallets,
585 Repository: generator.GenerateShipmentRepo(r.Config.GCPRegion, r.Config.TopLevelProjectID),
586 Capabilities: capabilities,
587 SupportLegacyVersions: supportLegacy && fleet.IsStoreCluster(cluster.Spec.Fleet),
588 }
589
590 shipmentOpts.AddClusterRenderParams(clusterShipmentInfo)
591
592 clusterShip, infraShip, err := shipmentOpts.BuildSplitShipments()
593 if err != nil {
594 return nil, recerr.NewStalled(fmt.Errorf("failed to create shipment for store %s: %w", cluster.Spec.Name, err), clusterApi.InvalidShipmentSpecReason)
595 }
596 infraShip.ObjectMeta.OwnerReferences = cluster.NewOwnerReference()
597 uobj, err = unstructuredutil.ToUnstructured(infraShip)
598 if err != nil {
599 return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
600 }
601 unstructuredObjs = append(unstructuredObjs, uobj)
602
603 sobj, err = k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipment, constants.ShipmentKustomizationDir)
604 if err != nil {
605 return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
606 }
607 uobj, err = unstructuredutil.ToUnstructured(sobj)
608 if err != nil {
609 return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
610 }
611
612 tempSO, err := k8objectsutils.BuildClusterSyncedObjectWithDir(cluster, clusterShip, constants.ClusterShipmentOld, constants.ShipmentKustomizationDir)
613 if err != nil {
614 return nil, recerr.New(fmt.Errorf("failed to create cluster %s shipment synced object: %w", cluster.Spec.Name, err), clusterApi.ApplyFailedReason)
615 }
616 unstructuredTempSO, err := unstructuredutil.ToUnstructured(tempSO)
617 if err != nil {
618 return nil, recerr.New(fmt.Errorf(ErrToUnstructured, uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), clusterApi.ApplyFailedReason)
619 }
620 unstructuredObjs = append(unstructuredObjs, uobj, unstructuredTempSO)
621 return unstructuredObjs, nil
622 }
623
624 func buildNamespace(cluster *clusterApi.Cluster) *corev1.Namespace {
625 return &corev1.Namespace{
626 TypeMeta: metav1.TypeMeta{
627 Kind: "Namespace",
628 APIVersion: corev1.SchemeGroupVersion.String(),
629 },
630 ObjectMeta: metav1.ObjectMeta{
631 Name: cluster.Name,
632 Annotations: map[string]string{
633 meta.ProjectAnnotation: cluster.Spec.ProjectID,
634 },
635 OwnerReferences: cluster.NewOwnerReference(),
636 },
637 }
638 }
639
640
641
642 func shouldCreateGKECluster(cluster *clusterApi.Cluster) bool {
643 if cluster.Spec.Type == clusterConstant.GKE && !cluster.IsStore() {
644 return true
645 } else if _, ok := cluster.Labels[fleet.Label]; ok {
646 return true
647 }
648 return false
649 }
650
651 func (r *ClusterReconciler) manageRegisteredTerminalLabels(ctx context.Context, cluster *clusterApi.Cluster, numRegisteredTerminals int) recerr.Error {
652
653 labelService := services.NewLabelService(artifacts.NewArtifactsService(r.EdgeDB.DB, nil), r.EdgeDB.DB)
654 existingLabels, err := labelService.GetLabels(ctx, &cluster.Spec.BannerEdgeID)
655 if err != nil {
656 return recerr.New(fmt.Errorf("unable to get banner labels: %w", err), clusterApi.ReconcileFailedReason)
657 }
658
659
660 edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DeschedulerPallet)
661
662 if len(edgeCapabilityLabels) != 0 {
663 if err := r.updateDeschedulerLabels(ctx, labelService, numRegisteredTerminals, cluster.Name, edgeCapabilityLabels[0].LabelEdgeID); err != nil {
664 return recerr.New(fmt.Errorf("unable to update descheduler label: %w", err), clusterApi.ReconcileFailedReason)
665 }
666 } else {
667 ctrl.LoggerFrom(ctx).Error(fmt.Errorf("no descheduler capability label found"), "descheduler pallet not part of edge capabilities")
668 }
669 return nil
670 }
671
672 func (r *ClusterReconciler) updateDeschedulerLabels(ctx context.Context, labelService services.LabelService, registeredTerminals int, clusterName, deschedulerLabelEdgeID string) error {
673 if registeredTerminals <= 1 {
674 return labelService.DeleteClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
675 }
676 label, err := labelService.GetClusterLabels(ctx, &clusterName, &deschedulerLabelEdgeID)
677 if err != nil {
678 return err
679 }
680 if label != nil {
681 return nil
682 }
683 return labelService.CreateClusterLabel(ctx, clusterName, deschedulerLabelEdgeID)
684 }
685
686 func (r *ClusterReconciler) setResourceManager() error {
687 if r.ResourceManager == nil {
688 sapMngr, err := sap.NewResourceManagerFromConfig(r.manager.GetConfig(),
689 client.Options{},
690 sap.Owner{Field: r.Name, Group: OwnerGroupLabel})
691 if err != nil {
692 return err
693 }
694 r.ResourceManager = sapMngr
695 }
696 return nil
697 }
698
699 func compareVersions(versionValue string) (bool, error) {
700 staticVerifiedVersion, err := version.NewVersion("0.18")
701 if err != nil {
702 return false, err
703 }
704 fleetVersion, err := version.NewVersion(versionValue)
705 if err != nil {
706 return false, err
707 }
708 return staticVerifiedVersion.GreaterThan(fleetVersion), nil
709 }
710
View as plain text