...

Source file src/edge-infra.dev/pkg/edge/api/services/registration_service.go

Documentation: edge-infra.dev/pkg/edge/api/services

     1  package services
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"strconv"
    10  
    11  	"github.com/google/uuid"
    12  
    13  	"edge-infra.dev/pkg/edge/api/graph/model"
    14  	edgenode "edge-infra.dev/pkg/edge/api/services/edgenode/common"
    15  	"edge-infra.dev/pkg/edge/api/services/interfaces"
    16  	sqlquery "edge-infra.dev/pkg/edge/api/sql"
    17  	"edge-infra.dev/pkg/edge/api/utils"
    18  	"edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
    19  	edgeCapabilities "edge-infra.dev/pkg/edge/capabilities"
    20  	edgeconstants "edge-infra.dev/pkg/edge/constants"
    21  	clustertype "edge-infra.dev/pkg/edge/constants/api/cluster"
    22  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    23  )
    24  
// RegistrationService groups the cluster registration operations of this package. The notes on each
// method describe the behavior of the registrationService implementation found in this file.
//
//go:generate mockgen -destination=../mocks/mock_registration_service.go -package=mocks edge-infra.dev/pkg/edge/api/services RegistrationService
type RegistrationService interface {
	// IsClusterSQLEntryExist runs the cluster-exists query for the given cluster name and project ID
	// and returns the boolean it yields.
	IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error)
	// CreateClusterSQLEntry inserts, in one transaction, the cluster record together with its cluster type
	// and fleet labels; store clusters additionally get artifact versions, default optional pallets and
	// the auto-update setting. Optional payload fields are expected to be defaulted by the caller.
	CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error
	// UpdateClusterSQLEntry runs the cluster update query with the given active flag for clusterEdgeID.
	UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error
	// CreateAClusterCR builds a v1alpha1.Cluster from the arguments and returns it JSON-marshalled and
	// base64-encoded. gkeClusterSpec may be nil, in which case default location and empty node settings are used.
	CreateAClusterCR(ctx context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error)
	// UploadClusterCaHash stores clusterCaHash for the cluster identified by clusterEdgeID.
	UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error
	// ClusterCaHash returns the CA hash stored for the cluster, or an empty string when none is stored yet.
	ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error)
	// Reset refreshes all activation codes for terminals in this cluster, and marks the cluster as inactive
	// Note: must be forced (i.e., force = true) as this is a destructive operation
	Reset(ctx context.Context, clusterEdgeID string, force bool) error
}
    37  
// registrationService is the implementation of RegistrationService defined in this file.
// The methods visible here use SQLDB for all database access, LabelService to read banner labels,
// ClusterLabelService to look up a cluster's fleet type, TerminalService to list a cluster's terminals
// and ActivationCodeService to refresh terminal activation codes. The remaining fields are populated
// by NewRegistrationService but are not referenced by the methods in this file.
type registrationService struct {
	GkeService            GkeClient
	TopLevelProjectID     string
	SecretService         SecretService
	GCPService            GCPService
	BSLSiteService        BSLSiteService
	IAMService            IAMService
	ChariotService        ChariotService
	SQLDB                 *sql.DB
	TerminalService       TerminalService
	ActivationCodeService edgenode.ActivationCode
	ClusterLabelService   interfaces.ClusterLabelService
	LabelService          LabelService
}
    52  
