1 package services
2
3 import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "strconv"
10
11 "github.com/google/uuid"
12
13 "edge-infra.dev/pkg/edge/api/graph/model"
14 edgenode "edge-infra.dev/pkg/edge/api/services/edgenode/common"
15 "edge-infra.dev/pkg/edge/api/services/interfaces"
16 sqlquery "edge-infra.dev/pkg/edge/api/sql"
17 "edge-infra.dev/pkg/edge/api/utils"
18 "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
19 edgeCapabilities "edge-infra.dev/pkg/edge/capabilities"
20 edgeconstants "edge-infra.dev/pkg/edge/constants"
21 clustertype "edge-infra.dev/pkg/edge/constants/api/cluster"
22 "edge-infra.dev/pkg/edge/constants/api/fleet"
23 )
24
25
26 type RegistrationService interface {
27 IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error)
28 CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error
29 UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error
30 CreateAClusterCR(ctx context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error)
31 UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error
32 ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error)
33
34
35 Reset(ctx context.Context, clusterEdgeID string, force bool) error
36 }
37
38 type registrationService struct {
39 GkeService GkeClient
40 TopLevelProjectID string
41 SecretService SecretService
42 GCPService GCPService
43 BSLSiteService BSLSiteService
44 IAMService IAMService
45 ChariotService ChariotService
46 SQLDB *sql.DB
47 TerminalService TerminalService
48 ActivationCodeService edgenode.ActivationCode
49 ClusterLabelService interfaces.ClusterLabelService
50 LabelService LabelService
51 }
52
// Sentinel errors returned by the registration service.
var (
	// ErrMustForceClusterReset is returned by Reset when force is false.
	ErrMustForceClusterReset = errors.New("cluster cannot be reset without force being set to true")
	// ErrCanOnlyResetStore is returned by Reset when the cluster is not a
	// store cluster.
	ErrCanOnlyResetStore = errors.New("only store clusters can be reset")
	// ErrNoOptionalPalletLabels signals that a banner has no optional pallet
	// labels.
	ErrNoOptionalPalletLabels = errors.New("no optional pallet labels found in banner")
)
58
59
60
61 func (s *registrationService) CreateClusterSQLEntry(ctx context.Context, payload *model.RegistrationPayload, projectID, clusterGUID, siteID, bannerEdgeID string, active bool, location string) error {
62 clusterName := payload.Name
63 clusterType := payload.ClusterType
64 fleetType := payload.Fleet
65 fleetVersion := *payload.FleetVersion
66 autoUpdate := *payload.AutoUpdateEnabled
67
68 transaction, err := s.SQLDB.BeginTx(ctx, nil)
69 if err != nil {
70 return err
71 }
72 defer func() { _ = transaction.Rollback() }()
73
74
75 if _, err := transaction.ExecContext(ctx, sqlquery.ClusterInsertQuery, clusterName, projectID, true, active, clusterGUID, sqlquery.NewNullString(bannerEdgeID), siteID, location, fleetVersion); err != nil {
76 return err
77 }
78
79
80 var labelID string
81 if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, clusterType, clustertype.LabelType); row.Scan(&labelID) != nil {
82 clusterTypeError := clustertype.Type(clusterType).IsValid()
83 return fmt.Errorf("%s: %w", err, clusterTypeError)
84 }
85 if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil {
86 return err
87 }
88 if row := transaction.QueryRowContext(ctx, sqlquery.GetLabelTypeByKey, fleetType, fleet.LabelType); row.Scan(&labelID) != nil {
89 fleetTypeError := fleet.IsValid(fleetType)
90 return fmt.Errorf("%s: %w", err, fleetTypeError)
91 }
92 if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterGUID, labelID); err != nil {
93 return err
94 }
95
96
97 if fleet.IsStoreCluster(fleet.Type(fleetType)) {
98
99 if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterGUID, fleetType, fleetVersion); err != nil {
100 return err
101 }
102
103 if err := s.registerClusterDefaultOptionalPallets(ctx, transaction, clusterGUID, bannerEdgeID, fleetVersion); err != nil {
104 return err
105 }
106
107 if _, err := transaction.ExecContext(ctx, sqlquery.UpdateClusterConfig, clusterGUID, AutoUpdateEnabledKey, strconv.FormatBool(autoUpdate), uuid.NewString()); err != nil {
108 return err
109 }
110 }
111
112 return transaction.Commit()
113 }
114
115 func (s *registrationService) registerClusterDefaultOptionalPallets(ctx context.Context, transaction *sql.Tx, clusterEdgeID, bannerEdgeID, fleetVersion string) error {
116
117 existingLabels, err := s.LabelService.GetLabels(ctx, &bannerEdgeID)
118 if err != nil {
119 return err
120 }
121
122
123 edgeCapabilityLabels := edgeCapabilities.GetCapabilityLabels(existingLabels, edgeCapabilities.DefaultStoreEdgeCapabilities...)
124
125
126 supportedEdgeCapabilityLabels, err := edgeCapabilities.GetCapabilityLabelsForSupportedVersion(edgeCapabilityLabels, fleetVersion, nil)
127 if err != nil {
128 return err
129 }
130
131 for _, label := range supportedEdgeCapabilityLabels {
132 if _, err := transaction.ExecContext(ctx, sqlquery.ClusterLabelInsertQuery, clusterEdgeID, label.LabelEdgeID); err != nil {
133 return err
134 }
135 if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, label.Key, fleetVersion); err != nil {
136 return err
137 }
138 }
139 return nil
140 }
141
142 func (s *registrationService) IsClusterSQLEntryExist(ctx context.Context, clusterName, projectID string) (bool, error) {
143 var exist bool
144 row := s.SQLDB.QueryRowContext(ctx, sqlquery.IsClusterExistQuery, clusterName, projectID)
145 if err := row.Scan(&exist); err != nil {
146 return false, err
147 }
148 return exist, nil
149 }
150
151 func (s *registrationService) UpdateClusterSQLEntry(ctx context.Context, active bool, clusterEdgeID string) error {
152 _, err := s.SQLDB.ExecContext(ctx, sqlquery.ClusterUpdateQuery, active, clusterEdgeID)
153 if err != nil {
154 return err
155 }
156 return nil
157 }
158
159 func (s *registrationService) CreateAClusterCR(_ context.Context, clusterType, name, projectID, fleet, organization, clusterEdgeID string, banner *model.Banner, gkeClusterSpec *model.ClusterInfo) (string, error) {
160 var storeCluster *v1alpha1.Cluster
161 if gkeClusterSpec != nil {
162 storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, gkeClusterSpec.Location, utils.ConvertToString(gkeClusterSpec.NodeVersion), gkeClusterSpec.MachineType, clusterEdgeID, gkeClusterSpec.NumNodes, banner)
163 storeCluster.AddAutoscaling(gkeClusterSpec.Autoscale, utils.ConvertToInt(gkeClusterSpec.MinNodes), utils.ConvertToInt(gkeClusterSpec.MaxNodes))
164 } else {
165 storeCluster = v1alpha1.NewCluster(name, projectID, organization, fleet, clusterType, edgeconstants.DefaultClusterLocation, "", "", clusterEdgeID, 0, banner)
166 }
167 storeClusterByte, err := json.Marshal(storeCluster)
168 if err != nil {
169 return "", err
170 }
171 storeClusterBase64 := utils.ToBase64(storeClusterByte)
172 return storeClusterBase64, nil
173 }
174
175 func (s *registrationService) UploadClusterCaHash(ctx context.Context, clusterEdgeID, clusterCaHash string) error {
176 _, err := s.SQLDB.ExecContext(ctx, sqlquery.UploadClusterCaHash, clusterCaHash, clusterEdgeID)
177 return err
178 }
179
180 func (s *registrationService) ClusterCaHash(ctx context.Context, clusterEdgeID string) (string, error) {
181 var clusterCaHash *string
182 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterCaHash, clusterEdgeID)
183 if err := row.Scan(&clusterCaHash); err != nil {
184 return "", err
185 }
186 if clusterCaHash == nil {
187 return "", nil
188 }
189 return *clusterCaHash, nil
190 }
191
192 func (s *registrationService) Reset(ctx context.Context, clusterEdgeID string, force bool) error {
193 if !force {
194 return ErrMustForceClusterReset
195 }
196 clusterFleet, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
197 if err != nil {
198 return err
199 }
200 if !fleet.IsStoreCluster(*clusterFleet) {
201 return ErrCanOnlyResetStore
202 }
203
204 if err := s.UpdateClusterSQLEntry(ctx, false, clusterEdgeID); err != nil {
205 return err
206 }
207 terminalList, err := s.TerminalService.GetTerminalsByClusterID(ctx, clusterEdgeID)
208 if err != nil {
209 return err
210 }
211
212 for _, terminal := range terminalList {
213 if _, err := s.ActivationCodeService.Refresh(ctx, terminal.TerminalID); err != nil {
214 return err
215 }
216 }
217 return nil
218 }
219
220 func NewRegistrationService(gkeService GkeClient, topLevelProjectId string, secretService SecretService, gcpService GCPService, bslSiteService BSLSiteService, iamService IAMService, sqlDB *sql.DB, chariotService ChariotService, terminalService TerminalService, activationCodeService edgenode.ActivationCode, clusterLabelSvc interfaces.ClusterLabelService, labelService LabelService) *registrationService {
221 return ®istrationService{
222 GkeService: gkeService,
223 TopLevelProjectID: topLevelProjectId,
224 SecretService: secretService,
225 GCPService: gcpService,
226 BSLSiteService: bslSiteService,
227 IAMService: iamService,
228 ChariotService: chariotService,
229 SQLDB: sqlDB,
230 TerminalService: terminalService,
231 ActivationCodeService: activationCodeService,
232 ClusterLabelService: clusterLabelSvc,
233 LabelService: labelService,
234 }
235 }
236
View as plain text