1 package artifacts
2
3 import (
4 "context"
5 "database/sql"
6 "fmt"
7 "slices"
8 "strings"
9
10 "github.com/google/go-containerregistry/pkg/name"
11
12 sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
13 "edge-infra.dev/pkg/edge/api/services/interfaces"
14 sqlquery "edge-infra.dev/pkg/edge/api/sql"
15 "edge-infra.dev/pkg/edge/api/types"
16 "edge-infra.dev/pkg/edge/api/utils"
17 )
18
19
20 type Service interface {
21 GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error)
22 GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error)
23 GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error)
24 UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error
25 UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (clustersUpdated int, err error)
26 AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
27 DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
28 }
29
30 type artifactsService struct {
31 SQLDB *sql.DB
32 ClusterLabelService interfaces.ClusterLabelService
33 }
34
35 func NewArtifactsService(sqlDB *sql.DB, clusterLabelSvc interfaces.ClusterLabelService) Service {
36 return &artifactsService{
37 SQLDB: sqlDB,
38 ClusterLabelService: clusterLabelSvc,
39 }
40 }
41
42
43 func (s *artifactsService) GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error) {
44 artifacts := []types.ArtifactVersion{}
45
46 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetAvailableArtifactVersions, name)
47 if err != nil {
48 return []types.ArtifactVersion{}, err
49 }
50 for rows.Next() {
51 var a types.ArtifactVersion
52 if err := rows.Scan(&a.Name, &a.Version); err != nil {
53 return []types.ArtifactVersion{}, err
54 }
55 artifacts = append(artifacts, a)
56 }
57 if err := rows.Err(); err != nil {
58 return nil, sqlerr.Wrap(err)
59 }
60
61 return artifacts, nil
62 }
63
64
65 func (s *artifactsService) GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error) {
66 artifact := &types.ArtifactVersion{
67 Name: name,
68 }
69 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetLatestAvailableArtifactVersion, name)
70 if err := row.Scan(&artifact.Version); err != nil {
71 return nil, sqlerr.Wrap(err)
72 }
73
74 return artifact, nil
75 }
76
77
78
79 func (s *artifactsService) GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error) {
80 artifacts := []types.ArtifactVersion{}
81 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterArtifactVersions, clusterEdgeID)
82 if err != nil {
83 return []types.ArtifactVersion{}, err
84 }
85 for rows.Next() {
86 var a types.ArtifactVersion
87 if err := rows.Scan(&a.Name, &a.Version); err != nil {
88 return []types.ArtifactVersion{}, err
89 }
90 artifacts = append(artifacts, a)
91 }
92 if err := rows.Err(); err != nil {
93 return nil, sqlerr.Wrap(err)
94 }
95 return artifacts, nil
96 }
97
98
99
100 func (s *artifactsService) UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error {
101
102 fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
103 if err != nil {
104 return err
105 }
106
107
108 if version == types.DefaultVersionTag {
109 latestVersion, err := s.GetLatestAvailableArtifactVersion(ctx, fleetType.String())
110 if err != nil {
111 return fmt.Errorf("failed to resolve latest version of artifact %s. err: %v", fleetType, err)
112 }
113 version = latestVersion.Version
114 }
115
116
117 if err := validateWeakImageRef(fleetType.String(), version); err != nil {
118 return err
119 }
120
121
122 tx, err := s.SQLDB.BeginTx(ctx, nil)
123 if err != nil {
124 return err
125 }
126 if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterFleetVersion, clusterEdgeID, version); err != nil {
127 errors := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterFleetVersion")
128 if rollbackErr := tx.Rollback(); rollbackErr != nil {
129 errors.AddError(err, "failed to roll back transaction")
130 }
131 return errors.Errors
132 }
133 if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterArtifactVersions, clusterEdgeID, version); err != nil {
134 errors := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterArtifactVersions")
135 if rollbackErr := tx.Rollback(); rollbackErr != nil {
136 errors.AddError(err, "failed to roll back transaction")
137 }
138 return errors.Errors
139 }
140 if err := tx.Commit(); err != nil {
141 return fmt.Errorf("failed to commit transaction. err: %v", err)
142 }
143
144 return nil
145 }
146
147
148
149
150 func (s *artifactsService) UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (int, error) {
151 res, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClustersToLatestFleetArtifactVersion, name)
152 if err != nil {
153 return -1, err
154 }
155 n, err := res.RowsAffected()
156 if err != nil {
157 return -1, err
158 }
159 return int(n), nil
160 }
161
162
163 func (s *artifactsService) AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
164 var storeVersion string
165
166 artifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
167 if err != nil {
168 return err
169 }
170
171
172 fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
173 if err != nil {
174 return err
175 }
176
177 for _, artifact := range artifactVersions {
178 if artifact.Name == fleetType.String() {
179 storeVersion = artifact.Version
180 }
181 }
182
183 if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, name, storeVersion); err != nil {
184 return err
185 }
186 return nil
187 }
188
189
190 func (s *artifactsService) DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
191
192 clusterArtifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
193 if err != nil {
194 return err
195 }
196 exists := slices.ContainsFunc(clusterArtifactVersions, func(artifact types.ArtifactVersion) bool {
197 return artifact.Name == name
198 })
199 if !exists {
200 return nil
201 }
202
203 if _, err := transaction.ExecContext(ctx, sqlquery.DeleteClusterArtifactVersion, clusterEdgeID, name); err != nil {
204 return err
205 }
206 return nil
207 }
208
209 func validateWeakImageRef(fleet, version string) error {
210 if fleet == "" || version == "" {
211 return fmt.Errorf("name and version are required to build image reference. name: %v, version: %v", fleet, version)
212 }
213 if strings.HasPrefix(version, ".") || strings.HasPrefix(version, "-") {
214 return fmt.Errorf("version cannot start with a period or hyphen. version: %v", version)
215 }
216 d, dErr := name.NewDigest(fmt.Sprintf("%s@%s", fleet, version), name.WeakValidation)
217 if dErr == nil {
218 if _, refErr := name.ParseReference(d.String(), name.WeakValidation); refErr != nil {
219 return fmt.Errorf("could not parse digest as image reference. err: %v", refErr)
220 }
221 }
222
223 t, tErr := name.NewTag(fmt.Sprintf("%s:%s", fleet, version), name.WeakValidation)
224 if tErr == nil {
225 if _, refErr := name.ParseReference(t.String(), name.WeakValidation); refErr != nil {
226 return fmt.Errorf("could not parse tag as image reference. err: %v", refErr)
227 }
228 }
229
230 if dErr != nil && tErr != nil {
231 return fmt.Errorf("invalid artifact name or version. could not parse either tag or digest. err(s): [%v, %v]", dErr, tErr)
232 }
233
234 return nil
235 }
236
View as plain text