...

Source file src/edge-infra.dev/pkg/edge/api/services/store_service.go

Documentation: edge-infra.dev/pkg/edge/api/services

     1  package services
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"regexp"
    10  	"slices"
    11  	"strconv"
    12  	"strings"
    13  
    14  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
    15  	sourceApi "github.com/fluxcd/source-controller/api/v1"
    16  	"github.com/lib/pq"
    17  	corev1 "k8s.io/api/core/v1"
    18  
    19  	sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
    20  	"edge-infra.dev/pkg/edge/api/clients"
    21  	"edge-infra.dev/pkg/edge/api/graph/mapper"
    22  	"edge-infra.dev/pkg/edge/api/graph/model"
    23  	sqlquery "edge-infra.dev/pkg/edge/api/sql"
    24  	"edge-infra.dev/pkg/edge/api/status"
    25  	"edge-infra.dev/pkg/edge/api/types"
    26  	"edge-infra.dev/pkg/edge/constants"
    27  	ctypes "edge-infra.dev/pkg/edge/constants/api/cluster"
    28  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    29  	"edge-infra.dev/pkg/edge/ctlfish/monitor"
    30  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1"
    31  	"edge-infra.dev/pkg/lib/runtime/version"
    32  )
    33  
var (
	// ErrClusterMissing is returned by the status methods in this file when the
	// cluster argument is nil or its cluster edge ID is empty.
	ErrClusterMissing = errors.New("cluster must be provided")
	// ErrKindMissing is returned by GetComponentStatus when kind is empty.
	ErrKindMissing    = errors.New("kind must be provided")
	// components is the set of resource kinds that GetClusterComponentsStatus
	// iterates over, one status entry per key. Only the keys are read in this
	// file; the bool values are always true.
	components        = map[string]bool{
		whv1.ShipmentGVK.Kind:          true,
		kustomizeApi.KustomizationKind: true,
		sourceApi.BucketKind:           true,
	}
)
    43  
// StoreClusterService declares the store cluster operations exposed by this
// package: cluster lookups, label and event retrieval, status reporting,
// network service management, and store entry updates/deletes.
//
// storeClusterService in this file provides the lookup, status, delete and
// update methods; the network service methods are not defined in this file
// and are presumably implemented elsewhere in the package.
//
//go:generate mockgen -destination=../mocks/mock_store_cluster_service.go -package=mocks edge-infra.dev/pkg/edge/api/services StoreClusterService
type StoreClusterService interface {
	GetClusters(ctx context.Context, projectID string, labels []string) ([]*model.Cluster, error)
	GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error)
	GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error)
	DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error
	GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error)
	GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error)
	SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error
	GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error)
	GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error)
	GetClusterNetworkServices(ctx context.Context, clusterEdgeID string) ([]*model.ClusterNetworkServiceInfo, error)
	GetClusterNetworkServiceByNetworkID(ctx context.Context, clusterEdgeID, networkServiceID string) (*model.ClusterNetworkServiceInfo, error)
	GetClusterK8sNetworkServices(ctx context.Context, clusterEdgeID string) (map[string]string, error)
	CreateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.CreateNetworkServiceInfo) ([]*model.ClusterNetworkServiceInfo, error)
	UpdateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.UpdateNetworkServiceInfo, existingServiceTypesByID map[string]string) ([]*model.ClusterNetworkServiceInfo, error)
	DeleteClusterNetworkService(ctx context.Context, clusterEdgeID string, networkServiceID string) (bool, error)
	UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error
	UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error
	GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error)
	GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error)
	GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error)
	GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error)
	GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error)
	GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error)
	GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error)
	GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error)
}
    72  
// storeClusterService is the receiver type for the store cluster methods
// defined in this file and is built by NewStoreClusterService.
//
// Within this file the methods use SQLDB for the SQL queries, BQClient for
// GetKubeResource lookups, TerminalService for listing terminals and
// CompatibilityService for artifact compatibility; GkeService and
// ChariotService are only assigned by the constructor here.
type storeClusterService struct {
	GkeService           GkeClient
	BQClient             clients.BQClient
	SQLDB                *sql.DB
	ChariotService       ChariotService
	TerminalService      TerminalService
	CompatibilityService CompatibilityService
}
    81  
