...

Source file src/edge-infra.dev/pkg/edge/registration/cluster_registration.go

Documentation: edge-infra.dev/pkg/edge/registration

     1  package registration
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"strings"
     8  	"time"
     9  
    10  	edgeClient "edge-infra.dev/pkg/edge/api/client"
    11  	"edge-infra.dev/pkg/edge/api/graph/model"
    12  	edgeErrors "edge-infra.dev/pkg/edge/apis/errors"
    13  	"edge-infra.dev/pkg/edge/bsl"
    14  	"edge-infra.dev/pkg/edge/constants"
    15  	"edge-infra.dev/pkg/edge/constants/api/cluster"
    16  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    17  	"edge-infra.dev/pkg/edge/info"
    18  	"edge-infra.dev/pkg/f8n/warehouse/lift/pack"
    19  
    20  	goext "github.com/external-secrets/external-secrets/apis/externalsecrets/v1beta1"
    21  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
    22  	"github.com/fluxcd/pkg/ssa"
    23  	sourceApi "github.com/fluxcd/source-controller/api/v1"
    24  	corev1 "k8s.io/api/core/v1"
    25  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    26  	k8serrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    33  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
    34  	"sigs.k8s.io/controller-runtime/pkg/client"
    35  )
    36  
    37  type BffConfig struct {
    38  	APIEndpoint        string
    39  	BSLUser            string
    40  	BSLPassword        string
    41  	TotpToken          string
    42  	BSLOrganization    string
    43  	BearerToken        string
    44  	CreateBSLSite      bool
    45  	BSLSiteEuid        string
    46  	BSLSiteLatitude    float64
    47  	BSLSiteLongitude   float64
    48  	BSLSiteReferenceID string
    49  	EdgeVersion        string
    50  }
    51  
    52  type ClusterInfo struct {
    53  	Banner         string
    54  	BannerEdgeID   string
    55  	Region         string
    56  	Store          string
    57  	ClusterType    string
    58  	Fleet          string
    59  	ClusterEdgeID  string
    60  	ClusterCaHash  string
    61  	ForceBootstrap bool
    62  	MachineType    string
    63  	Autoscale      bool
    64  	MinNodes       int
    65  	MaxNodes       int
    66  	Location       string
    67  	FleetVersion   string
    68  }
    69  
    70  type Registration struct {
    71  	Bff         BffConfig
    72  	Cluster     ClusterInfo
    73  	Client      client.Client
    74  	Manager     *ssa.ResourceManager
    75  	Packer      *pack.Packer
    76  	LumperImage string
    77  }
    78  
    79  const LumperPath = "config/pallets/f8n/warehouse/lumperctl"
    80  
    81  func (r *Registration) RegisterCluster(ctx context.Context) (*model.RegistrationResponse, error) {
    82  	fmt.Println("Fetching cluster registration data...")
    83  	resp, err := r.getClusterRegistrationData(ctx)
    84  	if err != nil && !strings.Contains(err.Error(), edgeErrors.ErrClusterAlreadyExists) {
    85  		return nil, err
    86  	}
    87  	fmt.Println("Cluster registered with Edge!")
    88  	fmt.Println("Cluster edge ID: ", resp.ClusterEdgeID)
    89  	return resp, err
    90  }
    91  
    92  func (r *Registration) BootstrapCluster(ctx context.Context) error {
    93  	mgr := ssa.NewResourceManager(
    94  		r.Client,
    95  		// be sure to consistently communicate this controllers ownership of objects
    96  		// this should match the result of createOpts()
    97  		polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: "bootstrap"},
    98  	)
    99  	r.Manager = mgr
   100  	fmt.Println("Fetching cluster bootstrap data...")
   101  	bootstrapData, err := r.getClusterBootstrapData(ctx)
   102  	if err != nil {
   103  		return err
   104  	}
   105  	fmt.Println("Installing Edge Manifests...")
   106  	if err = r.installManifests(ctx, bootstrapData); err != nil {
   107  		return err
   108  	}
   109  	fmt.Println("Cluster bootstrapping with edge completed!")
   110  
   111  	return nil
   112  }
   113  
   114  func (r *Registration) getBFFClient() (*edgeClient.EdgeClient, error) {
   115  	opts := []edgeClient.Option{
   116  		edgeClient.WithBaseURL(r.Bff.APIEndpoint),
   117  	}
   118  	if r.Bff.TotpToken != "" {
   119  		opts = append(opts, edgeClient.WithTotp(r.Bff.TotpToken))
   120  	} else if r.Bff.BearerToken != "" {
   121  		opts = append(opts, edgeClient.WithBearerToken(r.Bff.BearerToken))
   122  	} else {
   123  		opts = append(opts, edgeClient.WithCredentials(r.Bff.BSLUser, r.Bff.BSLPassword, r.Bff.BSLOrganization))
   124  	}
   125  	if r.Bff.EdgeVersion != "" {
   126  		opts = append(opts, edgeClient.WithVersion(r.Bff.EdgeVersion))
   127  	}
   128  	return edgeClient.New(opts...)
   129  }
   130  
   131  // Retrieves data from the registration bff api for cluster registration
   132  func (r *Registration) getClusterRegistrationData(ctx context.Context) (*model.RegistrationResponse, error) {
   133  	bffClient, err := r.getBFFClient()
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	payload := model.RegistrationPayload{
   139  		Name:        r.Cluster.Store,
   140  		ClusterType: r.Cluster.ClusterType,
   141  		Fleet:       r.Cluster.Fleet,
   142  		StoreInfo: &model.StoreInfo{
   143  			StoreID:     &r.Cluster.Store, //using store name as store id, until store id is properly defined
   144  			SiteID:      &r.Bff.BSLSiteEuid,
   145  			CreateSite:  &r.Bff.CreateBSLSite,
   146  			Latitude:    &r.Bff.BSLSiteLatitude,
   147  			Longitude:   &r.Bff.BSLSiteLongitude,
   148  			ReferenceID: &r.Bff.BSLSiteReferenceID,
   149  		},
   150  		BannerName:   r.Cluster.Banner,
   151  		FleetVersion: &r.Cluster.FleetVersion,
   152  	}
   153  	if payload.Fleet == "" {
   154  		payload.Fleet = fleet.Store
   155  	}
   156  	if r.Bff.BSLOrganization != "" {
   157  		payload.StoreInfo.BslOrganization = &r.Bff.BSLOrganization
   158  	}
   159  	if r.Cluster.BannerEdgeID != "" {
   160  		payload.BannerEdgeID = &r.Cluster.BannerEdgeID
   161  	}
   162  	if r.Cluster.MaxNodes != 0 && r.Cluster.MachineType != "" {
   163  		payload.ClusterInfo = &model.ClusterInfo{
   164  			MachineType: r.Cluster.MachineType,
   165  			Autoscale:   r.Cluster.Autoscale,
   166  			MinNodes:    &r.Cluster.MinNodes,
   167  			MaxNodes:    &r.Cluster.MaxNodes,
   168  			NumNodes:    r.Cluster.MinNodes,
   169  			Location:    r.Cluster.Location,
   170  		}
   171  	}
   172  	latestVersion := "latest"
   173  	if payload.FleetVersion == nil {
   174  		payload.FleetVersion = &latestVersion
   175  	}
   176  
   177  	variables := map[string]interface{}{
   178  		"payload": payload,
   179  	}
   180  	return r.rFCluster(ctx, bffClient, variables)
   181  }
   182  
   183  // Retrieves data from the bff api for cluster registration
   184  func (r *Registration) getClusterBootstrapData(ctx context.Context) (*model.BootstrapResponse, error) {
   185  	bffClient, err := r.getBFFClient()
   186  	if err != nil {
   187  		return nil, err
   188  	}
   189  
   190  	payload := model.BootstrapPayload{
   191  		ClusterEdgeID: r.Cluster.ClusterEdgeID,
   192  		Force:         &r.Cluster.ForceBootstrap,
   193  	}
   194  	if r.Cluster.ClusterCaHash != "" {
   195  		payload.ClusterCaHash = &r.Cluster.ClusterCaHash
   196  	}
   197  
   198  	variables := map[string]interface{}{
   199  		"payload": payload,
   200  	}
   201  	return r.bootstrapCluster(ctx, bffClient, variables)
   202  }
   203  
   204  // Some basic checks to make sure the cluster will be in a good state before
   205  // allowing the user to add it to edge and bootstrap it. Checks for bsl-info/
   206  // edge-info config map, buckets, kustomizations, external secrets, secrets
   207  // in fluxSystem, gcp-creds secrets
   208  func (r *Registration) CheckIfClusterInCleanState(ctx context.Context) error {
   209  	resources := []string{}
   210  	configMap := &corev1.ConfigMap{}
   211  	err := r.Client.Get(ctx, types.NamespacedName{Name: bsl.BSLInfoConfigMapName, Namespace: metav1.NamespacePublic}, configMap)
   212  	if err == nil {
   213  		resources = append(resources, fmt.Sprintf("config map %s", configMap.Name))
   214  	} else if !k8serrors.IsNotFound(err) {
   215  		return err
   216  	}
   217  
   218  	err = r.Client.Get(ctx, types.NamespacedName{Name: info.EdgeConfigMapName, Namespace: info.EdgeConfigMapNS}, configMap)
   219  	if err == nil {
   220  		resources = append(resources, fmt.Sprintf("config map %s", configMap.Name))
   221  	} else if !k8serrors.IsNotFound(err) {
   222  		return err
   223  	}
   224  
   225  	buckets := &sourceApi.BucketList{}
   226  	err = r.Client.List(ctx, buckets)
   227  	if err != nil && !isKindNotFound(err) {
   228  		return err
   229  	}
   230  	if len(buckets.Items) != 0 {
   231  		resources = append(resources, fmt.Sprintf("storage bucket %s", buckets.Items[0].Name))
   232  	}
   233  
   234  	kustomizations := &kustomizeApi.KustomizationList{}
   235  	err = r.Client.List(ctx, kustomizations)
   236  	if err != nil && !isKindNotFound(err) {
   237  		return err
   238  	}
   239  	if len(kustomizations.Items) != 0 {
   240  		resources = append(resources, fmt.Sprintf("kustomization %s", kustomizations.Items[0].Name))
   241  	}
   242  
   243  	externalSecrets := &goext.ExternalSecretList{}
   244  	err = r.Client.List(ctx, externalSecrets)
   245  	if err != nil && !isKindNotFound(err) {
   246  		return err
   247  	}
   248  	if len(externalSecrets.Items) != 0 {
   249  		resources = append(resources, fmt.Sprintf("external secret %s", externalSecrets.Items[0].Name))
   250  	}
   251  
   252  	secrets := &corev1.SecretList{}
   253  	err = r.Client.List(ctx, secrets, &client.ListOptions{Namespace: constants.FluxSystem})
   254  	if err != nil {
   255  		return err
   256  	}
   257  	if len(secrets.Items) != 0 {
   258  		resources = append(resources, fmt.Sprintf("secret %s", secrets.Items[0].Name))
   259  	}
   260  
   261  	err = r.Client.List(ctx, secrets, &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": "gcp-creds"})})
   262  	if err != nil {
   263  		return err
   264  	}
   265  	if len(secrets.Items) != 0 {
   266  		resources = append(resources, fmt.Sprintf("secret %s", secrets.Items[0].Name))
   267  	}
   268  	if len(resources) > 0 {
   269  		return fmt.Errorf("found %s in cluster", strings.Join(resources, ", "))
   270  	}
   271  	return nil
   272  }
   273  
   274  func (r *Registration) rFCluster(ctx context.Context, bffClient *edgeClient.EdgeClient, variables map[string]interface{}) (*model.RegistrationResponse, error) {
   275  	var mutation struct {
   276  		RegisterCluster model.RegistrationResponse `graphql:"registerCluster(payload: $payload)"`
   277  	}
   278  	err := bffClient.Mutate(ctx, &mutation, variables)
   279  	if err != nil && !strings.Contains(err.Error(), edgeErrors.ErrClusterAlreadyExists) {
   280  		return nil, fmt.Errorf("fail to call bff graphql registerCluster: %w", err)
   281  	}
   282  	fmt.Println("cluster registration data retrieved")
   283  	return &mutation.RegisterCluster, err
   284  }
   285  
   286  func (r *Registration) bootstrapCluster(ctx context.Context, bffClient *edgeClient.EdgeClient, variables map[string]interface{}) (*model.BootstrapResponse, error) {
   287  	var mutation struct {
   288  		BootstrapCluster model.BootstrapResponse `graphql:"bootstrapCluster(payload: $payload)"`
   289  	}
   290  	err := bffClient.Mutate(ctx, &mutation, variables)
   291  	if err != nil {
   292  		var mutation struct {
   293  			BootstrapCluster model.BootstrapResponseBwc `graphql:"bootstrapCluster(payload: $payload)"`
   294  		}
   295  		err := bffClient.Mutate(ctx, &mutation, variables)
   296  		if err != nil {
   297  			return nil, fmt.Errorf("fail to call bff graphql bootstrapFluxCluster: %w", err)
   298  		}
   299  		fmt.Println("cluster bootstrap data retrieved!")
   300  		return &model.BootstrapResponse{
   301  			ProjectID:        mutation.BootstrapCluster.ProjectID,
   302  			Secrets:          mutation.BootstrapCluster.Secrets,
   303  			FluxConfig:       mutation.BootstrapCluster.FluxConfig,
   304  			InstallManifests: nil,
   305  		}, nil
   306  	}
   307  	fmt.Println("Cluster bootstrap data retrieved!")
   308  	return &mutation.BootstrapCluster, nil
   309  }
   310  
   311  // installManifest installs the manifests returned by bootstrapping
   312  func (r *Registration) installManifests(ctx context.Context, bs *model.BootstrapResponse) error {
   313  	allManifests := []*unstructured.Unstructured{}
   314  	for _, manifest := range bs.InstallManifests {
   315  		unsManifest := &unstructured.Unstructured{}
   316  		if err := json.Unmarshal([]byte(manifest), unsManifest); err != nil {
   317  			return err
   318  		}
   319  		allManifests = append(allManifests, unsManifest)
   320  	}
   321  	_, err := r.Manager.ApplyAllStaged(ctx, allManifests, ssa.ApplyOptions{WaitTimeout: 60 * time.Second})
   322  	if err != nil {
   323  		return err
   324  	}
   325  	return nil
   326  }
   327  
   328  func GetRegistrationRuntime() (*runtime.Scheme, error) {
   329  	scheme := runtime.NewScheme()
   330  	if err := clientgoscheme.AddToScheme(scheme); err != nil {
   331  		return nil, err
   332  	}
   333  	if err := sourceApi.AddToScheme(scheme); err != nil {
   334  		return nil, err
   335  	}
   336  	if err := kustomizeApi.AddToScheme(scheme); err != nil {
   337  		return nil, err
   338  	}
   339  	if err := apiextensionsv1.AddToScheme(scheme); err != nil {
   340  		return nil, err
   341  	}
   342  	if err := goext.AddToScheme(scheme); err != nil {
   343  		return nil, err
   344  	}
   345  	return scheme, nil
   346  }
   347  
   348  // Cluster is the value type of GetClusters query
   349  type Cluster struct {
   350  	// UUID of the cluster in sql database
   351  	ClusterEdgeID string `graphql:"clusterEdgeId" json:"clusterEdgeId"`
   352  	// name of the cluster
   353  	Name string `graphql:"name" json:"name"`
   354  	// ID of the project that the cluster belongs to
   355  	ProjectID string `graphql:"projectId" json:"projectId"`
   356  	// edge id of the banner the cluster exists in
   357  	BannerEdgeID string `graphql:"bannerEdgeId" json:"bannerEdgeId"`
   358  	// a boolean value, if the cluster is registered
   359  	Registered *bool `graphql:"registered" json:"registered"`
   360  	// a boolean value, if the cluster is active
   361  	Active *bool `graphql:"active" json:"active"`
   362  	// a list of labels of the cluster
   363  	Labels []*struct {
   364  		// edge id of the label
   365  		LabelEdgeID string `graphql:"labelEdgeId" json:"labelEdgeId"`
   366  		// key is the string label displayed to user
   367  		LabelKey string `graphql:"labelKey" json:"labelKey"`
   368  		// type is a grouping of labels
   369  		LabelType string `graphql:"labelType" json:"labelType"`
   370  	} `graphql:"labels" json:"labels"`
   371  	// bsl site id of the cluster
   372  	BslSiteID *string `graphql:"bslSiteID" json:"bslSiteID"`
   373  	// cluster network services
   374  	ClusterNetworkServices []model.ClusterNetworkServiceInfo `graphql:"clusterNetworkServices" json:"clusterNetworkServices"`
   375  	// the config for a cluster
   376  	ClusterConfig *model.ClusterConfig `graphql:"clusterConfig" json:"clusterConfig"`
   377  }
   378  
   379  func (c Cluster) IsGke() bool {
   380  	for _, l := range c.Labels {
   381  		if l.LabelKey == cluster.GKE {
   382  			return true
   383  		}
   384  	}
   385  	return false
   386  }
   387  
   388  func isKindNotFound(err error) bool {
   389  	return strings.Contains(err.Error(), "no matches for kind")
   390  }
   391  

View as plain text