1 package services
2
3 import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "regexp"
10 "slices"
11 "strconv"
12 "strings"
13
14 kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
15 sourceApi "github.com/fluxcd/source-controller/api/v1"
16 "github.com/lib/pq"
17 corev1 "k8s.io/api/core/v1"
18
19 sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
20 "edge-infra.dev/pkg/edge/api/clients"
21 "edge-infra.dev/pkg/edge/api/graph/mapper"
22 "edge-infra.dev/pkg/edge/api/graph/model"
23 sqlquery "edge-infra.dev/pkg/edge/api/sql"
24 "edge-infra.dev/pkg/edge/api/status"
25 "edge-infra.dev/pkg/edge/api/types"
26 "edge-infra.dev/pkg/edge/constants"
27 ctypes "edge-infra.dev/pkg/edge/constants/api/cluster"
28 "edge-infra.dev/pkg/edge/constants/api/fleet"
29 "edge-infra.dev/pkg/edge/ctlfish/monitor"
30 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha1"
31 "edge-infra.dev/pkg/lib/runtime/version"
32 )
33
34 var (
35 ErrClusterMissing = errors.New("cluster must be provided")
36 ErrKindMissing = errors.New("kind must be provided")
37 components = map[string]bool{
38 whv1.ShipmentGVK.Kind: true,
39 kustomizeApi.KustomizationKind: true,
40 sourceApi.BucketKind: true,
41 }
42 )
43
44
45 type StoreClusterService interface {
46 GetClusters(ctx context.Context, projectID string, labels []string) ([]*model.Cluster, error)
47 GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error)
48 GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error)
49 DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error
50 GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error)
51 GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error)
52 SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error
53 GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error)
54 GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error)
55 GetClusterNetworkServices(ctx context.Context, clusterEdgeID string) ([]*model.ClusterNetworkServiceInfo, error)
56 GetClusterNetworkServiceByNetworkID(ctx context.Context, clusterEdgeID, networkServiceID string) (*model.ClusterNetworkServiceInfo, error)
57 GetClusterK8sNetworkServices(ctx context.Context, clusterEdgeID string) (map[string]string, error)
58 CreateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.CreateNetworkServiceInfo) ([]*model.ClusterNetworkServiceInfo, error)
59 UpdateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.UpdateNetworkServiceInfo, existingServiceTypesByID map[string]string) ([]*model.ClusterNetworkServiceInfo, error)
60 DeleteClusterNetworkService(ctx context.Context, clusterEdgeID string, networkServiceID string) (bool, error)
61 UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error
62 UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error
63 GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error)
64 GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error)
65 GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error)
66 GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error)
67 GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error)
68 GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error)
69 GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error)
70 GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error)
71 }
72
73 type storeClusterService struct {
74 GkeService GkeClient
75 BQClient clients.BQClient
76 SQLDB *sql.DB
77 ChariotService ChariotService
78 TerminalService TerminalService
79 CompatibilityService CompatibilityService
80 }
81
// activeVersionResp is the scratch holder GetActiveEdgeVersion decodes the
// stored JSON version string into.
type activeVersionResp struct {
	// version receives the decoded version string.
	version string
}
85
86 func (s *storeClusterService) GetClusterByClusterEdgeID(ctx context.Context, clusterEdgeID string) (types.Cluster, error) {
87 cluster := types.Cluster{}
88 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID)
89 if err := row.Scan(&cluster.ClusterEdgeID, &cluster.ClusterName, &cluster.ProjectID, &cluster.Registered, &cluster.Active, &cluster.BannerEdgeID, &cluster.BSLSiteID, &cluster.FleetVersion); err != nil {
90 return cluster, sqlerr.Wrap(err)
91 }
92 return cluster, nil
93 }
94
95 func (s *storeClusterService) GetClusters(ctx context.Context, bannerID string, labels []string) ([]*model.Cluster, error) {
96 var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string
97 var registered, active bool
98 var rows *sql.Rows
99 var err error
100 if len(labels) > 0 {
101 rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersWithLabelQuery, bannerID, pq.Array(labels))
102 if err != nil {
103 return nil, sqlerr.Wrap(err)
104 }
105 } else {
106 rows, err = s.SQLDB.QueryContext(ctx, sqlquery.GetClustersQuery, bannerID)
107 if err != nil {
108 return nil, sqlerr.Wrap(err)
109 }
110 }
111 defer rows.Close()
112 clusters := []*model.Cluster{}
113 for rows.Next() {
114 if err := rows.Scan(&uuid, &name, &projectID, ®istered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil {
115 return nil, sqlerr.Wrap(err)
116 }
117 labelList, err := s.GetLabelsForCluster(ctx, uuid)
118 if err != nil {
119 return nil, sqlerr.Wrap(err)
120 }
121 clusters = append(clusters, getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active))
122 }
123 if err := rows.Err(); err != nil {
124 return nil, sqlerr.Wrap(err)
125 }
126 return clusters, nil
127 }
128
129 func (s *storeClusterService) GetLabelsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Label, error) {
130 var labels []*model.Label
131 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.SelectEdgeLabelsForCluster, clusterEdgeID)
132 if err != nil {
133 return nil, err
134 }
135 for rows.Next() {
136 var label model.Label
137
138 if err := rows.Scan(&label.LabelEdgeID, &label.Key, &label.Color, &label.Visible, &label.Editable, &label.BannerEdgeID, &label.Unique, &label.Description, &label.Type); err != nil {
139 return nil, sqlerr.Wrap(err)
140 }
141 labels = append(labels, &label)
142 }
143 if err := rows.Err(); err != nil {
144 return nil, sqlerr.Wrap(err)
145 }
146 return labels, nil
147 }
148
149 func (s *storeClusterService) GetCluster(ctx context.Context, clusterEdgeID string) (*model.Cluster, error) {
150 var uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion string
151 var registered, active bool
152 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByEdgeID, clusterEdgeID)
153 if err := row.Scan(&uuid, &name, &projectID, ®istered, &active, &bannerEdgeID, &bslSiteID, &fleetVersion); err != nil {
154 return nil, sqlerr.Wrap(err)
155 }
156 labelList, err := s.GetLabelsForCluster(ctx, clusterEdgeID)
157 if err != nil {
158 return nil, sqlerr.Wrap(err)
159 }
160 return getCluster(uuid, name, projectID, bannerEdgeID, bslSiteID, fleetVersion, labelList, registered, active), nil
161 }
162
163 func (s *storeClusterService) GetEventsForCluster(ctx context.Context, clusterEdgeID string) ([]*model.Event, error) {
164 var events []*model.Event
165 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterEvents, clusterEdgeID)
166 if err != nil {
167 return nil, err
168 }
169 for rows.Next() {
170 var e model.Event
171 e.InvolvedObject = &model.InvolvedObject{}
172 if err := rows.Scan(&e.EventEdgeID, &e.Name, &e.InvolvedObject.Kind, &e.InvolvedObject.Namespace, &e.InvolvedObject.Name, &e.Reason, &e.Message, &e.Status, &e.Source, &e.Annotations,
173 &e.TerminalID, &e.ClusterEdgeID, &e.CreatedAt); err != nil {
174 return nil, sqlerr.Wrap(err)
175 }
176 events = append(events, &e)
177 }
178 if err := rows.Err(); err != nil {
179 return nil, sqlerr.Wrap(err)
180 }
181 return events, nil
182 }
183
184 func (s *storeClusterService) GetInfraStatus(ctx context.Context, clusterEdgeID string) (*model.InfraStatus, error) {
185 var clusterID, clusterName, infraStatus, infraStatusDetails string
186 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterInfraStatusByID, clusterEdgeID)
187 if err := row.Scan(&clusterID, &clusterName, &infraStatus, &infraStatusDetails); err != nil {
188 return nil, sqlerr.Wrap(err)
189 }
190
191
192 if infraStatus == "READY" {
193 infraStatus = "HEALTHY"
194 } else if infraStatus == "ERROR" {
195 infraStatus = "UNHEALTHY"
196 }
197
198 infraStatusObj := &model.InfraStatus{
199 Status: infraStatus,
200 Message: infraStatusDetails,
201 }
202
203 return infraStatusObj, nil
204 }
205
206 func (s *storeClusterService) GetClusterByNameAndProject(ctx context.Context, clusterName, projectID string) (*model.Cluster, error) {
207 var clusterEdgeID, bannerEdgID, bslSiteID, fleetVersion string
208 var registered, active bool
209 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterByNameAndProject, clusterName, projectID)
210 if err := row.Scan(&clusterEdgeID, ®istered, &active, &bannerEdgID, &bslSiteID, &fleetVersion); err != nil {
211 return nil, sqlerr.Wrap(err)
212 }
213 return getCluster(clusterEdgeID, clusterName, projectID, bannerEdgID, bslSiteID, fleetVersion, nil, registered, active), nil
214 }
215
216 func getCluster(uuid, storeName, projectID, bannerEdgeID, bslSiteID, fleetVersion string, label []*model.Label, registered, active bool) *model.Cluster {
217 store := &model.Cluster{
218 ClusterEdgeID: uuid,
219 Name: storeName,
220 ProjectID: projectID,
221 Registered: ®istered,
222 Active: &active,
223 Labels: label,
224 BannerEdgeID: bannerEdgeID,
225 BslSiteID: &bslSiteID,
226 FleetVersion: fleetVersion,
227 }
228 return store
229 }
230
231 func (s *storeClusterService) GetComponentStatus(ctx context.Context, clusterEdgeID, kind string, active bool) (*model.ClusterStatus, error) {
232 if clusterEdgeID == "" {
233 return nil, ErrClusterMissing
234 }
235 if kind == "" {
236 return nil, ErrKindMissing
237 }
238 clusterStatus := &model.ClusterStatus{}
239 switch active {
240 case false:
241
242
243
244
245
246 clusterStatus.Status = status.NotAvailable
247 clusterStatus.Message = status.NotAvailableStatusMessage
248 return clusterStatus, nil
249 default:
250 isReady := ""
251 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterStatus, kind, clusterEdgeID, false)
252 if err := row.Scan(&isReady); err != nil {
253 if !errors.Is(err, sql.ErrNoRows) {
254 return clusterStatus, err
255 }
256 clusterStatus.Status = status.NotAvailable
257 clusterStatus.Message = status.NotAvailableStatusMessage
258 return clusterStatus, nil
259 }
260
261 switch {
262 case isReady == status.IsReady:
263
264
265
266 clusterStatus.Status = status.Ready
267 clusterStatus.Message = fmt.Sprintf(status.KindReadyMessage, kind)
268 return clusterStatus, nil
269 case isReady == status.NotReady:
270 var (
271 kindErr = ""
272 errorMessage = make([]string, 0)
273 notReported = make(map[string]bool)
274 )
275 if err := s.getStatusErrorMessage(ctx, clusterEdgeID, kind, &kindErr, notReported, &errorMessage); err != nil {
276 return clusterStatus, err
277 }
278
279
280
281 if kindNotReported := status.IsNotReported(kind, notReported); kindNotReported {
282 clusterStatus.Status = status.Installing
283 clusterStatus.Message = fmt.Sprintf("%s %s status", status.NotReportedFormat, kind)
284 return clusterStatus, nil
285 }
286
287
288
289 clusterStatus.Status = status.Error
290 clusterStatus.Message = status.MergeErrorMessages(errorMessage)
291 return clusterStatus, nil
292 }
293 return clusterStatus, nil
294 }
295 }
296
297 func (s *storeClusterService) GetClusterComponentsStatus(ctx context.Context, cluster *model.CombinedStatus) ([]*model.ComponentStatus, error) {
298 var (
299 clusterStatus = make([]*model.ComponentStatus, 0)
300 errs error
301 )
302 if cluster.ClusterEdgeID == "" {
303 return nil, ErrClusterMissing
304 }
305 for component := range components {
306 status, err := s.GetComponentStatus(ctx, cluster.ClusterEdgeID, component, cluster.Active)
307 if err != nil {
308 errs = errors.Join(errs, err)
309 continue
310 }
311 clusterStatus = append(clusterStatus, &model.ComponentStatus{
312 Status: &model.ClusterStatus{
313 Status: status.Status,
314 Message: status.Message,
315 },
316 Component: component,
317 })
318 }
319 return clusterStatus, errs
320 }
321
322 func (s *storeClusterService) GetCombinedClusterStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.ClusterStatus, error) {
323 if cluster == nil {
324 return nil, ErrClusterMissing
325 }
326 clusterStatus := &model.ClusterStatus{}
327 switch cluster.Active {
328 case false:
329
330
331
332
333
334 clusterStatus.Status = status.Registered
335 clusterStatus.Message = status.RegisteredMessage
336 return clusterStatus, nil
337 default:
338 statuses, err := s.getStatus(ctx, cluster.ClusterEdgeID)
339 if err != nil {
340 return nil, err
341 }
342 shipmentReady := isReady(whv1.ShipmentGVK.Kind, statuses)
343 kustomizationReady := isReady(kustomizeApi.KustomizationKind, statuses)
344 bucketReady := isReady(sourceApi.BucketKind, statuses)
345
346
347
348 if shipmentReady && kustomizationReady && bucketReady {
349 clusterStatus.Status = status.Ready
350 clusterStatus.Message = status.ReadyMessage
351 return clusterStatus, nil
352 }
353
354
355
356 if !shipmentReady || !kustomizationReady || !bucketReady {
357 var (
358 shipmentErr = ""
359 kustomizationErr = ""
360 bucketErr = ""
361 errorMessage = make([]string, 0)
362 notReported = make(map[string]bool)
363 )
364 if !shipmentReady {
365 if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, whv1.ShipmentGVK.Kind, &shipmentErr, notReported, &errorMessage); err != nil {
366 return clusterStatus, err
367 }
368 }
369 if !kustomizationReady {
370 if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, kustomizeApi.KustomizationKind, &kustomizationErr, notReported, &errorMessage); err != nil {
371 return clusterStatus, err
372 }
373 }
374 if !bucketReady {
375 if err := s.getStatusErrorMessage(ctx, cluster.ClusterEdgeID, sourceApi.BucketKind, &bucketErr, notReported, &errorMessage); err != nil {
376 return clusterStatus, err
377 }
378 }
379
380
381
382
383
384
385
386
387
388 shipmentNotReported := status.IsNotReported(whv1.ShipmentGVK.Kind, notReported)
389 kustomizationNotReported := status.IsNotReported(kustomizeApi.KustomizationKind, notReported)
390 bucketNotReported := status.IsNotReported(sourceApi.BucketKind, notReported)
391 if shipmentNotReported || kustomizationNotReported || bucketNotReported {
392 if shipmentNotReported {
393 errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, whv1.ShipmentGVK.Kind))
394 }
395 if kustomizationNotReported {
396 errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, kustomizeApi.KustomizationKind))
397 }
398 if bucketNotReported {
399 errorMessage = append(errorMessage, fmt.Sprintf("%s %s status", status.NotReportedFormat, sourceApi.BucketKind))
400 }
401 clusterStatus.Status = status.Installing
402 if shipmentNotReported && kustomizationNotReported && bucketNotReported {
403
404
405
406 clusterStatus.Message = status.InstallingMessage
407 } else {
408 clusterStatus.Message = status.MergeErrorMessages(errorMessage)
409 }
410 return clusterStatus, nil
411 }
412 clusterStatus.Status = status.Error
413 clusterStatus.Message = status.MergeErrorMessages(errorMessage)
414 return clusterStatus, nil
415 }
416 return clusterStatus, nil
417 }
418 }
419
420 func (s *storeClusterService) GetSupportStatus(ctx context.Context, cluster *model.CombinedStatus) (*model.SupportStatus, error) {
421 supportStatus := &model.SupportStatus{
422 InfraSupportStatus: &model.InfraSupportStatus{},
423 EdgeOsSupportStatus: &model.EdgeOsSupportStatus{},
424 }
425
426 labelList, err := s.GetLabelsForCluster(ctx, cluster.ClusterEdgeID)
427 if err != nil {
428 return nil, sqlerr.Wrap(err)
429 }
430 cloudInfraVersion := version.New()
431 _, cloudInfraMinorVersion, _, err := cloudInfraVersion.SemVerMajorMinorPatch()
432 if err != nil {
433 return nil, err
434 }
435 edgeOSArtifact := constants.EdgeOSArtifact
436 cloudInfraCompatibility, err := s.CompatibilityService.GetArtifactVersionCompatibility(ctx, model.ArtifactVersion{Name: fleet.Store, Version: cloudInfraVersion.SemVer}, &edgeOSArtifact)
437 if err != nil {
438 return nil, err
439 }
440 activeVer, err := s.GetActiveEdgeVersion(ctx, cluster.ClusterEdgeID)
441 if err != nil {
442 return nil, err
443 }
444 if activeVer == "" {
445 supportStatus.InfraSupportStatus.Status = status.ErrGettingVersion
446 supportStatus.InfraSupportStatus.Message = status.NoActiveVersion
447 return supportStatus, nil
448 }
449
450 re := regexp.MustCompile(`v?(\d+\.\d+\.\d+)`)
451 activeVerMatch := re.FindStringSubmatch(activeVer)
452
453 fleetVersionParts := strings.Split(activeVerMatch[1], ".")
454 fleetMinorVersion, err := strconv.Atoi(fleetVersionParts[1])
455 if err != nil {
456 return nil, err
457 }
458
459 if cloudInfraMinorVersion-fleetMinorVersion > cloudInfraCompatibility.NthIndex {
460 supportStatus.InfraSupportStatus.Status = status.ClusterOutOfSupport
461 supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s is out of support with current version %s", activeVerMatch[1], cloudInfraVersion.SemVer)
462 } else if cloudInfraMinorVersion-fleetMinorVersion == cloudInfraCompatibility.NthIndex {
463 supportStatus.InfraSupportStatus.Status = status.ClusterNearingEndOfSupport
464 supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s will be out of support in the next release", activeVerMatch[1])
465 } else {
466 supportStatus.InfraSupportStatus.Status = status.Supported
467 supportStatus.InfraSupportStatus.Message = fmt.Sprintf("Edge Infra version %s supported", activeVerMatch[1])
468 }
469 if labelList == nil || !slices.ContainsFunc(labelList, func(label *model.Label) bool { return label.Key == ctypes.DSDS }) {
470 return supportStatus, nil
471 }
472
473 terminals, err := s.TerminalService.GetTerminals(ctx, &cluster.ClusterEdgeID, nil)
474 if err != nil {
475 return nil, err
476 }
477
478 earliestTerminalVersion := "a"
479 for _, terminal := range terminals {
480 if terminal.Version == "" {
481 continue
482 }
483 termVersion := re.FindStringSubmatch(terminal.Version)
484 if len(termVersion) <= 1 {
485 return supportStatus, nil
486 }
487 if termVersion[1] < earliestTerminalVersion {
488 earliestTerminalVersion = termVersion[1]
489 }
490 }
491 if earliestTerminalVersion == "a" {
492 supportStatus.EdgeOsSupportStatus.Status = status.ErrGettingVersion
493 supportStatus.EdgeOsSupportStatus.Message = status.NoTerminalVersion
494 return supportStatus, nil
495 }
496
497 termVersionParts := strings.Split(earliestTerminalVersion, ".")
498 edgeOsSupported := slices.ContainsFunc(cloudInfraCompatibility.CompatibleArtifacts, func(artifact *model.ArtifactVersion) bool {
499 return artifact.Version == termVersionParts[0]+"."+termVersionParts[1]
500 })
501 earliestCompatibleOS := cloudInfraCompatibility.CompatibleArtifacts[len(cloudInfraCompatibility.CompatibleArtifacts)-1].Version
502 if !edgeOsSupported {
503 supportStatus.EdgeOsSupportStatus.Status = status.ClusterOutOfSupport
504 supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s is out of support", earliestTerminalVersion)
505 } else if termVersionParts[0]+"."+termVersionParts[1] == earliestCompatibleOS {
506 supportStatus.EdgeOsSupportStatus.Status = status.ClusterNearingEndOfSupport
507 supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("One or more terminals on EdgeOS version %s will be out of support in next release", earliestTerminalVersion)
508 } else {
509 supportStatus.EdgeOsSupportStatus.Status = status.Supported
510 supportStatus.EdgeOsSupportStatus.Message = fmt.Sprintf("EdgeOS version %s supported", earliestTerminalVersion)
511 }
512 return supportStatus, nil
513 }
514
515 func (s *storeClusterService) getStatus(ctx context.Context, clusterEdgeID string) (map[string]map[string]string, error) {
516 stats := make(map[string]map[string]string)
517 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterStatuses, clusterEdgeID, false)
518 if err != nil {
519 return stats, err
520 }
521 for rows.Next() {
522 var (
523 kind = ""
524 jsonpath = ""
525 value = ""
526 )
527 if err := rows.Scan(&kind, &jsonpath, &value); err != nil {
528 return stats, err
529 }
530 _, exists := stats[kind]
531 if !exists {
532 stats[kind] = make(map[string]string)
533 }
534 stats[kind][jsonpath] = value
535 }
536 return stats, nil
537 }
538
539 func (s *storeClusterService) getStatusErrorMessage(ctx context.Context, clusterEdgeID, kind string, errMessage *string, notReported map[string]bool, errorMessage *[]string) error {
540 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterErrorStatusMessage, kind, clusterEdgeID, false)
541 if err := row.Scan(errMessage); err != nil {
542 if !errors.Is(err, sql.ErrNoRows) {
543 return err
544 }
545 notReported[kind] = true
546 } else {
547 *errorMessage = append(*errorMessage, *errMessage)
548 }
549 return nil
550 }
551
552 func isReady(kind string, stat map[string]map[string]string) bool {
553 if statuses, exists := stat[kind]; exists {
554 for jsonpath, kindStatus := range statuses {
555 if jsonpath == `$.status.conditions[?(@.type == "Ready")].status` && kindStatus == status.IsReady {
556 return true
557 }
558 }
559 }
560 return false
561 }
562
563 func (s *storeClusterService) GetReplicationStatus(ctx context.Context, clusterEdgeID string) ([]*model.ReplicationStatus, error) {
564 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetReplicationStatus, clusterEdgeID)
565 if err != nil {
566 return nil, err
567 }
568 replicationSets := false
569 var replications []string
570 for rows.Next() {
571 replicationSets = true
572 var replicationStatus model.ReplicationStatus
573 if err := rows.Scan(&replicationStatus.Name, &replicationStatus.Status); err != nil {
574 return nil, sqlerr.Wrap(err)
575 }
576 replicationStatus.Status = strings.Trim(replicationStatus.Status, "\"")
577 replications = append(replications, replicationStatus.Name)
578
579 if replicationStatus.Status != "True" {
580 return []*model.ReplicationStatus{
581 {Name: replicationStatus.Name, Status: replicationStatus.Status},
582 }, nil
583 }
584 }
585
586 if replicationSets {
587 return []*model.ReplicationStatus{{Name: strings.Join(replications, ", "), Status: "True"}}, nil
588 }
589
590
591 return []*model.ReplicationStatus{}, nil
592 }
593
594 func (s *storeClusterService) GetClusterStatusFromCtlfish(ctx context.Context, cluster *model.Cluster) (*monitor.ClusterStatus, error) {
595 loqReq := mapper.GetClusterStatus()
596 res, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, loqReq)
597 if err != nil {
598 return nil, err
599 }
600 if len(res) == 0 {
601 return nil, nil
602 }
603 clusterStatus := &monitor.ClusterStatus{}
604 err = json.Unmarshal([]byte(res[0]), clusterStatus)
605 if err != nil {
606 return nil, err
607 }
608 return clusterStatus, nil
609 }
610
611 func (s *storeClusterService) GetClusterStatus(ctx context.Context, cluster *model.Cluster) (*model.ClusterStatusResponse, error) {
612 storeStatus, err := s.GetClusterStatusFromCtlfish(ctx, cluster)
613 if err != nil {
614 return nil, err
615 }
616 if storeStatus == nil {
617 return &model.ClusterStatusResponse{Status: &model.ClusterStatus{Status: "Provisioning", Message: "Edge Provisioning"}}, nil
618 }
619 status := &model.ClusterStatus{}
620 status.Status = storeStatus.Status
621 status.Message = "Edge Syncing"
622 if storeStatus.Error != nil {
623 status.Message = storeStatus.Error.Message
624 }
625
626 return &model.ClusterStatusResponse{
627 Status: status,
628 KubeVersion: &storeStatus.NodeVersion,
629 BucketStatus: toBucketStatusInformation(storeStatus.Buckets),
630 KustomizationStatus: toKustomizationStatusInformation(storeStatus.Kustomizations),
631 }, nil
632 }
633
634 func toBucketStatusInformation(bucketsInfo map[string]monitor.BucketInfo) []*model.BucketStatusInformation {
635 var buckets []*model.BucketStatusInformation
636 for name, bucket := range bucketsInfo {
637 fstatus := bucketsInfo[name].FluxStatus
638 bucketStatus := &model.BucketStatusInformation{
639 Excludes: bucket.Excludes,
640 BucketName: bucket.BucketName,
641 FluxStatus: getFluxStatusInfo(name, &fstatus),
642 }
643 buckets = append(buckets, bucketStatus)
644 }
645 return buckets
646 }
647
648 func toKustomizationStatusInformation(KustomizationInfo map[string]monitor.KustomizationInfo) []*model.KustomizationStatusInformation {
649 var kustomizations []*model.KustomizationStatusInformation
650 for name, kustomization := range KustomizationInfo {
651 fstatus := KustomizationInfo[name].FluxStatus
652 kustomizationStatus := &model.KustomizationStatusInformation{
653 Path: kustomization.Path,
654 Source: kustomization.Source,
655 FluxStatus: getFluxStatusInfo(name, &fstatus),
656 }
657 kustomizations = append(kustomizations, kustomizationStatus)
658 }
659 return kustomizations
660 }
661
662 func getFluxStatusInfo(name string, fluxStatus *monitor.SyncInfo) *model.FluxStatusInformation {
663 return &model.FluxStatusInformation{
664 Name: name,
665 Error: fluxStatus.Error,
666 LastUpdated: fluxStatus.LastUpdated,
667 Revision: fluxStatus.Revision,
668 StatusMessage: fluxStatus.StatusMessage,
669 Suspended: fluxStatus.Suspended,
670 }
671 }
672
673 func (s *storeClusterService) DeleteStoreEntry(ctx context.Context, clusterEdgeID string) error {
674 _, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterDeleteQuery, clusterEdgeID)
675 if err != nil {
676 return sqlerr.Wrap(err)
677 }
678 return nil
679 }
680
681 func (s *storeClusterService) SoftDeleteStoreEntry(ctx context.Context, clusterEdgeID string) error {
682 _, err := s.SQLDB.ExecContext(ctx, sqlquery.SoftClusterDeleteQuery, clusterEdgeID)
683 if err != nil {
684 return sqlerr.Wrap(err)
685 }
686 return nil
687 }
688
689 func (s *storeClusterService) UpdateStoreName(ctx context.Context, clusterEdgeID string, name string) error {
690 _, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClusterNameQuery, name, clusterEdgeID)
691 return err
692 }
693
694 func (s *storeClusterService) GetClusterConfigmap(ctx context.Context, cluster *model.Cluster, configmapName, namespace string) (*corev1.ConfigMap, error) {
695 configmap, err := s.BQClient.GetKubeResource(ctx, cluster.ProjectID, cluster, model.LoqRequest{
696 Kind: "ConfigMap",
697 Group: "",
698 Version: "v1",
699 Name: &configmapName,
700 Namespace: &namespace,
701 })
702 if err != nil {
703 return nil, err
704 }
705 return mapper.ConvertToConfigMap(configmap)
706 }
707
708 func (s *storeClusterService) UpdateStoreSiteID(ctx context.Context, clusterEdgeID string, siteID string) error {
709 _, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateStoreSiteIDQuery, siteID, clusterEdgeID)
710 return err
711 }
712
713 func (s *storeClusterService) GetActiveEdgeVersion(ctx context.Context, clusterEdgeID string) (string, error) {
714 var activeVersion string
715
716
717 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetActiveEdgeVersionFromWatchedField, clusterEdgeID)
718 if err := row.Scan(&activeVersion); err != nil {
719 return "", nil
720 }
721
722
723
724 var activeVersionUnmarshal activeVersionResp
725
726 errUnmarshal := json.Unmarshal([]byte(activeVersion), &activeVersionUnmarshal.version)
727 if errUnmarshal != nil {
728 return "", nil
729 }
730
731 return activeVersionUnmarshal.version, nil
732 }
733
734 func NewStoreClusterService(gkeService GkeClient, bqClient clients.BQClient, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, compatibilityService CompatibilityService) *storeClusterService {
735 return &storeClusterService{
736 GkeService: gkeService,
737 BQClient: bqClient,
738 SQLDB: sqlDB,
739 ChariotService: chariotService,
740 TerminalService: terminalService,
741 CompatibilityService: compatibilityService,
742 }
743 }
744
View as plain text