// activeVersionResp holds the decoded active version. GetActiveEdgeVersion
// unmarshals the JSON-encoded value read from the database into its version
// field.
type activeVersionResp struct {
	version string
}
    85  
    86  func (s *storeClusterService) GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error) {
    87  	cluster := types.Cluster{}
    88  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID)
    89  	if err := row.Scan(&cluster.ClusterEdgeID, &cluster.ClusterName, &cluster.ProjectID, &cluster.Registered, &cluster.Active, &cluster.BannerEdgeID, &cluster.BSLSiteID, &cluster.FleetVersion); err != nil {
    90  		return cluster, sqlerr.Wrap(err)
    91  	}
    92  	return cluster, nil
    93  }
    94  
    95  func (s *storeClusterService) GetClusters(ctx context.Context, bannerID string, labels []string) ([]*model.Cluster, error) {
    96  	var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string
    97  	var registered, active bool
    98  	var rows *sql.Rows
    99  	var err error
   100  	if len(labels) > 0 {
   101  		rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersWithLabelQuery, bannerID, pq.Array(labels))
   102  		if err != nil {
   103  			return nil, sqlerr.Wrap(err)
   104  		}
   105  	} else {
   106  		rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersQuery, bannerID)
   107  		if err != nil {
   108  			return nil, sqlerr.Wrap(err)
   109  		}
   110  	}
   111  	defer rows.Close()
   112  	clusters := []*model.Cluster{}
   113  	for rows.Next() {
   114  		if err := rows.Scan(&uuid, &name, &projectID, &registered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil {
   115  			return nil, sqlerr.Wrap(err)
   116  		}
   117  		labelList, err := s.GetLabelsForCluster(ctx, uuid)
   118  		if err != nil {
   119  			return nil, sqlerr.Wrap(err)
   120  		}
   121  		clusters = append(clusters, getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active))
   122  	}
   123  	if err := rows.Err(); err != nil {
   124  		return nil, sqlerr.Wrap(err)
   125  	}
   126  	return clusters, nil
   127  }
   128  
   129  func (s *storeClusterService) GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error) {
   130  	var labels []*model.Label
   131  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.SelectEdgeLabelsForCluster, clusterEdgeID)
   132  	if err != nil {
   133  		return nil, err
   134  	}
   135  	for rows.Next() {
   136  		var label model.Label
   137  		//label_edge_id, label_key, color, visible, editable, banner_edge_id, label_unique, description, label_type
   138  		if err := rows.Scan(&label.LabelEdgeID, &label.Key, &label.Color, &label.Visible, &label.Editable, &label.BannerEdgeID, &label.Unique, &label.Description, &label.Type); err != nil {
   139  			return nil, sqlerr.Wrap(err)
   140  		}
   141  		labels = append(labels, &label)
   142  	}
   143  	if err := rows.Err(); err != nil {
   144  		return nil, sqlerr.Wrap(err)
   145  	}
   146  	return labels, nil
   147  }
   148  
   149  func (s *storeClusterService) GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error) {
   150  	var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string
   151  	var registered, active bool
   152  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID)
   153  	if err := row.Scan(&uuid, &name, &projectID, &registered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil {
   154  		return nil, sqlerr.Wrap(err)
   155  	}
   156  	labelList, err := s.GetLabelsForCluster(ctx, clusterEdgeID)
   157  	if err != nil {
   158  		return nil, sqlerr.Wrap(err)
   159  	}
   160  	return getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active), nil
   161  }
   162  
   163  func (s *storeClusterService) GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error) {
   164  	var events []*model.Event
   165  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterEvents, clusterEdgeID)
   166  	if err != nil {
   167  		return nil, err
   168  	}
   169  	for rows.Next() {
   170  		var e model.Event
   171  		e.InvolvedObject = &model.InvolvedObject{}
   172  		if err := rows.Scan(&e.EventEdgeID, &e.Name, &e.InvolvedObject.Kind, &e.InvolvedObject.Namespace, &e.InvolvedObject.Name, &e.Reason, &e.Message, &e.Status, &e.Source, &e.Annotations,
   173  			&e.TerminalID, &e.ClusterEdgeID, &e.CreatedAt); err != nil {
   174  			return nil, sqlerr.Wrap(err)
   175  		}
   176  		events = append(events, &e)
   177  	}
   178  	if err := rows.Err(); err != nil {
   179  		return nil, sqlerr.Wrap(err)
   180  	}
   181  	return events, nil
   182  }
   183  
   184  func (s *storeClusterService) GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error) {
   185  	var clusterID, clusterName, infraStatus, infraStatusDetails string
   186  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterInfraStatusByID, clusterEdgeID)
   187  	if err := row.Scan(&clusterID, &clusterName, &infraStatus, &infraStatusDetails); err != nil {
   188  		return nil, sqlerr.Wrap(err)
   189  	}
   190  
   191  	// Return status to either HEALTHY, UNHEALTHY or PROVISIONING
   192  	if infraStatus == "READY" {
   193  		infraStatus = "HEALTHY"
   194  	} else if infraStatus == "ERROR" {
   195  		infraStatus = "UNHEALTHY"
   196  	}
   197  
   198  	infraStatusObj := &model.InfraStatus{
   199  		Status:  infraStatus,
   200  		Message: infraStatusDetails,
   201  	}
   202  
   203  	return infraStatusObj, nil
   204  }
   205  
   206  func (s *storeClusterService) GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error) {
   207  	var clusterEdgeID, bannerEdgID, bslSiteID, fleetVersion string
   208  	var registered, active bool
   209  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByNameAndProject, clusterName, projectID)
   210  	if err := row.Scan(&clusterEdgeID, &registered, &active, &bannerEdgID, &bslSiteID, &fleetVersion); err != nil {
   211  		return nil, sqlerr.Wrap(err)
   212  	}
   213  	return getCluster(clusterEdgeID, clusterName, projectID, bannerEdgID, bslSiteID, fleetVersion, nil, registered, active), nil
   214  }
   215  
   216  func getCluster(uuid, storeName, projectID, bannerEdgeID, bslSiteID, fleetVersion string, label []*model.Label, registered, active bool) *model.Cluster {
   217  	store := &model.Cluster{
   218  		ClusterEdgeID: uuid,
   219  		Name:          storeName,
   220  		ProjectID:     projectID,
   221  		Registered:    &registered,
   222  		Active:        &active,
   223  		Labels:        label,
   224  		BannerEdgeID:  bannerEdgeID,
   225  		BslSiteID:     &bslSiteID,
   226  		FleetVersion:  fleetVersion,
   227  	}
   228  	return store
   229  }
   230  
   231  func (s *storeClusterService) GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error) {
   232  	if clusterEdgeID == "" {
   233  		return nil, ErrClusterMissing
   234  	}
   235  	if kind == "" {
   236  		return nil, ErrKindMissing
   237  	}
   238  	clusterStatus := &model.ClusterStatus{}
   239  	switch active {
   240  	case false:
   241  		// Case 1 - Cluster Not Bootstrapped
   242  		// @returns status = REGISTERED
   243  		// @returns message = Registered but Edge is not installed
   244  		// if cluster is not bootstrapped (i.e Edge Manifests have not been applied)
   245  		// then there is no way status will make it up
   246  		clusterStatus.Status = status.NotAvailable
   247  		clusterStatus.Message = status.NotAvailableStatusMessage
   248  		return clusterStatus, nil
   249  	default:
   250  		isReady := ""
   251  		row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterStatus, kind, clusterEdgeID, false)
   252  		if err := row.Scan(&isReady); err != nil {
   253  			if !errors.Is(err, sql.ErrNoRows) {
   254  				return clusterStatus, err
   255  			}
   256  			clusterStatus.Status = status.NotAvailable
   257  			clusterStatus.Message = status.NotAvailableStatusMessage
   258  			return clusterStatus, nil
   259  		}
   260  
   261  		switch {
   262  		case isReady == status.IsReady:
   263  			// Case 2 - Cluster Bootstrapped, Status = Ready, Kind = Ready
   264  			// @returns status = Ready
   265  			// @returns message = Kind is ready
   266  			clusterStatus.Status = status.Ready
   267  			clusterStatus.Message = fmt.Sprintf(status.KindReadyMessage, kind)
   268  			return clusterStatus, nil
   269  		case isReady == status.NotReady:
   270  			var (
   271  				kindErr      = ""
   272  				errorMessage = make([]string, 0)
   273  				notReported  = make(map[string]bool)
   274  			)
   275  			if err := s.getStatusErrorMessage(ctx, clusterEdgeID, kind, &kindErr, notReported, &errorMessage); err != nil {
   276  				return clusterStatus, err
   277  			}
   278  			// Case 3 - Cluster Bootstrapped, kind = Not Reported
   279  			// @returns status = Installing
   280  			// @returns message = Waiting for Kind status
   281  			if kindNotReported := status.IsNotReported(kind, notReported); kindNotReported {
   282  				clusterStatus.Status = status.Installing
   283  				clusterStatus.Message = fmt.Sprintf("%s %s status", status.NotReportedFormat, kind)
   284  				return clusterStatus, nil
   285  			}
   286  			// Case 4 - Cluster Bootstrapped, An Error For Kind
   287  			// @returns status = Error
   288  			// @returns message = Kind error message
   289  			clusterStatus.Status = status.Error
   290  			clusterStatus.Message = status.MergeErrorMessages(errorMessage)
   291  			return clusterStatus, nil
   292  		}
   293  		return clusterStatus, nil
   294  	}
   295  }
   296  
   297  func (s *storeClusterService) GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error) {
   298  	var (
   299  		clusterStatus = make([]*model.ComponentStatus, 0)
   300  		errs          error
   301  	)
   302  	if cluster.ClusterEdgeID == "" {
   303  		return nil, ErrClusterMissing
   304  	}
   305  	for component := range components {
   306  		status, err := s.GetComponentStatus(ctx, cluster.ClusterEdgeID, component, cluster.Active)
   307  		if err != nil {
   308  			errs = errors.Join(errs, err)
   309  			continue
   310  		}
   311  		clusterStatus = append(clusterStatus, &model.ComponentStatus{
   312  			Status: &model.ClusterStatus{
   313  				Status:  status.Status,
   314  				Message: status.Message,
   315  			},
   316  			Component: component,
   317  		})
   318  	}
   319  	return clusterStatus, errs
   320  }
   321  
   322  func (s *storeClusterService) GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error) {
   323  	if cluster == nil {
   324  		return nil, ErrClusterMissing
   325  	}
   326  	clusterStatus := &model.ClusterStatus{}
   327  	switch cluster.Active {
   328  	case false:
   329  		// Case 1 - Cluster Not Bootstrapped
   330  		// @returns status = REGISTERED
   331  		// @returns message = Registered but Edge is not installed
   332  		// if cluster is not bootstrapped (i.e Edge Manifests have not been applied)
   333  		// then there is no way status will make it up
   334  		clusterStatus.Status = status.Registered
   335  		clusterStatus.Message = status.RegisteredMessage
   336  		return clusterStatus, nil
   337  	default:
   338  		statuses, err := s.getStatus(ctx, cluster.ClusterEdgeID)
   339  		if err != nil {
   340  			return nil, err
   341  		}
   342  		shipmentReady := isReady(whv1.ShipmentGVK.Kind, statuses)
   343  		kustomizationReady := isReady(kustomizeApi.KustomizationKind, statuses)
   344  		bucketReady := isReady(sourceApi.BucketKind, statuses)
   345  		// Case 2 - Cluster Bootstrapped, Status = Ready, Kustomization = Ready, Shipment = Ready, Bucket = Ready
   346  		// @returns status = Ready
   347  		// @returns message = Cluster is ready
   348  		if shipmentReady && kustomizationReady && bucketReady {
   349  			clusterStatus.Status = status.Ready
   350  			clusterStatus.Message = status.ReadyMessage
   351  			return clusterStatus, nil
   352  		}
   353  		// Case 3 - Cluster Bootstrapped, Any Error in Kustomization, Shipment, or Bucket
   354  		// @returns status = Error
   355  		// @returns message = Kustomization + Shipment error message
   356  		if !shipmentReady || !kustomizationReady || !bucketReady { //nolint:nestif
   357  			var (
   358  				shipmentErr      = ""
   359  				kustomizationErr = ""
   360  				bucketErr        = ""
   361  				errorMessage     = make([]string, 0)
   362  				notReported      = make(map[string]bool)
   363  			)
   364  			if !shipmentReady {
   365  				if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, whv1.ShipmentGVK.Kind, &shipmentErr, notReported, &errorMessage); err != nil {
   366  					return clusterStatus, err
   367  				}
   368  			}
   369  			if !kustomizationReady {
   370  				if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, kustomizeApi.KustomizationKind, &kustomizationErr, notReported, &errorMessage); err != nil {
   371  					return clusterStatus, err
   372  				}
   373  			}
   374  			if !bucketReady {
   375  				if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, sourceApi.BucketKind, &bucketErr, notReported, &errorMessage); err != nil {
   376  					return clusterStatus, err
   377  				}
   378  			}
   379  			// Case 4 - Cluster Bootstrapped, Kustomization = Error, Shipment = Not Reported, Bucket = Ready
   380  			// @returns status = Installing
   381  			// @returns message = Kustomization error message + Waiting for not Shipment status
   382  			//
   383  			// AND
   384  			//
   385  			// Case 5 - Cluster Bootstrapped, Kustomization = Not reported, Shipment = Not Reported, Bucket = Ready
   386  			// @returns status = Installing
   387  			// @returns message = Waiting for not Shipment, Kustomization status
   388  			shipmentNotReported := status.IsNotReported(whv1.ShipmentGVK.Kind, notReported)
   389  			kustomizationNotReported := status.IsNotReported(kustomizeApi.KustomizationKind, notReported)
   390  			bucketNotReported := status.IsNotReported(sourceApi.BucketKind, notReported)
   391  			if shipmentNotReported || kustomizationNotReported || bucketNotReported {
   392  				if shipmentNotReported {
   393  					errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, whv1.ShipmentGVK.Kind))
   394  				}
   395  				if kustomizationNotReported {
   396  					errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, kustomizeApi.KustomizationKind))
   397  				}
   398  				if bucketNotReported {
   399  					errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, sourceApi.BucketKind))
   400  				}
   401  				clusterStatus.Status = status.Installing
   402  				if shipmentNotReported && kustomizationNotReported && bucketNotReported {
   403  					// Case 6 - Cluster Bootstrapped, Ready = False, Kustomization = Not reported/NA, Shipment = Not Reported/NA, Bucket = Not Reported/NA
   404  					// @returns status = Installing
   405  					// @returns message = Waiting for installation to complete
   406  					clusterStatus.Message = status.InstallingMessage
   407  				} else {
   408  					clusterStatus.Message = status.MergeErrorMessages(errorMessage)
   409  				}
   410  				return clusterStatus, nil
   411  			}
   412  			clusterStatus.Status = status.Error
   413  			clusterStatus.Message = status.MergeErrorMessages(errorMessage)
   414  			return clusterStatus, nil
   415  		}
   416  		return clusterStatus, nil
   417  	}
   418  }
   419  
   420  func (s *storeClusterService) GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error) {
   421  	supportStatus := &model.SupportStatus{
   422  		InfraSupportStatus:  &model.InfraSupportStatus{},
   423  		EdgeOsSupportStatus: &model.EdgeOsSupportStatus{},
   424  	}
   425  
   426  	labelList, err := s.GetLabelsForCluster(ctx, cluster.ClusterEdgeID)
   427  	if err != nil {
   428  		return nil, sqlerr.Wrap(err)
   429  	}
   430  	cloudInfraVersion := version.New()
   431  	_, cloudInfraMinorVersion, _, err := cloudInfraVersion.SemVerMajorMinorPatch()
   432  	if err != nil {
   433  		return nil, err
   434  	}
   435  	edgeOSArtifact := constants.EdgeOSArtifact
   436  	cloudInfraCompatibility, err := s.CompatibilityService.GetArtifactVersionCompatibility(ctx, model.ArtifactVersion{Name: fleet.Store, Version: cloudInfraVersion.SemVer}, &edgeOSArtifact)
   437  	if err != nil {
   438  		return nil, err
   439  	}
   440  	activeVer, err := s.GetActiveEdgeVersion(ctx, cluster.ClusterEdgeID)
   441  	if err != nil {
   442  		return nil, err
   443  	}
   444  	if activeVer == "" {
   445  		supportStatus.InfraSupportStatus.Status = status.ErrGettingVersion
   446  		supportStatus.InfraSupportStatus.Message = status.NoActiveVersion
   447  		return supportStatus, nil
   448  	}
   449  	// parse version for x.x.x
   450  	re := regexp.MustCompile(`v?(\d+\.\d+\.\d+)`)
   451  	activeVerMatch := re.FindStringSubmatch(activeVer)
   452  
   453  	fleetVersionParts := strings.Split(activeVerMatch[1], ".")
   454  	fleetMinorVersion, err := strconv.Atoi(fleetVersionParts[1])
   455  	if err != nil {
   456  		return nil, err
   457  	}
   458  
   459  	if cloudInfraMinorVersion-fleetMinorVersion > cloudInfraCompatibility.NthIndex {
   460  		supportStatus.InfraSupportStatus.Status = status.ClusterOutOfSupport
   461  		supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s is out of support with current version %s", activeVerMatch[1], cloudInfraVersion.SemVer)
   462  	} else if cloudInfraMinorVersion-fleetMinorVersion == cloudInfraCompatibility.NthIndex {
   463  		supportStatus.InfraSupportStatus.Status = status.ClusterNearingEndOfSupport
   464  		supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s will be out of support in the next release", activeVerMatch[1])
   465  	} else {
   466  		supportStatus.InfraSupportStatus.Status = status.Supported
   467  		supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s supported", activeVerMatch[1])
   468  	}
   469  	if labelList == nil || !slices.ContainsFunc(labelList, func(label *model.Label) bool { return label.Key == ctypes.DSDS }) {
   470  		return supportStatus, nil
   471  	}
   472  	// get terminal version info
   473  	terminals, err := s.TerminalService.GetTerminals(ctx, &cluster.ClusterEdgeID, nil)
   474  	if err != nil {
   475  		return nil, err
   476  	}
   477  	// get earliest terminal version, set to an ASCII value higher than digits
   478  	earliestTerminalVersion := "a"
   479  	for _, terminal := range terminals {
   480  		if terminal.Version == "" {
   481  			continue
   482  		}
   483  		termVersion := re.FindStringSubmatch(terminal.Version)
   484  		if len(termVersion) <= 1 {
   485  			return supportStatus, nil
   486  		}
   487  		if termVersion[1] < earliestTerminalVersion {
   488  			earliestTerminalVersion = termVersion[1]
   489  		}
   490  	}
   491  	if earliestTerminalVersion == "a" {
   492  		supportStatus.EdgeOsSupportStatus.Status = status.ErrGettingVersion
   493  		supportStatus.EdgeOsSupportStatus.Message = status.NoTerminalVersion
   494  		return supportStatus, nil
   495  	}
   496  	// get minor version of terminal
   497  	termVersionParts := strings.Split(earliestTerminalVersion, ".")
   498  	edgeOsSupported := slices.ContainsFunc(cloudInfraCompatibility.CompatibleArtifacts, func(artifact *model.ArtifactVersion) bool {
   499  		return artifact.Version == termVersionParts[0]+"."+termVersionParts[1]
   500  	})
   501  	earliestCompatibleOS := cloudInfraCompatibility.CompatibleArtifacts[len(cloudInfraCompatibility.CompatibleArtifacts)-1].Version
   502  	if !edgeOsSupported {
   503  		supportStatus.EdgeOsSupportStatus.Status = status.ClusterOutOfSupport
   504  		supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s is out of support", earliestTerminalVersion)
   505  	} else if termVersionParts[0]+"."+termVersionParts[1] == earliestCompatibleOS {
   506  		supportStatus.EdgeOsSupportStatus.Status = status.ClusterNearingEndOfSupport
   507  		supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s will be out of support in next release", earliestTerminalVersion)
   508  	} else {
   509  		supportStatus.EdgeOsSupportStatus.Status = status.Supported
   510  		supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("EdgeOS version %s supported", earliestTerminalVersion)
   511  	}
   512  	return supportStatus, nil
   513  }
   514  
   515  func (s *storeClusterService) getStatus(ctx context.Context, clusterEdgeID string) (map[string]map[string]string, error) {
   516  	stats := make(map[string]map[string]string)
   517  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterStatuses, clusterEdgeID, false)
   518  	if err != nil {
   519  		return stats, err
   520  	}
   521  	for rows.Next() {
   522  		var (
   523  			kind     = ""
   524  			jsonpath = ""
   525  			value    = ""
   526  		)
   527  		if err := rows.Scan(&kind, &jsonpath, &value); err != nil {
   528  			return stats, err
   529  		}
   530  		_, exists := stats[kind]
   531  		if !exists {
   532  			stats[kind] = make(map[string]string)
   533  		}
   534  		stats[kind][jsonpath] = value
   535  	}
   536  	return stats, nil
   537  }
   538  
   539  func (s *storeClusterService) getStatusErrorMessage(ctx context.Context, clusterEdgeID, kind string, errMessage *string, notReported map[string]bool, errorMessage *[]string) error {
   540  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterErrorStatusMessage, kind, clusterEdgeID, false)
   541  	if err := row.Scan(errMessage); err != nil {
   542  		if !errors.Is(err, sql.ErrNoRows) {
   543  			return err
   544  		}
   545  		notReported[kind] = true
   546  	} else {
   547  		*errorMessage = append(*errorMessage, *errMessage)
   548  	}
   549  	return nil
   550  }
   551  
   552  func isReady(kind string, stat map[string]map[string]string) bool {
   553  	if statuses, exists := stat[kind]; exists {
   554  		for jsonpath, kindStatus := range statuses {
   555  			if jsonpath == `$.status.conditions[?(@.type == "Ready")].status` && kindStatus == status.IsReady {
   556  				return true
   557  			}
   558  		}
   559  	}
   560  	return false
   561  }
   562  
   563  func (s *storeClusterService) GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error) {
   564  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetReplicationStatus, clusterEdgeID)
   565  	if err != nil {
   566  		return nil, err
   567  	}
   568  	replicationSets := false
   569  	var replications []string
   570  	for rows.Next() {
   571  		replicationSets = true
   572  		var replicationStatus model.ReplicationStatus
   573  		if err := rows.Scan(&replicationStatus.Name, &replicationStatus.Status); err != nil {
   574  			return nil, sqlerr.Wrap(err)
   575  		}
   576  		replicationStatus.Status = strings.Trim(replicationStatus.Status, "\"")
   577  		replications = append(replications, replicationStatus.Name)
   578  		// If any status is not "True", return immediately with CouchDBReplicationSet
   579  		if replicationStatus.Status != "True" { //nolint:goconst
   580  			return []*model.ReplicationStatus{
   581  				{Name: replicationStatus.Name, Status: replicationStatus.Status},
   582  			}, nil
   583  		}
   584  	}
   585  
   586  	if replicationSets {
   587  		return []*model.ReplicationStatus{{Name: strings.Join(replications, ", "), Status: "True"}}, nil
   588  	}
   589  
   590  	// empty case
   591  	return []*model.ReplicationStatus{}, nil
   592  }
   593  
   594  func (s *storeClusterService) GetClusterStatusFromCtlfish(ctx context.Context, cluster *model.Cluster) (*monitor.ClusterStatus, error) {
   595  	loqReq := mapper.GetClusterStatus()
   596  	res, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, loqReq)
   597  	if err != nil {
   598  		return nil, err
   599  	}
   600  	if len(res) == 0 {
   601  		return nil, nil
   602  	}
   603  	clusterStatus := &monitor.ClusterStatus{}
   604  	err = json.Unmarshal([]byte(res[0]), clusterStatus)
   605  	if err != nil {
   606  		return nil, err
   607  	}
   608  	return clusterStatus, nil
   609  }
   610  
   611  func (s *storeClusterService) GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error) {
   612  	storeStatus, err := s.GetClusterStatusFromCtlfish(ctx, cluster)
   613  	if err != nil {
   614  		return nil, err
   615  	}
   616  	if storeStatus == nil {
   617  		return &model.ClusterStatusResponse{Status: &model.ClusterStatus{Status: "Provisioning", Message: "Edge Provisioning"}}, nil
   618  	}
   619  	status := &model.ClusterStatus{}
   620  	status.Status = storeStatus.Status
   621  	status.Message = "Edge Syncing"
   622  	if storeStatus.Error != nil {
   623  		status.Message = storeStatus.Error.Message
   624  	}
   625  
   626  	return &model.ClusterStatusResponse{
   627  		Status:              status,
   628  		KubeVersion:         &storeStatus.NodeVersion,
   629  		BucketStatus:        toBucketStatusInformation(storeStatus.Buckets),
   630  		KustomizationStatus: toKustomizationStatusInformation(storeStatus.Kustomizations),
   631  	}, nil
   632  }
   633  
   634  func toBucketStatusInformation(bucketsInfo map[string]monitor.BucketInfo) []*model.BucketStatusInformation {
   635  	var buckets []*model.BucketStatusInformation
   636  	for name, bucket := range bucketsInfo {
   637  		fstatus := bucketsInfo[name].FluxStatus
   638  		bucketStatus := &model.BucketStatusInformation{
   639  			Excludes:   bucket.Excludes,
   640  			BucketName: bucket.BucketName,
   641  			FluxStatus: getFluxStatusInfo(name, &fstatus),
   642  		}
   643  		buckets = append(buckets, bucketStatus)
   644  	}
   645  	return buckets
   646  }
   647  
   648  func toKustomizationStatusInformation(KustomizationInfo map[string]monitor.KustomizationInfo) []*model.KustomizationStatusInformation {
   649  	var kustomizations []*model.KustomizationStatusInformation
   650  	for name, kustomization := range KustomizationInfo {
   651  		fstatus := KustomizationInfo[name].FluxStatus
   652  		kustomizationStatus := &model.KustomizationStatusInformation{
   653  			Path:       kustomization.Path,
   654  			Source:     kustomization.Source,
   655  			FluxStatus: getFluxStatusInfo(name, &fstatus),
   656  		}
   657  		kustomizations = append(kustomizations, kustomizationStatus)
   658  	}
   659  	return kustomizations
   660  }
   661  
   662  func getFluxStatusInfo(name string, fluxStatus *monitor.SyncInfo) *model.FluxStatusInformation {
   663  	return &model.FluxStatusInformation{
   664  		Name:          name,
   665  		Error:         fluxStatus.Error,
   666  		LastUpdated:   fluxStatus.LastUpdated,
   667  		Revision:      fluxStatus.Revision,
   668  		StatusMessage: fluxStatus.StatusMessage,
   669  		Suspended:     fluxStatus.Suspended,
   670  	}
   671  }
   672  
   673  func (s *storeClusterService) DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error {
   674  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterDeleteQuery, clusterEdgeID)
   675  	if err != nil {
   676  		return sqlerr.Wrap(err)
   677  	}
   678  	return nil
   679  }
   680  
   681  func (s *storeClusterService) SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error {
   682  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.SoftClusterDeleteQuery, clusterEdgeID)
   683  	if err != nil {
   684  		return sqlerr.Wrap(err)
   685  	}
   686  	return nil
   687  }
   688  
   689  func (s *storeClusterService) UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error {
   690  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClusterNameQuery, name, clusterEdgeID)
   691  	return err
   692  }
   693  
   694  func (s *storeClusterService) GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error) {
   695  	configmap, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, model.LoqRequest{
   696  		Kind:      "ConfigMap",
   697  		Group:     "",
   698  		Version:   "v1",
   699  		Name:      &configmapName,
   700  		Namespace: &namespace,
   701  	})
   702  	if err != nil {
   703  		return nil, err
   704  	}
   705  	return mapper.ConvertToConfigMap(configmap)
   706  }
   707  
   708  func (s *storeClusterService) UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error {
   709  	_, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateStoreSiteIDQuery, siteID, clusterEdgeID)
   710  	return err
   711  }
   712  
   713  func (s *storeClusterService) GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error) {
   714  	var activeVersion string
   715  
   716  	// Scan for active version from all healthy object id
   717  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetActiveEdgeVersionFromWatchedField, clusterEdgeID)
   718  	if err := row.Scan(&activeVersion); err != nil {
   719  		return "", nil
   720  	}
   721  
   722  	// Unmarshaling string data back
   723  	// When get value back from the DB, they are json string, e.g: ""hi"", so we want to unmarshal to get rid of the extra ""
   724  	var activeVersionUnmarshal activeVersionResp
   725  
   726  	errUnmarshal := json.Unmarshal([]byte(activeVersion), &activeVersionUnmarshal.version)
   727  	if errUnmarshal != nil {
   728  		return "", nil
   729  	}
   730  
   731  	return activeVersionUnmarshal.version, nil
   732  }
   733  
   734  func NewStoreClusterService(gkeService GkeClient, bqClient clients.BQClient, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, compatibilityService CompatibilityService) *storeClusterService { //nolint stupid
   735  	return &storeClusterService{
   736  		GkeService:           gkeService,
   737  		BQClient:             bqClient,
   738  		SQLDB:                sqlDB,
   739  		ChariotService:       chariotService,
   740  		TerminalService:      terminalService,
   741  		CompatibilityService: compatibilityService,
   742  	}
   743  }
   744  

View as plain text