...

Source file src/edge-infra.dev/pkg/edge/controllers/bannerctl/couch.go

Documentation: edge-infra.dev/pkg/edge/controllers/bannerctl

     1  package bannerctl
     2  
     3  import (
     4  	"context"
     5  	"encoding/base64"
     6  	"encoding/json"
     7  	"fmt"
     8  	"reflect"
     9  	"slices"
    10  	"strings"
    11  	"time"
    12  
    13  	"maps"
    14  
    15  	ctrl "sigs.k8s.io/controller-runtime"
    16  
    17  	corev1 "k8s.io/api/core/v1"
    18  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    19  
    20  	"edge-infra.dev/pkg/edge/api/totp"
    21  	apitypes "edge-infra.dev/pkg/edge/api/types"
    22  	bannerAPI "edge-infra.dev/pkg/edge/apis/banner/v1alpha1"
    23  	edgeErrors "edge-infra.dev/pkg/edge/apis/errors"
    24  	syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
    25  	"edge-infra.dev/pkg/edge/bsl"
    26  	bannerconstants "edge-infra.dev/pkg/edge/constants/api/banner"
    27  	clusterConstants "edge-infra.dev/pkg/edge/constants/api/cluster"
    28  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    29  	"edge-infra.dev/pkg/edge/registration"
    30  )
    31  
    32  const (
    33  	CouchDBEnablement = "couchdb"
    34  	BSLDefaultTimeout = 60 * time.Second
    35  )
    36  
    37  var (
    38  	couchK8sMachineType = "n2d-highcpu-8"
    39  	couchK8sMinNodes    = 1
    40  	couchK8sMaxNodes    = 3
    41  	cushion             = "cushion"
    42  )
    43  
    44  func (r *BannerReconciler) bslFullSync(ctx context.Context, b *bannerAPI.Banner) error {
    45  	log := ctrl.LoggerFrom(ctx).WithName("bsl-data-sync")
    46  
    47  	status, err := r.EdgeDB.GetBslSyncStatus(ctx, b.Name)
    48  	if err != nil {
    49  		log.Error(err, "fail to get bsl status from database")
    50  		return err
    51  	}
    52  
    53  	if r.bslSyncCompleted(b) && status.Completed {
    54  		log.Info("bsl data already successfully synced")
    55  		return nil
    56  	}
    57  
    58  	euID := b.Spec.BSL.EnterpriseUnit.ID
    59  	if euID == "" {
    60  		err := fmt.Errorf("eu id not provided for banner")
    61  		log.Error(err, "empty value for Banner.Spec.BSL.EnterpriseUnit.ID")
    62  		return err
    63  	}
    64  
    65  	req, err := r.BSLClient.WithRootOrgAccessKey(ctx)
    66  	if err != nil {
    67  		return fmt.Errorf("fail to build bsl req from access and secret keys: %w", err)
    68  	}
    69  
    70  	sc := bsl.SyncConfig{EnterpriseUnitIDs: nil, EntityTypes: b.Spec.BSL.EntityTypes}
    71  	err = req.SetOrg(euID).SyncProvisioningBSLData(sc)
    72  	if err != nil {
    73  		return fmt.Errorf("fail to sync provisioning bsl data to data sync: %w", err)
    74  	}
    75  
    76  	err = req.SetOrg(euID).SyncCatalogBSLData(sc)
    77  	if err != nil {
    78  		return fmt.Errorf("fail to sync catalog bsl data to data sync: %w", err)
    79  	}
    80  
    81  	updated := r.bslSyncStatusCompleted(b, status)
    82  	if updated {
    83  		err = r.EdgeDB.UpdateBslSyncStatus(ctx, b.Name, status)
    84  		if err != nil {
    85  			log.Error(err, "fail to update banner bsl status in database")
    86  			return err
    87  		}
    88  	}
    89  	return nil
    90  }
    91  
    92  func (r *BannerReconciler) createCouchServerCluster(ctx context.Context, b *bannerAPI.Banner) error {
    93  	// Check if couch cluster resource already exists
    94  	if b.Status.CouchClusterEdgeID != "" {
    95  		return nil
    96  	}
    97  	// If not, call registration api for couch cluster
    98  	totpToken, err := totp.GenerateTotp(r.TotpSecret)
    99  	if err != nil {
   100  		return err
   101  	}
   102  
   103  	GCPLocation := fmt.Sprintf("%s-%s", r.GCPRegion, r.GCPZone)
   104  
   105  	reg, err := registration.NewBuilder().
   106  		Banner(b.Spec.DisplayName).
   107  		Store(bannerconstants.CouchServerClusterName).
   108  		ClusterType(clusterConstants.GKE).
   109  		BSLOrganization(b.Spec.BSL.Organization.Name).
   110  		APIEndpoint(r.EdgeAPI). //todo
   111  		TotpToken(totpToken.Code).
   112  		CreateBSLSite(false).
   113  		Fleet(fleet.CouchDB).
   114  		MachineType(couchK8sMachineType).
   115  		MinNodes(couchK8sMinNodes).
   116  		MaxNodes(couchK8sMaxNodes).
   117  		Autoscale(true).
   118  		Location(GCPLocation).
   119  		BannerEdgeID(b.Name).
   120  		FleetVersion(apitypes.DefaultVersionTag).
   121  		Build()
   122  	if err != nil {
   123  		return err
   124  	}
   125  	reg.Client = r.Client
   126  	resp, err := reg.RegisterCluster(ctx)
   127  	if err != nil && !strings.Contains(err.Error(), edgeErrors.ErrClusterAlreadyExists) {
   128  		return err
   129  	}
   130  	if resp != nil {
   131  		b.Status.CouchClusterEdgeID = resp.ClusterEdgeID
   132  	}
   133  	return nil
   134  }
   135  
   136  func (r *BannerReconciler) createCouchCushionConfigMapSO(b *bannerAPI.Banner) *syncedobjectApi.SyncedObject {
   137  	// create cushion cm with foreman and tenant ids
   138  	cushionCM := &corev1.ConfigMap{
   139  		TypeMeta: metav1.TypeMeta{
   140  			APIVersion: "v1",
   141  			Kind:       "ConfigMap",
   142  		},
   143  		ObjectMeta: metav1.ObjectMeta{
   144  			Name:      cushion,
   145  			Namespace: cushion,
   146  		},
   147  		Data: map[string]string{
   148  			"FOREMAN_ID":      r.ForemanProjectID,
   149  			"BANNER_ID":       b.Spec.BSL.EnterpriseUnit.ID,
   150  			"TOPIC_ID":        "data-sync-c2e",
   151  			"SUBSCRIPTION_ID": b.Spec.BSL.EnterpriseUnit.ID,
   152  		},
   153  	}
   154  	return r.genCouchClusterSO("cushion-cm", cushionCM, b)
   155  }
   156  
   157  func (r *BannerReconciler) genCouchClusterSO(name string, obj interface{}, b *bannerAPI.Banner) *syncedobjectApi.SyncedObject {
   158  	couchClusterID := b.Status.CouchClusterEdgeID
   159  	data, _ := json.Marshal(obj)
   160  	data64 := base64.StdEncoding.EncodeToString(data)
   161  	return &syncedobjectApi.SyncedObject{
   162  		TypeMeta: metav1.TypeMeta{
   163  			APIVersion: syncedobjectApi.GroupVersion.String(),
   164  			Kind:       "SyncedObject",
   165  		},
   166  		ObjectMeta: metav1.ObjectMeta{
   167  			Name:            name,
   168  			Namespace:       b.Name,
   169  			OwnerReferences: r.ownerRef(b),
   170  		},
   171  		Spec: syncedobjectApi.SyncedObjectSpec{
   172  			Banner:  b.Spec.GCP.ProjectID,
   173  			Cluster: couchClusterID,
   174  			Object:  data64,
   175  		},
   176  	}
   177  }
   178  
   179  // bslSyncCompleted does the spec matches the status
   180  func (r *BannerReconciler) bslSyncCompleted(b *bannerAPI.Banner) bool {
   181  	if b.Status.BslSyncStatus == nil {
   182  		return false
   183  	}
   184  	if b.Status.BslSyncStatus.Completed {
   185  		// if no EntityTypes is provided, all entity types has been synced
   186  		if len(b.Spec.BSL.EntityTypes) == 0 {
   187  			return true
   188  		}
   189  	}
   190  outer:
   191  	for _, etSpec := range b.Spec.BSL.EntityTypes { // i.e.. item-price
   192  		for _, etStatus := range b.Status.BslSyncStatus.EntityTypes {
   193  			if etStatus == etSpec {
   194  				continue outer
   195  			}
   196  		}
   197  		return false
   198  	}
   199  	return true
   200  }
   201  
   202  // bslSyncStatusCompleted return true if status needs updated
   203  func (r *BannerReconciler) bslSyncStatusCompleted(b *bannerAPI.Banner, dbStatus *bannerAPI.BslSyncStatus) bool {
   204  	oldStatus := *dbStatus
   205  	if b.Status.BslSyncStatus == nil {
   206  		b.Status.BslSyncStatus = &bannerAPI.BslSyncStatus{}
   207  	}
   208  
   209  	b.Status.BslSyncStatus.Completed = true
   210  	dbStatus.Completed = true
   211  
   212  	if len(b.Spec.BSL.EntityTypes) == 0 {
   213  		b.Status.BslSyncStatus.EntityTypes = nil
   214  		return dbStatus.Completed != oldStatus.Completed
   215  	}
   216  
   217  	entityTypes := merge(dbStatus.EntityTypes, b.Spec.BSL.EntityTypes)
   218  
   219  	if !slices.Equal(b.Status.BslSyncStatus.EntityTypes, entityTypes) {
   220  		b.Status.BslSyncStatus.EntityTypes = entityTypes
   221  	}
   222  
   223  	if !slices.Equal(dbStatus.EntityTypes, entityTypes) {
   224  		dbStatus.EntityTypes = entityTypes
   225  	}
   226  
   227  	return !reflect.DeepEqual(oldStatus, dbStatus)
   228  }
   229  
   230  func merge(a, b []string) []string {
   231  	m := map[string]struct{}{}
   232  	for _, s := range a {
   233  		m[s] = struct{}{}
   234  	}
   235  	for _, s := range b {
   236  		m[s] = struct{}{}
   237  	}
   238  	return slices.Collect(maps.Keys(m))
   239  }
   240  

View as plain text