...

Source file src/edge-infra.dev/pkg/edge/api/graph/resolver/orchestration.go

Documentation: edge-infra.dev/pkg/edge/api/graph/resolver

     1  package resolver
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strconv"
     8  	"strings"
     9  
    10  	"github.com/google/uuid"
    11  	"github.com/rs/zerolog/log"
    12  	corev1 "k8s.io/api/core/v1"
    13  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    14  
    15  	"edge-infra.dev/pkg/edge/api/graph/model"
    16  	"edge-infra.dev/pkg/edge/api/middleware"
    17  	"edge-infra.dev/pkg/edge/api/services"
    18  	"edge-infra.dev/pkg/edge/api/types"
    19  	"edge-infra.dev/pkg/edge/api/utils"
    20  	apiErr "edge-infra.dev/pkg/edge/apis/errors"
    21  	"edge-infra.dev/pkg/edge/bsl"
    22  	chariotClientApi "edge-infra.dev/pkg/edge/chariot/client"
    23  	edgeconstants "edge-infra.dev/pkg/edge/constants"
    24  	"edge-infra.dev/pkg/edge/constants/api/cluster"
    25  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    26  	"edge-infra.dev/pkg/lib/runtime/version"
    27  	"edge-infra.dev/pkg/sds/clustersecrets"
    28  )
    29  
// bslHolder is a struct to hold bsl information needed for registration. Would like to see this refactored
//
// It carries the BSL site resolved for the cluster being registered together with a flag that
// tells the rollback path (undoRegistrationClusterSteps) whether that site must be deleted.
type bslHolder struct {
	// siteInfo is the BSL site for the cluster. registerCluster initializes it to an empty
	// BSLInfo; createBSLResources replaces it with the result of GetOrCreateSite for store fleets.
	siteInfo    *bsl.BSLInfo
	// siteCreated is the "created" result of GetOrCreateSite; undoRegistrationClusterSteps only
	// deletes the BSL site when this is true.
	siteCreated bool
}
    35  
