package services import ( "context" "database/sql" "encoding/json" "errors" "fmt" "strconv" "github.com/google/uuid" "edge-infra.dev/pkg/edge/api/graph/model" edgenode "edge-infra.dev/pkg/edge/api/services/edgenode/common" "edge-infra.dev/pkg/edge/api/services/interfaces" sqlquery "edge-infra.dev/pkg/edge/api/sql" "edge-infra.dev/pkg/edge/api/utils" "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1" edgeCapabilities "edge-infra.dev/pkg/edge/capabilities" edgeconstants "edge-infra.dev/pkg/edge/constants" clustertype "edge-infra.dev/pkg/edge/constants/api/cluster" "edge-infra.dev/pkg/edge/constants/api/fleet" ) //go:generate mockgen -destination=../mocks/mock_registration_service.go -package=mocks edge-infra.dev/pkg/edge/api/services RegistrationService type RegistrationService interface { IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error) CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error CreateAClusterCR(ctx context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error) UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error) // Reset refreshes all activation codes for terminals in this cluster, and marks the cluster as in-active // Note: must be forced (i.e., force = true) as this is a destructive operation Reset(ctx context.Context, clusterEdgeID string, force bool) error } type registrationService struct { GkeService GkeClient TopLevelProjectID string SecretService SecretService GCPService GCPService BSLSiteService BSLSiteService IAMService IAMService ChariotService ChariotService SQLDB *sql.DB TerminalService TerminalService ActivationCodeService edgenode.ActivationCode ClusterLabelService interfaces.ClusterLabelService LabelService LabelService } var ( ErrMustForceClusterReset = errors.New("cluster cannot be reset without force being set to true") ErrCanOnlyResetStore = errors.New("only store clusters can be reset") ErrNoOptionalPalletLabels = errors.New("no optional pallet labels found in banner") ) // CreateClusterSQLEntry performs a transaction that inserts and updates tables related to a new Cluster registration. Parameter payload // should not contain any nil fields, i.e. default values for fields that are optional in the API should be set before calling func (s *registrationService) CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error { clusterName := payload.Name clusterType := payload.ClusterType fleetType := payload.Fleet fleetVersion := *payload.FleetVersion autoUpdate := *payload.AutoUpdateEnabled transaction, err := s.SQLDB.BeginTx(ctx, nil) if err != nil { return err } defer func() { _ = transaction.Rollback() }() // insert cluster record from values if _, err := transaction.ExecContext(ctx, sqlquery.ClusterInsertQuery, clusterName, projectID, true, active, clusterGUID, sqlquery.NewNullString(bannerEdgeID), siteID, location, fleetVersion); err != nil { return err } // get labelid and add label for cluster type var labelID string if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, clusterType, clustertype.LabelType); row.Scan(&labelID) != nil { clusterTypeError := clustertype.Type(clusterType).IsValid() return fmt.Errorf("%s: %w", err, clusterTypeError) } if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil { return err } if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, fleetType, fleet.LabelType); row.Scan(&labelID) != nil { fleetTypeError := fleet.IsValid(fleetType) return fmt.Errorf("%s: %w", err, fleetTypeError) } if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil { return err } // initial artifact configuration for stores if fleet.IsStoreCluster(fleet.Type(fleetType)) { // set pallet(s) to sync to cluster if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterGUID, fleetType, fleetVersion); err != nil { return err } if err := s.registerClusterDefaultOptionalPallets(ctx, transaction, clusterGUID, bannerEdgeID, fleetVersion); err != nil { return err } // set cluster auto-update policy if _, err := transaction.ExecContext(ctx, sqlquery.UpdateClusterConfig, clusterGUID, AutoUpdateEnabledKey, strconv.FormatBool(autoUpdate), uuid.NewString()); err != nil { return err } } return transaction.Commit() } func (s *registrationService) registerClusterDefaultOptionalPallets(ctx context.Context, transaction *sql.Tx, clusterEdgeID, bannerEdgeID, fleetVersion string) error { // set default optional pallet labels to be added to a cluster existingLabels, err := s.LabelService.GetLabels(ctx, &bannerEdgeID) if err != nil { return err } // get existing labels of type edge-capabilities and filter by supported cluster fleet versions edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DefaultStoreEdgeCapabilities...) // get supported edge capability labels for cluster fleet version supportedEdgeCapabilityLabels, err := edgeCapabilities.GetCapabilityLabelsForSupportedVersion(edgeCapabilityLabels, fleetVersion, nil) if err != nil { return err } for _, label := range supportedEdgeCapabilityLabels { if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterEdgeID, label.LabelEdgeID); err != nil { return err } if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, label.Key, fleetVersion); err != nil { return err } } return nil } func (s *registrationService) IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error) { var exist bool row := s.SQLDB.QueryRowContext(ctx, sqlquery.IsClusterExistQuery, clusterName, projectID) if err := row.Scan(&exist); err != nil { return false, err } return exist, nil } func (s *registrationService) UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterUpdateQuery, active, clusterEdgeID) if err != nil { return err } return nil } func (s *registrationService) CreateAClusterCR(_ context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error) { var storeCluster *v1alpha1.Cluster if gkeClusterSpec != nil { storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, gkeClusterSpec.Location, utils.ConvertToString(gkeClusterSpec.NodeVersion), gkeClusterSpec.MachineType, clusterEdgeID, gkeClusterSpec.NumNodes, banner) storeCluster.AddAutoscaling(gkeClusterSpec.Autoscale, utils.ConvertToInt(gkeClusterSpec.MinNodes), utils.ConvertToInt(gkeClusterSpec.MaxNodes)) } else { storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, edgeconstants.DefaultClusterLocation, "", "", clusterEdgeID, 0, banner) } storeClusterByte, err := json.Marshal(storeCluster) if err != nil { return "", err } storeClusterBase64 := utils.ToBase64(storeClusterByte) return storeClusterBase64, nil } func (s *registrationService) UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.UploadClusterCaHash, clusterCaHash, clusterEdgeID) return err } func (s *registrationService) ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error) { var clusterCaHash *string row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterCaHash, clusterEdgeID) if err := row.Scan(&clusterCaHash); err != nil { return "", err } if clusterCaHash == nil { // cluster ca hash will not exist at time of bootstrapping first node return "", nil } return *clusterCaHash, nil } func (s *registrationService) Reset(ctx context.Context, clusterEdgeID string, force bool) error { if !force { return ErrMustForceClusterReset } clusterFleet, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID) if err != nil { return err } if !fleet.IsStoreCluster(*clusterFleet) { return ErrCanOnlyResetStore } // mark cluster as inactive if err := s.UpdateClusterSQLEntry(ctx, false, clusterEdgeID); err != nil { return err } terminalList, err := s.TerminalService.GetTerminalsByClusterID(ctx, clusterEdgeID) if err != nil { return err } // refresh activation codes for all terminals in cluster for _, terminal := range terminalList { if _, err := s.ActivationCodeService.Refresh(ctx, terminal.TerminalID); err != nil { return err } } return nil } func NewRegistrationService(gkeService GkeClient, topLevelProjectId string, secretService SecretService, gcpService GCPService, bslSiteService BSLSiteService, iamService IAMService, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, activationCodeService edgenode.ActivationCode, clusterLabelSvc interfaces.ClusterLabelService, labelService LabelService) *registrationService { //nolint stupid return ®istrationService{ GkeService: gkeService, TopLevelProjectID: topLevelProjectId, SecretService: secretService, GCPService: gcpService, BSLSiteService: bslSiteService, IAMService: iamService, ChariotService: chariotService, SQLDB: sqlDB, TerminalService: terminalService, ActivationCodeService: activationCodeService, ClusterLabelService: clusterLabelSvc, LabelService: labelService, } }