package bannerctl import ( "context" "encoding/base64" "encoding/json" "fmt" "reflect" "slices" "strings" "time" "maps" ctrl "sigs.k8s.io/controller-runtime" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "edge-infra.dev/pkg/edge/api/totp" apitypes "edge-infra.dev/pkg/edge/api/types" bannerAPI "edge-infra.dev/pkg/edge/apis/banner/v1alpha1" edgeErrors "edge-infra.dev/pkg/edge/apis/errors" syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1" "edge-infra.dev/pkg/edge/bsl" bannerconstants "edge-infra.dev/pkg/edge/constants/api/banner" clusterConstants "edge-infra.dev/pkg/edge/constants/api/cluster" "edge-infra.dev/pkg/edge/constants/api/fleet" "edge-infra.dev/pkg/edge/registration" ) const ( CouchDBEnablement = "couchdb" BSLDefaultTimeout = 60 * time.Second ) var ( couchK8sMachineType = "n2d-highcpu-8" couchK8sMinNodes = 1 couchK8sMaxNodes = 3 cushion = "cushion" ) func (r *BannerReconciler) bslFullSync(ctx context.Context, b *bannerAPI.Banner) error { log := ctrl.LoggerFrom(ctx).WithName("bsl-data-sync") status, err := r.EdgeDB.GetBslSyncStatus(ctx, b.Name) if err != nil { log.Error(err, "fail to get bsl status from database") return err } if r.bslSyncCompleted(b) && status.Completed { log.Info("bsl data already successfully synced") return nil } euID := b.Spec.BSL.EnterpriseUnit.ID if euID == "" { err := fmt.Errorf("eu id not provided for banner") log.Error(err, "empty value for Banner.Spec.BSL.EnterpriseUnit.ID") return err } req, err := r.BSLClient.WithRootOrgAccessKey(ctx) if err != nil { return fmt.Errorf("fail to build bsl req from access and secret keys: %w", err) } sc := bsl.SyncConfig{EnterpriseUnitIDs: nil, EntityTypes: b.Spec.BSL.EntityTypes} err = req.SetOrg(euID).SyncProvisioningBSLData(sc) if err != nil { return fmt.Errorf("fail to sync provisioning bsl data to data sync: %w", err) } err = req.SetOrg(euID).SyncCatalogBSLData(sc) if err != nil { return fmt.Errorf("fail to sync catalog bsl data to data sync: %w", err) } updated := r.bslSyncStatusCompleted(b, status) if updated { err = r.EdgeDB.UpdateBslSyncStatus(ctx, b.Name, status) if err != nil { log.Error(err, "fail to update banner bsl status in database") return err } } return nil } func (r *BannerReconciler) createCouchServerCluster(ctx context.Context, b *bannerAPI.Banner) error { // Check if couch cluster resource already exists if b.Status.CouchClusterEdgeID != "" { return nil } // If not, call registration api for couch cluster totpToken, err := totp.GenerateTotp(r.TotpSecret) if err != nil { return err } GCPLocation := fmt.Sprintf("%s-%s", r.GCPRegion, r.GCPZone) reg, err := registration.NewBuilder(). Banner(b.Spec.DisplayName). Store(bannerconstants.CouchServerClusterName). ClusterType(clusterConstants.GKE). BSLOrganization(b.Spec.BSL.Organization.Name). APIEndpoint(r.EdgeAPI). //todo TotpToken(totpToken.Code). CreateBSLSite(false). Fleet(fleet.CouchDB). MachineType(couchK8sMachineType). MinNodes(couchK8sMinNodes). MaxNodes(couchK8sMaxNodes). Autoscale(true). Location(GCPLocation). BannerEdgeID(b.Name). FleetVersion(apitypes.DefaultVersionTag). Build() if err != nil { return err } reg.Client = r.Client resp, err := reg.RegisterCluster(ctx) if err != nil && !strings.Contains(err.Error(), edgeErrors.ErrClusterAlreadyExists) { return err } if resp != nil { b.Status.CouchClusterEdgeID = resp.ClusterEdgeID } return nil } func (r *BannerReconciler) createCouchCushionConfigMapSO(b *bannerAPI.Banner) *syncedobjectApi.SyncedObject { // create cushion cm with foreman and tenant ids cushionCM := &corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ConfigMap", }, ObjectMeta: metav1.ObjectMeta{ Name: cushion, Namespace: cushion, }, Data: map[string]string{ "FOREMAN_ID": r.ForemanProjectID, "BANNER_ID": b.Spec.BSL.EnterpriseUnit.ID, "TOPIC_ID": "data-sync-c2e", "SUBSCRIPTION_ID": b.Spec.BSL.EnterpriseUnit.ID, }, } return r.genCouchClusterSO("cushion-cm", cushionCM, b) } func (r *BannerReconciler) genCouchClusterSO(name string, obj interface{}, b *bannerAPI.Banner) *syncedobjectApi.SyncedObject { couchClusterID := b.Status.CouchClusterEdgeID data, _ := json.Marshal(obj) data64 := base64.StdEncoding.EncodeToString(data) return &syncedobjectApi.SyncedObject{ TypeMeta: metav1.TypeMeta{ APIVersion: syncedobjectApi.GroupVersion.String(), Kind: "SyncedObject", }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: b.Name, OwnerReferences: r.ownerRef(b), }, Spec: syncedobjectApi.SyncedObjectSpec{ Banner: b.Spec.GCP.ProjectID, Cluster: couchClusterID, Object: data64, }, } } // bslSyncCompleted does the spec matches the status func (r *BannerReconciler) bslSyncCompleted(b *bannerAPI.Banner) bool { if b.Status.BslSyncStatus == nil { return false } if b.Status.BslSyncStatus.Completed { // if no EntityTypes is provided, all entity types has been synced if len(b.Spec.BSL.EntityTypes) == 0 { return true } } outer: for _, etSpec := range b.Spec.BSL.EntityTypes { // i.e.. item-price for _, etStatus := range b.Status.BslSyncStatus.EntityTypes { if etStatus == etSpec { continue outer } } return false } return true } // bslSyncStatusCompleted return true if status needs updated func (r *BannerReconciler) bslSyncStatusCompleted(b *bannerAPI.Banner, dbStatus *bannerAPI.BslSyncStatus) bool { oldStatus := *dbStatus if b.Status.BslSyncStatus == nil { b.Status.BslSyncStatus = &bannerAPI.BslSyncStatus{} } b.Status.BslSyncStatus.Completed = true dbStatus.Completed = true if len(b.Spec.BSL.EntityTypes) == 0 { b.Status.BslSyncStatus.EntityTypes = nil return dbStatus.Completed != oldStatus.Completed } entityTypes := merge(dbStatus.EntityTypes, b.Spec.BSL.EntityTypes) if !slices.Equal(b.Status.BslSyncStatus.EntityTypes, entityTypes) { b.Status.BslSyncStatus.EntityTypes = entityTypes } if !slices.Equal(dbStatus.EntityTypes, entityTypes) { dbStatus.EntityTypes = entityTypes } return !reflect.DeepEqual(oldStatus, dbStatus) } func merge(a, b []string) []string { m := map[string]struct{}{} for _, s := range a { m[s] = struct{}{} } for _, s := range b { m[s] = struct{}{} } return slices.Collect(maps.Keys(m)) }