// newClusterSpec accumulates everything learned or created while registering a cluster, so
// later steps can build on earlier ones and undoRegistrationClusterSteps can roll them back.
type newClusterSpec struct {
	// payload is the registration request; registerCluster points it at its local copy, and
	// defaulted/resolved values (FleetVersion, AutoUpdateEnabled) are written back through it.
	payload            *model.RegistrationPayload
	// bslInfo holds the BSL site for the cluster and whether this registration created it.
	bslInfo            *bslHolder
	// organization is the organization short name registration runs under.
	organization       string
	// banner is the banner the cluster is registered into.
	banner             *model.Banner
	// newCluster is the cluster as read back from SQL (or the pre-existing cluster when it was
	// already registered); nil until then. Rollback only deletes the SQL entry when it is set.
	newCluster         *model.Cluster
	// clusterEdgeID is the UUID generated for the new cluster by createBSLResources.
	clusterEdgeID      string
	// infraCluster is the cluster the new cluster's CR is sent to via chariot.
	infraCluster       *model.Cluster
	// storeClusterBase64 is the encoded cluster CR sent to infraCluster; when non-empty,
	// rollback sends a chariot Delete for it.
	storeClusterBase64 string
}
    46  
    47  func (r *Resolver) RegisterClusterSteps(ctx context.Context, payload model.RegistrationPayload) (*model.RegistrationResponse, error) {
    48  	//check store infra compatibility with cloud infra
    49  	//nolint:nestif
    50  	if payload.FleetVersion != nil {
    51  		if *payload.FleetVersion != "latest" {
    52  			if err := r.validateEdgeVersionRegistrationCompatibility(ctx, *payload.FleetVersion); err != nil {
    53  				return nil, err
    54  			}
    55  		}
    56  	}
    57  
    58  	if payload.StoreInfo != nil && payload.StoreInfo.BslOrganization != nil {
    59  		return r.registerCluster(ctx, payload, bsl.GetOrgShortName(*payload.StoreInfo.BslOrganization))
    60  	}
    61  	return r.registerCluster(ctx, payload, bsl.GetOrgShortName(middleware.ForContext(ctx).Organization))
    62  }
    63  
    64  func (r *Resolver) registerCluster(ctx context.Context, payload model.RegistrationPayload, organization string) (*model.RegistrationResponse, error) {
    65  	var (
    66  		//variables to store information about new cluster banner, infracluster, bsl, etc.
    67  		clusterSpecForNewCluster = &newClusterSpec{
    68  			payload:      &payload,
    69  			bslInfo:      &bslHolder{siteInfo: &bsl.BSLInfo{}},
    70  			organization: organization,
    71  		}
    72  		err error
    73  	)
    74  
    75  	// set default values for optional variables
    76  	if payload.FleetVersion == nil {
    77  		var version = types.DefaultVersionTag
    78  		payload.FleetVersion = &version
    79  	}
    80  
    81  	//get banner the cluster being registered in will exist in
    82  	log.Ctx(ctx).Debug().Msgf("registration api called %v", payload)
    83  	if payload.BannerEdgeID != nil {
    84  		clusterSpecForNewCluster.banner, err = r.BannerService.GetBannerByEdgeID(ctx, *payload.BannerEdgeID)
    85  	} else {
    86  		clusterSpecForNewCluster.banner, err = r.BannerService.GetBannerByNameAndTenant(ctx, payload.BannerName, organization)
    87  	}
    88  	if err != nil {
    89  		return nil, fmt.Errorf("error getting banner SQL Info: %w", err)
    90  	}
    91  
    92  	//get infra cluster the cluster infra will be created on eg. (cluster infra cluster for banner or banner infra for infra clusters)
    93  	if !fleet.IsClusterInfraCluster(fleet.Type(payload.Fleet)) {
    94  		clusterSpecForNewCluster.infraCluster, err = r.BannerService.GetClusterInfraInfo(ctx, clusterSpecForNewCluster.banner.BannerEdgeID)
    95  	} else {
    96  		clusterSpecForNewCluster.infraCluster, err = r.BannerService.GetBannerInfraInfo(ctx)
    97  	}
    98  	if err != nil {
    99  		return nil, fmt.Errorf("error getting infra cluster SQL Info: %w", err)
   100  	}
   101  
   102  	//check if this cluster has been registered
   103  	exist, err := r.RegistrationService.IsClusterSQLEntryExist(ctx, payload.Name, clusterSpecForNewCluster.banner.ProjectID)
   104  	if err != nil {
   105  		return nil, fmt.Errorf("error checking if cluster exists: %w", err)
   106  	}
   107  	if exist {
   108  		//cluster already exists, fetch its info & return it
   109  		clusterSpecForNewCluster.newCluster, err = r.StoreClusterService.GetClusterByNameAndProject(ctx, payload.Name, clusterSpecForNewCluster.banner.ProjectID)
   110  		if err != nil {
   111  			return nil, fmt.Errorf("error getting SQL Info for existing cluster: %w", err)
   112  		}
   113  		resp := &model.RegistrationResponse{
   114  			ClusterEdgeID: clusterSpecForNewCluster.newCluster.ClusterEdgeID,
   115  			SiteID:        &clusterSpecForNewCluster.bslInfo.siteInfo.ID,
   116  		}
   117  		if clusterSpecForNewCluster.newCluster.BslSiteID != nil {
   118  			resp.SiteID = clusterSpecForNewCluster.newCluster.BslSiteID
   119  		}
   120  		return resp, fmt.Errorf("%s. cluster name: %s", apiErr.ErrClusterAlreadyExists, payload.Name)
   121  	}
   122  
   123  	// create the bsl site
   124  	if err := r.createBSLResources(ctx, clusterSpecForNewCluster); err != nil {
   125  		return nil, err
   126  	}
   127  
   128  	if err := r.createCluster(ctx, clusterSpecForNewCluster); err != nil {
   129  		if rollbackErr := r.undoRegistrationClusterSteps(ctx, clusterSpecForNewCluster); rollbackErr != nil {
   130  			return nil, fmt.Errorf("%w; (rollback errors: %v)", err, rollbackErr)
   131  		}
   132  		return nil, err
   133  	}
   134  
   135  	//return success response
   136  	return &model.RegistrationResponse{
   137  		ClusterEdgeID: clusterSpecForNewCluster.clusterEdgeID,
   138  		SiteID:        &clusterSpecForNewCluster.bslInfo.siteInfo.ID,
   139  	}, nil
   140  }
   141  
   142  func (r *Resolver) createBSLResources(ctx context.Context, clusterSpec *newClusterSpec) error {
   143  	clusterSpec.clusterEdgeID = uuid.NewString()
   144  	if fleet.IsStoreCluster(fleet.Type(clusterSpec.payload.Fleet)) {
   145  		var err error
   146  		clusterSpec.bslInfo.siteInfo, clusterSpec.bslInfo.siteCreated, err = r.BSLSiteService.GetOrCreateSite(
   147  			ctx, clusterSpec.payload.StoreInfo, clusterSpec.banner, clusterSpec.payload.Name, clusterSpec.clusterEdgeID)
   148  		if err != nil {
   149  			return err
   150  		}
   151  	}
   152  	return nil
   153  }
   154  
   155  func (r *Resolver) createClusterSQLEntry(ctx context.Context, clusterSpec *newClusterSpec) error {
   156  	projectIDForNewCluster := clusterSpec.banner.ProjectID
   157  	location := edgeconstants.DefaultClusterLocation
   158  	if clusterSpec.payload.ClusterInfo != nil && clusterSpec.payload.ClusterInfo.Location != "" {
   159  		location = clusterSpec.payload.ClusterInfo.Location
   160  	}
   161  
   162  	// TODO(dk185217): api model transition for back compat. ui or other api clients should send static versions (e.g. "0.18.0", "0.17.4", ...) instead of "latest",
   163  	// and send auto-update policy as a separate field. Once they do, this can be removed and FleetVersion, AutoUpdateEnabled can be used directly
   164  	autoUpdate := false
   165  	if fleet.IsStoreCluster(fleet.Type(clusterSpec.payload.Fleet)) && *clusterSpec.payload.FleetVersion == types.DefaultVersionTag {
   166  		// "latest" implies auto-update
   167  		autoUpdate = true
   168  		// resolve "latest" to explicit version
   169  		latestVersion, err := r.ArtifactsService.GetLatestAvailableArtifactVersion(ctx, clusterSpec.payload.Fleet)
   170  		if err != nil {
   171  			return fmt.Errorf("error resolving latest version of artifact %s: %v", clusterSpec.payload.Fleet, err)
   172  		}
   173  		fleetVersion := latestVersion.Version
   174  		clusterSpec.payload.FleetVersion = &fleetVersion
   175  	}
   176  	if clusterSpec.payload.AutoUpdateEnabled == nil {
   177  		clusterSpec.payload.AutoUpdateEnabled = &autoUpdate
   178  	}
   179  	return r.RegistrationService.CreateClusterSQLEntry(ctx, clusterSpec.payload, projectIDForNewCluster, clusterSpec.clusterEdgeID, clusterSpec.bslInfo.siteInfo.ID, clusterSpec.banner.BannerEdgeID, false, location)
   180  }
   181  
   182  func (r *Resolver) createCluster(ctx context.Context, clusterSpec *newClusterSpec) error {
   183  	// Create namespace in foreman cluster via chariot
   184  	storeNsBase64, err := utils.ConvertStructToBase64(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: clusterSpec.clusterEdgeID}})
   185  	if err != nil {
   186  		return err
   187  	}
   188  	err = r.sendChariotMessage(ctx, r.Config.Bff.TopLevelProjectID, "foreman0", chariotClientApi.Create, storeNsBase64)
   189  	if err != nil {
   190  		return err
   191  	}
   192  
   193  	//Creates cluster sql entry with active = false
   194  	err = r.createClusterSQLEntry(ctx, clusterSpec)
   195  	if err != nil {
   196  		return fmt.Errorf("error creating cluster entry in sql: %w", err)
   197  	}
   198  	//get the cluster from the db
   199  	clusterSpec.newCluster, err = r.StoreClusterService.GetCluster(ctx, clusterSpec.clusterEdgeID)
   200  	if err != nil {
   201  		return fmt.Errorf("error creating cluster entry in sql: %w", err)
   202  	}
   203  
   204  	clusterType := clusterSpec.payload.ClusterType
   205  
   206  	// register dsds cluster configuration
   207  	if clusterType == cluster.DSDS {
   208  		if err := r.registerDSDSConfiguration(ctx, clusterSpec); err != nil {
   209  			return err
   210  		}
   211  	} else if fleet.IsStoreCluster(fleet.Type(clusterSpec.payload.Fleet)) {
   212  		//allow basic-stores and generic stores to have cluster configs in the DB w/o chariot sending
   213  		clusterConfig, err := r.ClusterConfigService.UpdateClusterConfig(ctx, clusterSpec.clusterEdgeID, &model.UpdateClusterConfig{})
   214  		if err != nil {
   215  			return err
   216  		}
   217  		clusterSpec.newCluster.ClusterConfig = clusterConfig
   218  	}
   219  
   220  	//create cluster resource on infra cluster to provision infrastructure for cluster
   221  	return r.createClusterViaChariot(ctx, clusterSpec)
   222  }
   223  
   224  func (r *Resolver) registerDSDSConfiguration(ctx context.Context, clusterSpec *newClusterSpec) error {
   225  	// register default k8s subnets in cluster network services
   226  	if err := r.registerK8sDefaultSubnets(ctx, clusterSpec.newCluster.ClusterEdgeID); err != nil {
   227  		return err
   228  	}
   229  
   230  	// register cluster configuration defaults
   231  	clusterConfig, err := r.Mutation().UpdateClusterConfig(ctx, clusterSpec.clusterEdgeID, model.UpdateClusterConfig{})
   232  	if err != nil {
   233  		return err
   234  	}
   235  	clusterSpec.newCluster.ClusterConfig = clusterConfig
   236  
   237  	// register cluster secrets
   238  	for _, secret := range clustersecrets.List() {
   239  		if _, err := r.updateClusterSecret(ctx, clusterSpec.clusterEdgeID, secret.Type(), ""); err != nil {
   240  			return err
   241  		}
   242  	}
   243  	return nil
   244  }
   245  
   246  // createClusterViaChariot creates the cluster resource on the infra cluster to provision infrastructure for the new cluster via cluster controller
   247  func (r *Resolver) createClusterViaChariot(ctx context.Context, clusterSpec *newClusterSpec) error {
   248  	var err error
   249  	clusterSpec.storeClusterBase64, err = r.RegistrationService.CreateAClusterCR(
   250  		ctx,
   251  		clusterSpec.payload.ClusterType,
   252  		clusterSpec.newCluster.Name,
   253  		clusterSpec.newCluster.ProjectID,
   254  		clusterSpec.payload.Fleet,
   255  		clusterSpec.organization,
   256  		clusterSpec.clusterEdgeID,
   257  		clusterSpec.banner,
   258  		clusterSpec.payload.ClusterInfo,
   259  	)
   260  
   261  	if err != nil {
   262  		err := fmt.Errorf("error creating a cluster cr: %w", err)
   263  		log.Ctx(ctx).Err(err).Msg("creating cluster cr failed")
   264  		return err
   265  	}
   266  	err = r.sendChariotMessage(ctx, clusterSpec.infraCluster.ProjectID, clusterSpec.infraCluster.ClusterEdgeID, chariotClientApi.Create, clusterSpec.storeClusterBase64)
   267  	if err != nil {
   268  		return err
   269  	}
   270  	return nil
   271  }
   272  
   273  // undoCreateClusterViaChariot undoes the creation of the cluster on infra cluster if there is an error in registration
   274  func (r *Resolver) undoCreateClusterViaChariot(ctx context.Context, storeClusterBase64, path, projectID string) error {
   275  	return r.sendChariotMessage(ctx, projectID, path, chariotClientApi.Delete, storeClusterBase64)
   276  }
   277  
   278  // sendChariotMessage sends the message to chariot v2
   279  func (r *Resolver) sendChariotMessage(ctx context.Context, projectID, clusterPath string, operation chariotClientApi.Operation, object ...string) error {
   280  	chariotMessage := chariotClientApi.
   281  		NewChariotMessage().
   282  		SetBanner(projectID).
   283  		SetCluster(clusterPath).
   284  		SetOperation(operation).
   285  		SetOwner(services.ComponentOwner).
   286  		AddObject(object...)
   287  	if err := r.ChariotService.InvokeChariotPubsub(ctx, chariotMessage, nil); err != nil {
   288  		err := fmt.Errorf("error calling chariot v2: %w", err)
   289  		log.Ctx(ctx).Err(err).Msg("chariot invocation failed")
   290  		return err
   291  	}
   292  	return nil
   293  }
   294  
   295  // undoRegistrationClusterSteps undoes all step already performed for registration
   296  // func (r *Resolver) undoRegistrationClusterSteps(ctx context.Context, clusterModel *model.Cluster, clusterBanner *model.Banner, infraClusterModel *model.Cluster, bslInfo *bslHolder, configMaps []string, storeClusterBase64 string) error {
   297  func (r *Resolver) undoRegistrationClusterSteps(ctx context.Context, clusterSpec *newClusterSpec) error {
   298  	var undoErrors []string
   299  
   300  	if clusterSpec.bslInfo.siteCreated {
   301  		err := r.BSLSiteService.DeleteBSLSite(ctx, clusterSpec.bslInfo.siteInfo, clusterSpec.banner)
   302  		if err != nil {
   303  			undoErrors = append(undoErrors, err.Error())
   304  		}
   305  	}
   306  
   307  	if clusterSpec.newCluster == nil {
   308  		return formatUndoErrors(undoErrors)
   309  	}
   310  
   311  	err := r.BannerService.DeleteClusterSQLEntry(ctx, clusterSpec.clusterEdgeID)
   312  	if err != nil {
   313  		undoErrors = append(undoErrors, fmt.Sprintf("error deleting cluster sql entry: %s", err.Error()))
   314  	}
   315  
   316  	if clusterSpec.storeClusterBase64 != "" {
   317  		err := r.undoCreateClusterViaChariot(ctx, clusterSpec.storeClusterBase64, clusterSpec.infraCluster.ClusterEdgeID, clusterSpec.infraCluster.ProjectID)
   318  		if err != nil {
   319  			undoErrors = append(undoErrors, err.Error())
   320  		}
   321  	}
   322  
   323  	return formatUndoErrors(undoErrors)
   324  }
   325  
   326  // formatUndoErrors small helper method to format any errors in registration
   327  func formatUndoErrors(e []string) error {
   328  	if len(e) > 0 {
   329  		return errors.New(strings.Join(e, "; "))
   330  	}
   331  	return nil
   332  }
   333  
   334  // validateEdgeVersionRegistrationCompatibility checks for compatibility between cloud and store Edge Infra versions
   335  func (r *Resolver) validateEdgeVersionRegistrationCompatibility(ctx context.Context, infraVersion string) error {
   336  	cloudInfraVersion := version.New()
   337  	_, cloudInfraMinorVersion, _, err := cloudInfraVersion.SemVerMajorMinorPatch()
   338  	if err != nil {
   339  		return err
   340  	}
   341  	/*
   342  		cloud infra minor version is appended with the base patch version .0 as compatibility is listed based on the base minor version (i.e. 0.18.0)
   343  		for every patch version
   344  	*/
   345  	cloudInfraCompatibility, err := r.CompatibilityService.GetArtifactVersionCompatibility(ctx, model.ArtifactVersion{Name: fleet.Store, Version: cloudInfraVersion.SemVer}, nil)
   346  	if err != nil {
   347  		return err
   348  	}
   349  
   350  	storeInfraVersionParts := strings.Split(infraVersion, ".")
   351  	storeInfraMinorVersion, err := strconv.Atoi(storeInfraVersionParts[1])
   352  	if err != nil {
   353  		return err
   354  	}
   355  	// checks that the store infra version is within 2 version of cloud infra
   356  	if cloudInfraMinorVersion-storeInfraMinorVersion > cloudInfraCompatibility.NthIndex {
   357  		return fmt.Errorf("the store infra version %s is not compatible with the current cloud infra version %s", infraVersion, cloudInfraVersion.SemVer)
   358  	}
   359  	return nil
   360  }
   361  

View as plain text