// Sentinel errors exported by the registration service.
var (
	// ErrMustForceClusterReset is returned by Reset when force is false.
	ErrMustForceClusterReset  = errors.New("cluster cannot be reset without force being set to true")
	// ErrCanOnlyResetStore is returned by Reset when the cluster's fleet type is not a store cluster.
	ErrCanOnlyResetStore      = errors.New("only store clusters can be reset")
	// ErrNoOptionalPalletLabels is not returned by any code in this file.
	// NOTE(review): presumably used elsewhere in the package — confirm.
	ErrNoOptionalPalletLabels = errors.New("no optional pallet labels found in banner")
)
    58  
    59  // CreateClusterSQLEntry performs a transaction that inserts and updates tables related to a new Cluster registration. Parameter payload
    60  // should not contain any nil fields, i.e. default values for fields that are optional in the API should be set before calling
    61  func (s *registrationService) CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error {
    62  	clusterName := payload.Name
    63  	clusterType := payload.ClusterType
    64  	fleetType := payload.Fleet
    65  	fleetVersion := *payload.FleetVersion
    66  	autoUpdate := *payload.AutoUpdateEnabled
    67  
    68  	transaction, err := s.SQLDB.BeginTx(ctx, nil)
    69  	if err != nil {
    70  		return err
    71  	}
    72  	defer func() { _ = transaction.Rollback() }()
    73  
    74  	// insert cluster record from values
    75  	if _, err := transaction.ExecContext(ctx, sqlquery.ClusterInsertQuery, clusterName, projectID, true, active, clusterGUID, sqlquery.NewNullString(bannerEdgeID), siteID, location, fleetVersion); err != nil {
    76  		return err
    77  	}
    78  
    79  	// get labelid and add label for cluster type
    80  	var labelID string
    81  	if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, clusterType, clustertype.LabelType); row.Scan(&labelID) != nil {
    82  		clusterTypeError := clustertype.Type(clusterType).IsValid()
    83  		return fmt.Errorf("%s: %w", err, clusterTypeError)
    84  	}
    85  	if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil {
    86  		return err
    87  	}
    88  	if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, fleetType, fleet.LabelType); row.Scan(&labelID) != nil {
    89  		fleetTypeError := fleet.IsValid(fleetType)
    90  		return fmt.Errorf("%s: %w", err, fleetTypeError)
    91  	}
    92  	if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil {
    93  		return err
    94  	}
    95  
    96  	// initial artifact configuration for stores
    97  	if fleet.IsStoreCluster(fleet.Type(fleetType)) {
    98  		// set pallet(s) to sync to cluster
    99  		if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterGUID, fleetType, fleetVersion); err != nil {
   100  			return err
   101  		}
   102  
   103  		if err := s.registerClusterDefaultOptionalPallets(ctx, transaction, clusterGUID, bannerEdgeID, fleetVersion); err != nil {
   104  			return err
   105  		}
   106  		// set cluster auto-update policy
   107  		if _, err := transaction.ExecContext(ctx, sqlquery.UpdateClusterConfig, clusterGUID, AutoUpdateEnabledKey, strconv.FormatBool(autoUpdate), uuid.NewString()); err != nil {
   108  			return err
   109  		}
   110  	}
   111  
   112  	return transaction.Commit()
   113  }
   114  
   115  func (s *registrationService) registerClusterDefaultOptionalPallets(ctx context.Context, transaction *sql.Tx, clusterEdgeID, bannerEdgeID, fleetVersion string) error {
   116  	// set default optional pallet labels to be added to a cluster
   117  	existingLabels, err := s.LabelService.GetLabels(ctx, &bannerEdgeID)
   118  	if err != nil {
   119  		return err
   120  	}
   121  
   122  	// get existing labels of type edge-capabilities and filter by supported cluster fleet versions
   123  	edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DefaultStoreEdgeCapabilities...)
   124  
   125  	// get supported edge capability labels for cluster fleet version
   126  	supportedEdgeCapabilityLabels, err := edgeCapabilities.GetCapabilityLabelsForSupportedVersion(edgeCapabilityLabels, fleetVersion, nil)
   127  	if err != nil {
   128  		return err
   129  	}
   130  
   131  	for _, label := range supportedEdgeCapabilityLabels {
   132  		if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterEdgeID, label.LabelEdgeID); err != nil {
   133  			return err
   134  		}
   135  		if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, label.Key, fleetVersion); err != nil {
   136  			return err
   137  		}
   138  	}
   139  	return nil
   140  }
   141  
   142  func (s *registrationService) IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error) {
   143  	var exist bool
   144  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.IsClusterExistQuery, clusterName, projectID)
   145  	if err := row.Scan(&exist); err != nil {
   146  		return false, err
   147  	}
   148  	return exist, nil
   149  }
   150  
   151  func (s *registrationService) UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error {
   152  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterUpdateQuery, active, clusterEdgeID)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	return nil
   157  }
   158  
   159  func (s *registrationService) CreateAClusterCR(_ context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error) {
   160  	var storeCluster *v1alpha1.Cluster
   161  	if gkeClusterSpec != nil {
   162  		storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, gkeClusterSpec.Location, utils.ConvertToString(gkeClusterSpec.NodeVersion), gkeClusterSpec.MachineType, clusterEdgeID, gkeClusterSpec.NumNodes, banner)
   163  		storeCluster.AddAutoscaling(gkeClusterSpec.Autoscale, utils.ConvertToInt(gkeClusterSpec.MinNodes), utils.ConvertToInt(gkeClusterSpec.MaxNodes))
   164  	} else {
   165  		storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, edgeconstants.DefaultClusterLocation, "", "", clusterEdgeID, 0, banner)
   166  	}
   167  	storeClusterByte, err := json.Marshal(storeCluster)
   168  	if err != nil {
   169  		return "", err
   170  	}
   171  	storeClusterBase64 := utils.ToBase64(storeClusterByte)
   172  	return storeClusterBase64, nil
   173  }
   174  
   175  func (s *registrationService) UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error {
   176  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.UploadClusterCaHash, clusterCaHash, clusterEdgeID)
   177  	return err
   178  }
   179  
   180  func (s *registrationService) ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error) {
   181  	var clusterCaHash *string
   182  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterCaHash, clusterEdgeID)
   183  	if err := row.Scan(&clusterCaHash); err != nil {
   184  		return "", err
   185  	}
   186  	if clusterCaHash == nil { // cluster ca hash will not exist at time of bootstrapping first node
   187  		return "", nil
   188  	}
   189  	return *clusterCaHash, nil
   190  }
   191  
   192  func (s *registrationService) Reset(ctx context.Context, clusterEdgeID string, force bool) error {
   193  	if !force {
   194  		return ErrMustForceClusterReset
   195  	}
   196  	clusterFleet, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
   197  	if err != nil {
   198  		return err
   199  	}
   200  	if !fleet.IsStoreCluster(*clusterFleet) {
   201  		return ErrCanOnlyResetStore
   202  	}
   203  	// mark cluster as inactive
   204  	if err := s.UpdateClusterSQLEntry(ctx, false, clusterEdgeID); err != nil {
   205  		return err
   206  	}
   207  	terminalList, err := s.TerminalService.GetTerminalsByClusterID(ctx, clusterEdgeID)
   208  	if err != nil {
   209  		return err
   210  	}
   211  	// refresh activation codes for all terminals in cluster
   212  	for _, terminal := range terminalList {
   213  		if _, err := s.ActivationCodeService.Refresh(ctx, terminal.TerminalID); err != nil {
   214  			return err
   215  		}
   216  	}
   217  	return nil
   218  }
   219  
   220  func NewRegistrationService(gkeService GkeClient, topLevelProjectId string, secretService SecretService, gcpService GCPService, bslSiteService BSLSiteService, iamService IAMService, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, activationCodeService edgenode.ActivationCode, clusterLabelSvc interfaces.ClusterLabelService, labelService LabelService) *registrationService { //nolint stupid
   221  	return &registrationService{
   222  		GkeService:            gkeService,
   223  		TopLevelProjectID:     topLevelProjectId,
   224  		SecretService:         secretService,
   225  		GCPService:            gcpService,
   226  		BSLSiteService:        bslSiteService,
   227  		IAMService:            iamService,
   228  		ChariotService:        chariotService,
   229  		SQLDB:                 sqlDB,
   230  		TerminalService:       terminalService,
   231  		ActivationCodeService: activationCodeService,
   232  		ClusterLabelService:   clusterLabelSvc,
   233  		LabelService:          labelService,
   234  	}
   235  }
   236  

View as plain text