package services import ( "context" "database/sql" "encoding/json" "errors" "fmt" "regexp" "slices" "strconv" "strings" kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1" sourceApi "github.com/fluxcd/source-controller/api/v1" "github.com/lib/pq" corev1 "k8s.io/api/core/v1" sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql" "edge-infra.dev/pkg/edge/api/clients" "edge-infra.dev/pkg/edge/api/graph/mapper" "edge-infra.dev/pkg/edge/api/graph/model" sqlquery "edge-infra.dev/pkg/edge/api/sql" "edge-infra.dev/pkg/edge/api/status" "edge-infra.dev/pkg/edge/api/types" "edge-infra.dev/pkg/edge/constants" ctypes "edge-infra.dev/pkg/edge/constants/api/cluster" "edge-infra.dev/pkg/edge/constants/api/fleet" "edge-infra.dev/pkg/edge/ctlfish/monitor" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1" "edge-infra.dev/pkg/lib/runtime/version" ) var ( ErrClusterMissing = errors.New("cluster must be provided") ErrKindMissing = errors.New("kind must be provided") components = map[string]bool{ whv1.ShipmentGVK.Kind: true, kustomizeApi.KustomizationKind: true, sourceApi.BucketKind: true, } ) //go:generate mockgen -destination=../mocks/mock_store_cluster_service.go -package=mocks edge-infra.dev/pkg/edge/api/services StoreClusterService type StoreClusterService interface { GetClusters(ctx context.Context, projectID string, labels []string) ([]*model.Cluster, error) GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error) GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error) DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error) GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error) SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error) GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error) GetClusterNetworkServices(ctx context.Context, clusterEdgeID string) ([]*model.ClusterNetworkServiceInfo, error) GetClusterNetworkServiceByNetworkID(ctx context.Context, clusterEdgeID, networkServiceID string) (*model.ClusterNetworkServiceInfo, error) GetClusterK8sNetworkServices(ctx context.Context, clusterEdgeID string) (map[string]string, error) CreateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.CreateNetworkServiceInfo) ([]*model.ClusterNetworkServiceInfo, error) UpdateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.UpdateNetworkServiceInfo, existingServiceTypesByID map[string]string) ([]*model.ClusterNetworkServiceInfo, error) DeleteClusterNetworkService(ctx context.Context, clusterEdgeID string, networkServiceID string) (bool, error) UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error) GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error) GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error) GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error) GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error) GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error) GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error) GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error) } type storeClusterService struct { GkeService GkeClient BQClient clients.BQClient SQLDB *sql.DB ChariotService ChariotService TerminalService TerminalService CompatibilityService CompatibilityService } type activeVersionResp struct { version string } func (s *storeClusterService) GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error) { cluster := types.Cluster{} row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID) if err := row.Scan(&cluster.ClusterEdgeID, &cluster.ClusterName, &cluster.ProjectID, &cluster.Registered, &cluster.Active, &cluster.BannerEdgeID, &cluster.BSLSiteID, &cluster.FleetVersion); err != nil { return cluster, sqlerr.Wrap(err) } return cluster, nil } func (s *storeClusterService) GetClusters(ctx context.Context, bannerID string, labels []string) ([]*model.Cluster, error) { var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string var registered, active bool var rows *sql.Rows var err error if len(labels) > 0 { rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersWithLabelQuery, bannerID, pq.Array(labels)) if err != nil { return nil, sqlerr.Wrap(err) } } else { rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersQuery, bannerID) if err != nil { return nil, sqlerr.Wrap(err) } } defer rows.Close() clusters := []*model.Cluster{} for rows.Next() { if err := rows.Scan(&uuid, &name, &projectID, ®istered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil { return nil, sqlerr.Wrap(err) } labelList, err := s.GetLabelsForCluster(ctx, uuid) if err != nil { return nil, sqlerr.Wrap(err) } clusters = append(clusters, getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active)) } if err := rows.Err(); err != nil { return nil, sqlerr.Wrap(err) } return clusters, nil } func (s *storeClusterService) GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error) { var labels []*model.Label rows, err := s.SQLDB.QueryContext(ctx, sqlquery.SelectEdgeLabelsForCluster, clusterEdgeID) if err != nil { return nil, err } for rows.Next() { var label model.Label //label_edge_id, label_key, color, visible, editable, banner_edge_id, label_unique, description, label_type if err := rows.Scan(&label.LabelEdgeID, &label.Key, &label.Color, &label.Visible, &label.Editable, &label.BannerEdgeID, &label.Unique, &label.Description, &label.Type); err != nil { return nil, sqlerr.Wrap(err) } labels = append(labels, &label) } if err := rows.Err(); err != nil { return nil, sqlerr.Wrap(err) } return labels, nil } func (s *storeClusterService) GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error) { var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string var registered, active bool row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID) if err := row.Scan(&uuid, &name, &projectID, ®istered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil { return nil, sqlerr.Wrap(err) } labelList, err := s.GetLabelsForCluster(ctx, clusterEdgeID) if err != nil { return nil, sqlerr.Wrap(err) } return getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active), nil } func (s *storeClusterService) GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error) { var events []*model.Event rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterEvents, clusterEdgeID) if err != nil { return nil, err } for rows.Next() { var e model.Event e.InvolvedObject = &model.InvolvedObject{} if err := rows.Scan(&e.EventEdgeID, &e.Name, &e.InvolvedObject.Kind, &e.InvolvedObject.Namespace, &e.InvolvedObject.Name, &e.Reason, &e.Message, &e.Status, &e.Source, &e.Annotations, &e.TerminalID, &e.ClusterEdgeID, &e.CreatedAt); err != nil { return nil, sqlerr.Wrap(err) } events = append(events, &e) } if err := rows.Err(); err != nil { return nil, sqlerr.Wrap(err) } return events, nil } func (s *storeClusterService) GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error) { var clusterID, clusterName, infraStatus, infraStatusDetails string row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterInfraStatusByID, clusterEdgeID) if err := row.Scan(&clusterID, &clusterName, &infraStatus, &infraStatusDetails); err != nil { return nil, sqlerr.Wrap(err) } // Return status to either HEALTHY, UNHEALTHY or PROVISIONING if infraStatus == "READY" { infraStatus = "HEALTHY" } else if infraStatus == "ERROR" { infraStatus = "UNHEALTHY" } infraStatusObj := &model.InfraStatus{ Status: infraStatus, Message: infraStatusDetails, } return infraStatusObj, nil } func (s *storeClusterService) GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error) { var clusterEdgeID, bannerEdgID, bslSiteID, fleetVersion string var registered, active bool row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByNameAndProject, clusterName, projectID) if err := row.Scan(&clusterEdgeID, ®istered, &active, &bannerEdgID, &bslSiteID, &fleetVersion); err != nil { return nil, sqlerr.Wrap(err) } return getCluster(clusterEdgeID, clusterName, projectID, bannerEdgID, bslSiteID, fleetVersion, nil, registered, active), nil } func getCluster(uuid, storeName, projectID, bannerEdgeID, bslSiteID, fleetVersion string, label []*model.Label, registered, active bool) *model.Cluster { store := &model.Cluster{ ClusterEdgeID: uuid, Name: storeName, ProjectID: projectID, Registered: ®istered, Active: &active, Labels: label, BannerEdgeID: bannerEdgeID, BslSiteID: &bslSiteID, FleetVersion: fleetVersion, } return store } func (s *storeClusterService) GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error) { if clusterEdgeID == "" { return nil, ErrClusterMissing } if kind == "" { return nil, ErrKindMissing } clusterStatus := &model.ClusterStatus{} switch active { case false: // Case 1 - Cluster Not Bootstrapped // @returns status = REGISTERED // @returns message = Registered but Edge is not installed // if cluster is not bootstrapped (i.e Edge Manifests have not been applied) // then there is no way status will make it up clusterStatus.Status = status.NotAvailable clusterStatus.Message = status.NotAvailableStatusMessage return clusterStatus, nil default: isReady := "" row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterStatus, kind, clusterEdgeID, false) if err := row.Scan(&isReady); err != nil { if !errors.Is(err, sql.ErrNoRows) { return clusterStatus, err } clusterStatus.Status = status.NotAvailable clusterStatus.Message = status.NotAvailableStatusMessage return clusterStatus, nil } switch { case isReady == status.IsReady: // Case 2 - Cluster Bootstrapped, Status = Ready, Kind = Ready // @returns status = Ready // @returns message = Kind is ready clusterStatus.Status = status.Ready clusterStatus.Message = fmt.Sprintf(status.KindReadyMessage, kind) return clusterStatus, nil case isReady == status.NotReady: var ( kindErr = "" errorMessage = make([]string, 0) notReported = make(map[string]bool) ) if err := s.getStatusErrorMessage(ctx, clusterEdgeID, kind, &kindErr, notReported, &errorMessage); err != nil { return clusterStatus, err } // Case 3 - Cluster Bootstrapped, kind = Not Reported // @returns status = Installing // @returns message = Waiting for Kind status if kindNotReported := status.IsNotReported(kind, notReported); kindNotReported { clusterStatus.Status = status.Installing clusterStatus.Message = fmt.Sprintf("%s %s status", status.NotReportedFormat, kind) return clusterStatus, nil } // Case 4 - Cluster Bootstrapped, An Error For Kind // @returns status = Error // @returns message = Kind error message clusterStatus.Status = status.Error clusterStatus.Message = status.MergeErrorMessages(errorMessage) return clusterStatus, nil } return clusterStatus, nil } } func (s *storeClusterService) GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error) { var ( clusterStatus = make([]*model.ComponentStatus, 0) errs error ) if cluster.ClusterEdgeID == "" { return nil, ErrClusterMissing } for component := range components { status, err := s.GetComponentStatus(ctx, cluster.ClusterEdgeID, component, cluster.Active) if err != nil { errs = errors.Join(errs, err) continue } clusterStatus = append(clusterStatus, &model.ComponentStatus{ Status: &model.ClusterStatus{ Status: status.Status, Message: status.Message, }, Component: component, }) } return clusterStatus, errs } func (s *storeClusterService) GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error) { if cluster == nil { return nil, ErrClusterMissing } clusterStatus := &model.ClusterStatus{} switch cluster.Active { case false: // Case 1 - Cluster Not Bootstrapped // @returns status = REGISTERED // @returns message = Registered but Edge is not installed // if cluster is not bootstrapped (i.e Edge Manifests have not been applied) // then there is no way status will make it up clusterStatus.Status = status.Registered clusterStatus.Message = status.RegisteredMessage return clusterStatus, nil default: statuses, err := s.getStatus(ctx, cluster.ClusterEdgeID) if err != nil { return nil, err } shipmentReady := isReady(whv1.ShipmentGVK.Kind, statuses) kustomizationReady := isReady(kustomizeApi.KustomizationKind, statuses) bucketReady := isReady(sourceApi.BucketKind, statuses) // Case 2 - Cluster Bootstrapped, Status = Ready, Kustomization = Ready, Shipment = Ready, Bucket = Ready // @returns status = Ready // @returns message = Cluster is ready if shipmentReady && kustomizationReady && bucketReady { clusterStatus.Status = status.Ready clusterStatus.Message = status.ReadyMessage return clusterStatus, nil } // Case 3 - Cluster Bootstrapped, Any Error in Kustomization, Shipment, or Bucket // @returns status = Error // @returns message = Kustomization + Shipment error message if !shipmentReady || !kustomizationReady || !bucketReady { //nolint:nestif var ( shipmentErr = "" kustomizationErr = "" bucketErr = "" errorMessage = make([]string, 0) notReported = make(map[string]bool) ) if !shipmentReady { if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, whv1.ShipmentGVK.Kind, &shipmentErr, notReported, &errorMessage); err != nil { return clusterStatus, err } } if !kustomizationReady { if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, kustomizeApi.KustomizationKind, &kustomizationErr, notReported, &errorMessage); err != nil { return clusterStatus, err } } if !bucketReady { if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, sourceApi.BucketKind, &bucketErr, notReported, &errorMessage); err != nil { return clusterStatus, err } } // Case 4 - Cluster Bootstrapped, Kustomization = Error, Shipment = Not Reported, Bucket = Ready // @returns status = Installing // @returns message = Kustomization error message + Waiting for not Shipment status // // AND // // Case 5 - Cluster Bootstrapped, Kustomization = Not reported, Shipment = Not Reported, Bucket = Ready // @returns status = Installing // @returns message = Waiting for not Shipment, Kustomization status shipmentNotReported := status.IsNotReported(whv1.ShipmentGVK.Kind, notReported) kustomizationNotReported := status.IsNotReported(kustomizeApi.KustomizationKind, notReported) bucketNotReported := status.IsNotReported(sourceApi.BucketKind, notReported) if shipmentNotReported || kustomizationNotReported || bucketNotReported { if shipmentNotReported { errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, whv1.ShipmentGVK.Kind)) } if kustomizationNotReported { errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, kustomizeApi.KustomizationKind)) } if bucketNotReported { errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, sourceApi.BucketKind)) } clusterStatus.Status = status.Installing if shipmentNotReported && kustomizationNotReported && bucketNotReported { // Case 6 - Cluster Bootstrapped, Ready = False, Kustomization = Not reported/NA, Shipment = Not Reported/NA, Bucket = Not Reported/NA // @returns status = Installing // @returns message = Waiting for installation to complete clusterStatus.Message = status.InstallingMessage } else { clusterStatus.Message = status.MergeErrorMessages(errorMessage) } return clusterStatus, nil } clusterStatus.Status = status.Error clusterStatus.Message = status.MergeErrorMessages(errorMessage) return clusterStatus, nil } return clusterStatus, nil } } func (s *storeClusterService) GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error) { supportStatus := &model.SupportStatus{ InfraSupportStatus: &model.InfraSupportStatus{}, EdgeOsSupportStatus: &model.EdgeOsSupportStatus{}, } labelList, err := s.GetLabelsForCluster(ctx, cluster.ClusterEdgeID) if err != nil { return nil, sqlerr.Wrap(err) } cloudInfraVersion := version.New() _, cloudInfraMinorVersion, _, err := cloudInfraVersion.SemVerMajorMinorPatch() if err != nil { return nil, err } edgeOSArtifact := constants.EdgeOSArtifact cloudInfraCompatibility, err := s.CompatibilityService.GetArtifactVersionCompatibility(ctx, model.ArtifactVersion{Name: fleet.Store, Version: cloudInfraVersion.SemVer}, &edgeOSArtifact) if err != nil { return nil, err } activeVer, err := s.GetActiveEdgeVersion(ctx, cluster.ClusterEdgeID) if err != nil { return nil, err } if activeVer == "" { supportStatus.InfraSupportStatus.Status = status.ErrGettingVersion supportStatus.InfraSupportStatus.Message = status.NoActiveVersion return supportStatus, nil } // parse version for x.x.x re := regexp.MustCompile(`v?(\d+\.\d+\.\d+)`) activeVerMatch := re.FindStringSubmatch(activeVer) fleetVersionParts := strings.Split(activeVerMatch[1], ".") fleetMinorVersion, err := strconv.Atoi(fleetVersionParts[1]) if err != nil { return nil, err } if cloudInfraMinorVersion-fleetMinorVersion > cloudInfraCompatibility.NthIndex { supportStatus.InfraSupportStatus.Status = status.ClusterOutOfSupport supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s is out of support with current version %s", activeVerMatch[1], cloudInfraVersion.SemVer) } else if cloudInfraMinorVersion-fleetMinorVersion == cloudInfraCompatibility.NthIndex { supportStatus.InfraSupportStatus.Status = status.ClusterNearingEndOfSupport supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s will be out of support in the next release", activeVerMatch[1]) } else { supportStatus.InfraSupportStatus.Status = status.Supported supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s supported", activeVerMatch[1]) } if labelList == nil || !slices.ContainsFunc(labelList, func(label *model.Label) bool { return label.Key == ctypes.DSDS }) { return supportStatus, nil } // get terminal version info terminals, err := s.TerminalService.GetTerminals(ctx, &cluster.ClusterEdgeID, nil) if err != nil { return nil, err } // get earliest terminal version, set to an ASCII value higher than digits earliestTerminalVersion := "a" for _, terminal := range terminals { if terminal.Version == "" { continue } termVersion := re.FindStringSubmatch(terminal.Version) if len(termVersion) <= 1 { return supportStatus, nil } if termVersion[1] < earliestTerminalVersion { earliestTerminalVersion = termVersion[1] } } if earliestTerminalVersion == "a" { supportStatus.EdgeOsSupportStatus.Status = status.ErrGettingVersion supportStatus.EdgeOsSupportStatus.Message = status.NoTerminalVersion return supportStatus, nil } // get minor version of terminal termVersionParts := strings.Split(earliestTerminalVersion, ".") edgeOsSupported := slices.ContainsFunc(cloudInfraCompatibility.CompatibleArtifacts, func(artifact *model.ArtifactVersion) bool { return artifact.Version == termVersionParts[0]+"."+termVersionParts[1] }) earliestCompatibleOS := cloudInfraCompatibility.CompatibleArtifacts[len(cloudInfraCompatibility.CompatibleArtifacts)-1].Version if !edgeOsSupported { supportStatus.EdgeOsSupportStatus.Status = status.ClusterOutOfSupport supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s is out of support", earliestTerminalVersion) } else if termVersionParts[0]+"."+termVersionParts[1] == earliestCompatibleOS { supportStatus.EdgeOsSupportStatus.Status = status.ClusterNearingEndOfSupport supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s will be out of support in next release", earliestTerminalVersion) } else { supportStatus.EdgeOsSupportStatus.Status = status.Supported supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("EdgeOS version %s supported", earliestTerminalVersion) } return supportStatus, nil } func (s *storeClusterService) getStatus(ctx context.Context, clusterEdgeID string) (map[string]map[string]string, error) { stats := make(map[string]map[string]string) rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterStatuses, clusterEdgeID, false) if err != nil { return stats, err } for rows.Next() { var ( kind = "" jsonpath = "" value = "" ) if err := rows.Scan(&kind, &jsonpath, &value); err != nil { return stats, err } _, exists := stats[kind] if !exists { stats[kind] = make(map[string]string) } stats[kind][jsonpath] = value } return stats, nil } func (s *storeClusterService) getStatusErrorMessage(ctx context.Context, clusterEdgeID, kind string, errMessage *string, notReported map[string]bool, errorMessage *[]string) error { row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterErrorStatusMessage, kind, clusterEdgeID, false) if err := row.Scan(errMessage); err != nil { if !errors.Is(err, sql.ErrNoRows) { return err } notReported[kind] = true } else { *errorMessage = append(*errorMessage, *errMessage) } return nil } func isReady(kind string, stat map[string]map[string]string) bool { if statuses, exists := stat[kind]; exists { for jsonpath, kindStatus := range statuses { if jsonpath == `$.status.conditions[?(@.type == "Ready")].status` && kindStatus == status.IsReady { return true } } } return false } func (s *storeClusterService) GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error) { rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetReplicationStatus, clusterEdgeID) if err != nil { return nil, err } replicationSets := false var replications []string for rows.Next() { replicationSets = true var replicationStatus model.ReplicationStatus if err := rows.Scan(&replicationStatus.Name, &replicationStatus.Status); err != nil { return nil, sqlerr.Wrap(err) } replicationStatus.Status = strings.Trim(replicationStatus.Status, "\"") replications = append(replications, replicationStatus.Name) // If any status is not "True", return immediately with CouchDBReplicationSet if replicationStatus.Status != "True" { //nolint:goconst return []*model.ReplicationStatus{ {Name: replicationStatus.Name, Status: replicationStatus.Status}, }, nil } } if replicationSets { return []*model.ReplicationStatus{{Name: strings.Join(replications, ", "), Status: "True"}}, nil } // empty case return []*model.ReplicationStatus{}, nil } func (s *storeClusterService) GetClusterStatusFromCtlfish(ctx context.Context, cluster *model.Cluster) (*monitor.ClusterStatus, error) { loqReq := mapper.GetClusterStatus() res, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, loqReq) if err != nil { return nil, err } if len(res) == 0 { return nil, nil } clusterStatus := &monitor.ClusterStatus{} err = json.Unmarshal([]byte(res[0]), clusterStatus) if err != nil { return nil, err } return clusterStatus, nil } func (s *storeClusterService) GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error) { storeStatus, err := s.GetClusterStatusFromCtlfish(ctx, cluster) if err != nil { return nil, err } if storeStatus == nil { return &model.ClusterStatusResponse{Status: &model.ClusterStatus{Status: "Provisioning", Message: "Edge Provisioning"}}, nil } status := &model.ClusterStatus{} status.Status = storeStatus.Status status.Message = "Edge Syncing" if storeStatus.Error != nil { status.Message = storeStatus.Error.Message } return &model.ClusterStatusResponse{ Status: status, KubeVersion: &storeStatus.NodeVersion, BucketStatus: toBucketStatusInformation(storeStatus.Buckets), KustomizationStatus: toKustomizationStatusInformation(storeStatus.Kustomizations), }, nil } func toBucketStatusInformation(bucketsInfo map[string]monitor.BucketInfo) []*model.BucketStatusInformation { var buckets []*model.BucketStatusInformation for name, bucket := range bucketsInfo { fstatus := bucketsInfo[name].FluxStatus bucketStatus := &model.BucketStatusInformation{ Excludes: bucket.Excludes, BucketName: bucket.BucketName, FluxStatus: getFluxStatusInfo(name, &fstatus), } buckets = append(buckets, bucketStatus) } return buckets } func toKustomizationStatusInformation(KustomizationInfo map[string]monitor.KustomizationInfo) []*model.KustomizationStatusInformation { var kustomizations []*model.KustomizationStatusInformation for name, kustomization := range KustomizationInfo { fstatus := KustomizationInfo[name].FluxStatus kustomizationStatus := &model.KustomizationStatusInformation{ Path: kustomization.Path, Source: kustomization.Source, FluxStatus: getFluxStatusInfo(name, &fstatus), } kustomizations = append(kustomizations, kustomizationStatus) } return kustomizations } func getFluxStatusInfo(name string, fluxStatus *monitor.SyncInfo) *model.FluxStatusInformation { return &model.FluxStatusInformation{ Name: name, Error: fluxStatus.Error, LastUpdated: fluxStatus.LastUpdated, Revision: fluxStatus.Revision, StatusMessage: fluxStatus.StatusMessage, Suspended: fluxStatus.Suspended, } } func (s *storeClusterService) DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterDeleteQuery, clusterEdgeID) if err != nil { return sqlerr.Wrap(err) } return nil } func (s *storeClusterService) SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.SoftClusterDeleteQuery, clusterEdgeID) if err != nil { return sqlerr.Wrap(err) } return nil } func (s *storeClusterService) UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClusterNameQuery, name, clusterEdgeID) return err } func (s *storeClusterService) GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error) { configmap, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, model.LoqRequest{ Kind: "ConfigMap", Group: "", Version: "v1", Name: &configmapName, Namespace: &namespace, }) if err != nil { return nil, err } return mapper.ConvertToConfigMap(configmap) } func (s *storeClusterService) UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error { _, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateStoreSiteIDQuery, siteID, clusterEdgeID) return err } func (s *storeClusterService) GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error) { var activeVersion string // Scan for active version from all healthy object id row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetActiveEdgeVersionFromWatchedField, clusterEdgeID) if err := row.Scan(&activeVersion); err != nil { return "", nil } // Unmarshaling string data back // When get value back from the DB, they are json string, e.g: ""hi"", so we want to unmarshal to get rid of the extra "" var activeVersionUnmarshal activeVersionResp errUnmarshal := json.Unmarshal([]byte(activeVersion), &activeVersionUnmarshal.version) if errUnmarshal != nil { return "", nil } return activeVersionUnmarshal.version, nil } func NewStoreClusterService(gkeService GkeClient, bqClient clients.BQClient, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, compatibilityService CompatibilityService) *storeClusterService { //nolint stupid return &storeClusterService{ GkeService: gkeService, BQClient: bqClient, SQLDB: sqlDB, ChariotService: chariotService, TerminalService: terminalService, CompatibilityService: compatibilityService, } }