1
16
17 package driver
18
19 import (
20 "fmt"
21 "sort"
22 "strconv"
23 "time"
24
25 "github.com/jmoiron/sqlx"
26 migrate "github.com/rubenv/sql-migrate"
27
28 sq "github.com/Masterminds/squirrel"
29
30
31 _ "github.com/lib/pq"
32
33 rspb "helm.sh/helm/v3/pkg/release"
34 )
35
// Compile-time assertion that *SQL satisfies the Driver interface.
var _ Driver = (*SQL)(nil)
37
// labelMap is the set of label names that Query accepts. Each accepted label
// name is used directly as a column name in the generated WHERE clause, so
// anything not listed here is rejected as an unknown label.
var labelMap = map[string]struct{}{
	"modifiedAt": {},
	"createdAt":  {},
	"version":    {},
	"status":     {},
	"owner":      {},
	"name":       {},
}
46
// postgreSQLDialect is the driver/dialect name passed to sqlx.Connect and to
// the sql-migrate functions.
const postgreSQLDialect = "postgres"
48
49
// SQLDriverName is the string name of this driver, as returned by (*SQL).Name.
const SQLDriverName = "SQL"
51
// Names of the tables created by ensureDBSetup: one for release records and
// one for the custom labels attached to them.
const sqlReleaseTableName = "releases_v1"
const sqlCustomLabelsTableName = "custom_labels_v1"
54
// Column names used when creating and querying the two tables.
const (
	// Columns of the release table (sqlReleaseTableName).
	sqlReleaseTableKeyColumn        = "key"
	sqlReleaseTableTypeColumn       = "type"
	sqlReleaseTableBodyColumn       = "body"
	sqlReleaseTableNameColumn       = "name"
	sqlReleaseTableNamespaceColumn  = "namespace"
	sqlReleaseTableVersionColumn    = "version"
	sqlReleaseTableStatusColumn     = "status"
	sqlReleaseTableOwnerColumn      = "owner"
	sqlReleaseTableCreatedAtColumn  = "createdAt"
	sqlReleaseTableModifiedAtColumn = "modifiedAt"

	// Columns of the custom labels table (sqlCustomLabelsTableName).
	sqlCustomLabelsTableReleaseKeyColumn       = "releaseKey"
	sqlCustomLabelsTableReleaseNamespaceColumn = "releaseNamespace"
	sqlCustomLabelsTableKeyColumn              = "key"
	sqlCustomLabelsTableValueColumn            = "value"
)
72
73
// VARCHAR widths of the key and value columns of the custom labels table, as
// used by the "custom_labels" migration in ensureDBSetup.
// NOTE(review): 253 + 1 + 63 presumably mirrors the Kubernetes label key limit
// (253-character prefix, a "/" separator, 63-character name) and 63 the label
// value limit — confirm.
const (
	sqlCustomLabelsTableKeyMaxLenght   = 253 + 1 + 63
	sqlCustomLabelsTableValueMaxLenght = 63
)
78
// Fixed values written to the owner and type columns of every release row.
const (
	// sqlReleaseDefaultOwner is stored as the row owner by Create and Update,
	// and is the owner List filters on.
	sqlReleaseDefaultOwner = "helm"
	// sqlReleaseDefaultType is stored in the type column by Create.
	sqlReleaseDefaultType = "helm.sh/release.v1"
)
83
84
// SQL is the Driver implementation that stores releases in a PostgreSQL
// database.
type SQL struct {
	// db is the connection handle opened by NewSQL.
	db *sqlx.DB
	// namespace scopes Get and Delete, and List and Query when non-empty.
	// Create and Update overwrite it with the namespace of the release they
	// are given.
	namespace string
	// statementBuilder builds the SQL statements; NewSQL configures it to use
	// dollar-style placeholders.
	statementBuilder sq.StatementBuilderType

	// Log is the printf-style logging callback used by every method.
	Log func(string, ...interface{})
}
92
93
// Name returns the name of the driver, SQLDriverName.
func (s *SQL) Name() string {
	return SQLDriverName
}
97
98
99 func (s *SQL) checkAlreadyApplied(migrations []*migrate.Migration) bool {
100
101 migrationsIDs := make(map[string]struct{})
102 for _, migration := range migrations {
103 migrationsIDs[migration.Id] = struct{}{}
104 }
105
106
107 migrate.SetDisableCreateTable(true)
108 records, err := migrate.GetMigrationRecords(s.db.DB, postgreSQLDialect)
109 migrate.SetDisableCreateTable(false)
110 if err != nil {
111 s.Log("checkAlreadyApplied: failed to get migration records: %v", err)
112 return false
113 }
114
115 for _, record := range records {
116 if _, ok := migrationsIDs[record.Id]; ok {
117 s.Log("checkAlreadyApplied: found previous migration (Id: %v) applied at %v", record.Id, record.AppliedAt)
118 delete(migrationsIDs, record.Id)
119 }
120 }
121
122
123 if len(migrationsIDs) != 0 {
124 for id := range migrationsIDs {
125 s.Log("checkAlreadyApplied: find unapplied migration (id: %v)", id)
126 }
127 return false
128 }
129 return true
130 }
131
132 func (s *SQL) ensureDBSetup() error {
133
134 migrations := &migrate.MemoryMigrationSource{
135 Migrations: []*migrate.Migration{
136 {
137 Id: "init",
138 Up: []string{
139 fmt.Sprintf(`
140 CREATE TABLE %s (
141 %s VARCHAR(90),
142 %s VARCHAR(64) NOT NULL,
143 %s TEXT NOT NULL,
144 %s VARCHAR(64) NOT NULL,
145 %s VARCHAR(64) NOT NULL,
146 %s INTEGER NOT NULL,
147 %s TEXT NOT NULL,
148 %s TEXT NOT NULL,
149 %s INTEGER NOT NULL,
150 %s INTEGER NOT NULL DEFAULT 0,
151 PRIMARY KEY(%s, %s)
152 );
153 CREATE INDEX ON %s (%s, %s);
154 CREATE INDEX ON %s (%s);
155 CREATE INDEX ON %s (%s);
156 CREATE INDEX ON %s (%s);
157 CREATE INDEX ON %s (%s);
158 CREATE INDEX ON %s (%s);
159
160 GRANT ALL ON %s TO PUBLIC;
161
162 ALTER TABLE %s ENABLE ROW LEVEL SECURITY;
163 `,
164 sqlReleaseTableName,
165 sqlReleaseTableKeyColumn,
166 sqlReleaseTableTypeColumn,
167 sqlReleaseTableBodyColumn,
168 sqlReleaseTableNameColumn,
169 sqlReleaseTableNamespaceColumn,
170 sqlReleaseTableVersionColumn,
171 sqlReleaseTableStatusColumn,
172 sqlReleaseTableOwnerColumn,
173 sqlReleaseTableCreatedAtColumn,
174 sqlReleaseTableModifiedAtColumn,
175 sqlReleaseTableKeyColumn,
176 sqlReleaseTableNamespaceColumn,
177 sqlReleaseTableName,
178 sqlReleaseTableKeyColumn,
179 sqlReleaseTableNamespaceColumn,
180 sqlReleaseTableName,
181 sqlReleaseTableVersionColumn,
182 sqlReleaseTableName,
183 sqlReleaseTableStatusColumn,
184 sqlReleaseTableName,
185 sqlReleaseTableOwnerColumn,
186 sqlReleaseTableName,
187 sqlReleaseTableCreatedAtColumn,
188 sqlReleaseTableName,
189 sqlReleaseTableModifiedAtColumn,
190 sqlReleaseTableName,
191 sqlReleaseTableName,
192 ),
193 },
194 Down: []string{
195 fmt.Sprintf(`
196 DROP TABLE %s;
197 `, sqlReleaseTableName),
198 },
199 },
200 {
201 Id: "custom_labels",
202 Up: []string{
203 fmt.Sprintf(`
204 CREATE TABLE %s (
205 %s VARCHAR(64),
206 %s VARCHAR(67),
207 %s VARCHAR(%d),
208 %s VARCHAR(%d)
209 );
210 CREATE INDEX ON %s (%s, %s);
211
212 GRANT ALL ON %s TO PUBLIC;
213 ALTER TABLE %s ENABLE ROW LEVEL SECURITY;
214 `,
215 sqlCustomLabelsTableName,
216 sqlCustomLabelsTableReleaseKeyColumn,
217 sqlCustomLabelsTableReleaseNamespaceColumn,
218 sqlCustomLabelsTableKeyColumn,
219 sqlCustomLabelsTableKeyMaxLenght,
220 sqlCustomLabelsTableValueColumn,
221 sqlCustomLabelsTableValueMaxLenght,
222 sqlCustomLabelsTableName,
223 sqlCustomLabelsTableReleaseKeyColumn,
224 sqlCustomLabelsTableReleaseNamespaceColumn,
225 sqlCustomLabelsTableName,
226 sqlCustomLabelsTableName,
227 ),
228 },
229 Down: []string{
230 fmt.Sprintf(`
231 DELETE TABLE %s;
232 `, sqlCustomLabelsTableName),
233 },
234 },
235 },
236 }
237
238
239 if s.checkAlreadyApplied(migrations.Migrations) {
240 return nil
241 }
242
243
244 _, err := migrate.Exec(s.db.DB, postgreSQLDialect, migrations, migrate.Up)
245 return err
246 }
247
248
// SQLReleaseWrapper is the row shape of the release table; query results are
// scanned into it through the db struct tags.
type SQLReleaseWrapper struct {
	// Key is the release key; together with Namespace it forms the primary
	// key of the table.
	Key string `db:"key"`

	// Type is the record type; Create always stores sqlReleaseDefaultType.
	Type string `db:"type"`

	// Body is the encoded release, as produced by encodeRelease and consumed
	// by decodeRelease.
	Body string `db:"body"`

	// The remaining columns duplicate release metadata so that it can be
	// filtered on in SQL (see Query and labelMap). CreatedAt and ModifiedAt
	// hold Unix timestamps in seconds, written by Create and Update
	// respectively.
	Name       string `db:"name"`
	Namespace  string `db:"namespace"`
	Version    int    `db:"version"`
	Status     string `db:"status"`
	Owner      string `db:"owner"`
	CreatedAt  int    `db:"createdAt"`
	ModifiedAt int    `db:"modifiedAt"`
}
270
// SQLReleaseCustomLabelWrapper is the row shape of the custom labels table:
// one label key/value pair attached to a release.
//
// NOTE(review): the release_key and release_namespace tags do not match the
// column-name constants ("releaseKey", "releaseNamespace"). The only query in
// this file that scans into this type selects just the key and value columns,
// so the mismatch is not exercised here — confirm before selecting the other
// columns.
type SQLReleaseCustomLabelWrapper struct {
	ReleaseKey       string `db:"release_key"`
	ReleaseNamespace string `db:"release_namespace"`
	Key              string `db:"key"`
	Value            string `db:"value"`
}
277
278
279 func NewSQL(connectionString string, logger func(string, ...interface{}), namespace string) (*SQL, error) {
280 db, err := sqlx.Connect(postgreSQLDialect, connectionString)
281 if err != nil {
282 return nil, err
283 }
284
285 driver := &SQL{
286 db: db,
287 Log: logger,
288 statementBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar),
289 }
290
291 if err := driver.ensureDBSetup(); err != nil {
292 return nil, err
293 }
294
295 driver.namespace = namespace
296
297 return driver, nil
298 }
299
300
301 func (s *SQL) Get(key string) (*rspb.Release, error) {
302 var record SQLReleaseWrapper
303
304 qb := s.statementBuilder.
305 Select(sqlReleaseTableBodyColumn).
306 From(sqlReleaseTableName).
307 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
308 Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
309
310 query, args, err := qb.ToSql()
311 if err != nil {
312 s.Log("failed to build query: %v", err)
313 return nil, err
314 }
315
316
317 if err := s.db.Get(&record, query, args...); err != nil {
318 s.Log("got SQL error when getting release %s: %v", key, err)
319 return nil, ErrReleaseNotFound
320 }
321
322 release, err := decodeRelease(record.Body)
323 if err != nil {
324 s.Log("get: failed to decode data %q: %v", key, err)
325 return nil, err
326 }
327
328 if release.Labels, err = s.getReleaseCustomLabels(key, s.namespace); err != nil {
329 s.Log("failed to get release %s/%s custom labels: %v", s.namespace, key, err)
330 return nil, err
331 }
332
333 return release, nil
334 }
335
336
337 func (s *SQL) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
338 sb := s.statementBuilder.
339 Select(sqlReleaseTableKeyColumn, sqlReleaseTableNamespaceColumn, sqlReleaseTableBodyColumn).
340 From(sqlReleaseTableName).
341 Where(sq.Eq{sqlReleaseTableOwnerColumn: sqlReleaseDefaultOwner})
342
343
344 if s.namespace != "" {
345 sb = sb.Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
346 }
347
348 query, args, err := sb.ToSql()
349 if err != nil {
350 s.Log("failed to build query: %v", err)
351 return nil, err
352 }
353
354 var records = []SQLReleaseWrapper{}
355 if err := s.db.Select(&records, query, args...); err != nil {
356 s.Log("list: failed to list: %v", err)
357 return nil, err
358 }
359
360 var releases []*rspb.Release
361 for _, record := range records {
362 release, err := decodeRelease(record.Body)
363 if err != nil {
364 s.Log("list: failed to decode release: %v: %v", record, err)
365 continue
366 }
367
368 if release.Labels, err = s.getReleaseCustomLabels(record.Key, record.Namespace); err != nil {
369 s.Log("failed to get release %s/%s custom labels: %v", record.Namespace, record.Key, err)
370 return nil, err
371 }
372 for k, v := range getReleaseSystemLabels(release) {
373 release.Labels[k] = v
374 }
375
376 if filter(release) {
377 releases = append(releases, release)
378 }
379 }
380
381 return releases, nil
382 }
383
384
385 func (s *SQL) Query(labels map[string]string) ([]*rspb.Release, error) {
386 sb := s.statementBuilder.
387 Select(sqlReleaseTableKeyColumn, sqlReleaseTableNamespaceColumn, sqlReleaseTableBodyColumn).
388 From(sqlReleaseTableName)
389
390 keys := make([]string, 0, len(labels))
391 for key := range labels {
392 keys = append(keys, key)
393 }
394 sort.Strings(keys)
395 for _, key := range keys {
396 if _, ok := labelMap[key]; ok {
397 sb = sb.Where(sq.Eq{key: labels[key]})
398 } else {
399 s.Log("unknown label %s", key)
400 return nil, fmt.Errorf("unknown label %s", key)
401 }
402 }
403
404
405 if s.namespace != "" {
406 sb = sb.Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
407 }
408
409
410 query, args, err := sb.ToSql()
411 if err != nil {
412 s.Log("failed to build query: %v", err)
413 return nil, err
414 }
415
416 var records = []SQLReleaseWrapper{}
417 if err := s.db.Select(&records, query, args...); err != nil {
418 s.Log("list: failed to query with labels: %v", err)
419 return nil, err
420 }
421
422 if len(records) == 0 {
423 return nil, ErrReleaseNotFound
424 }
425
426 var releases []*rspb.Release
427 for _, record := range records {
428 release, err := decodeRelease(record.Body)
429 if err != nil {
430 s.Log("list: failed to decode release: %v: %v", record, err)
431 continue
432 }
433
434 if release.Labels, err = s.getReleaseCustomLabels(record.Key, record.Namespace); err != nil {
435 s.Log("failed to get release %s/%s custom labels: %v", record.Namespace, record.Key, err)
436 return nil, err
437 }
438
439 releases = append(releases, release)
440 }
441
442 if len(releases) == 0 {
443 return nil, ErrReleaseNotFound
444 }
445
446 return releases, nil
447 }
448
449
450 func (s *SQL) Create(key string, rls *rspb.Release) error {
451 namespace := rls.Namespace
452 if namespace == "" {
453 namespace = defaultNamespace
454 }
455 s.namespace = namespace
456
457 body, err := encodeRelease(rls)
458 if err != nil {
459 s.Log("failed to encode release: %v", err)
460 return err
461 }
462
463 transaction, err := s.db.Beginx()
464 if err != nil {
465 s.Log("failed to start SQL transaction: %v", err)
466 return fmt.Errorf("error beginning transaction: %v", err)
467 }
468
469 insertQuery, args, err := s.statementBuilder.
470 Insert(sqlReleaseTableName).
471 Columns(
472 sqlReleaseTableKeyColumn,
473 sqlReleaseTableTypeColumn,
474 sqlReleaseTableBodyColumn,
475 sqlReleaseTableNameColumn,
476 sqlReleaseTableNamespaceColumn,
477 sqlReleaseTableVersionColumn,
478 sqlReleaseTableStatusColumn,
479 sqlReleaseTableOwnerColumn,
480 sqlReleaseTableCreatedAtColumn,
481 ).
482 Values(
483 key,
484 sqlReleaseDefaultType,
485 body,
486 rls.Name,
487 namespace,
488 int(rls.Version),
489 rls.Info.Status.String(),
490 sqlReleaseDefaultOwner,
491 int(time.Now().Unix()),
492 ).ToSql()
493 if err != nil {
494 s.Log("failed to build insert query: %v", err)
495 return err
496 }
497
498 if _, err := transaction.Exec(insertQuery, args...); err != nil {
499 defer transaction.Rollback()
500
501 selectQuery, args, buildErr := s.statementBuilder.
502 Select(sqlReleaseTableKeyColumn).
503 From(sqlReleaseTableName).
504 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
505 Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace}).
506 ToSql()
507 if buildErr != nil {
508 s.Log("failed to build select query: %v", buildErr)
509 return err
510 }
511
512 var record SQLReleaseWrapper
513 if err := transaction.Get(&record, selectQuery, args...); err == nil {
514 s.Log("release %s already exists", key)
515 return ErrReleaseExists
516 }
517
518 s.Log("failed to store release %s in SQL database: %v", key, err)
519 return err
520 }
521
522
523 for k, v := range filterSystemLabels(rls.Labels) {
524 insertLabelsQuery, args, err := s.statementBuilder.
525 Insert(sqlCustomLabelsTableName).
526 Columns(
527 sqlCustomLabelsTableReleaseKeyColumn,
528 sqlCustomLabelsTableReleaseNamespaceColumn,
529 sqlCustomLabelsTableKeyColumn,
530 sqlCustomLabelsTableValueColumn,
531 ).
532 Values(
533 key,
534 namespace,
535 k,
536 v,
537 ).ToSql()
538
539 if err != nil {
540 defer transaction.Rollback()
541 s.Log("failed to build insert query: %v", err)
542 return err
543 }
544
545 if _, err := transaction.Exec(insertLabelsQuery, args...); err != nil {
546 defer transaction.Rollback()
547 s.Log("failed to write Labels: %v", err)
548 return err
549 }
550 }
551 defer transaction.Commit()
552
553 return nil
554 }
555
556
557 func (s *SQL) Update(key string, rls *rspb.Release) error {
558 namespace := rls.Namespace
559 if namespace == "" {
560 namespace = defaultNamespace
561 }
562 s.namespace = namespace
563
564 body, err := encodeRelease(rls)
565 if err != nil {
566 s.Log("failed to encode release: %v", err)
567 return err
568 }
569
570 query, args, err := s.statementBuilder.
571 Update(sqlReleaseTableName).
572 Set(sqlReleaseTableBodyColumn, body).
573 Set(sqlReleaseTableNameColumn, rls.Name).
574 Set(sqlReleaseTableVersionColumn, int(rls.Version)).
575 Set(sqlReleaseTableStatusColumn, rls.Info.Status.String()).
576 Set(sqlReleaseTableOwnerColumn, sqlReleaseDefaultOwner).
577 Set(sqlReleaseTableModifiedAtColumn, int(time.Now().Unix())).
578 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
579 Where(sq.Eq{sqlReleaseTableNamespaceColumn: namespace}).
580 ToSql()
581
582 if err != nil {
583 s.Log("failed to build update query: %v", err)
584 return err
585 }
586
587 if _, err := s.db.Exec(query, args...); err != nil {
588 s.Log("failed to update release %s in SQL database: %v", key, err)
589 return err
590 }
591
592 return nil
593 }
594
595
596 func (s *SQL) Delete(key string) (*rspb.Release, error) {
597 transaction, err := s.db.Beginx()
598 if err != nil {
599 s.Log("failed to start SQL transaction: %v", err)
600 return nil, fmt.Errorf("error beginning transaction: %v", err)
601 }
602
603 selectQuery, args, err := s.statementBuilder.
604 Select(sqlReleaseTableBodyColumn).
605 From(sqlReleaseTableName).
606 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
607 Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace}).
608 ToSql()
609 if err != nil {
610 s.Log("failed to build select query: %v", err)
611 return nil, err
612 }
613
614 var record SQLReleaseWrapper
615 err = transaction.Get(&record, selectQuery, args...)
616 if err != nil {
617 s.Log("release %s not found: %v", key, err)
618 return nil, ErrReleaseNotFound
619 }
620
621 release, err := decodeRelease(record.Body)
622 if err != nil {
623 s.Log("failed to decode release %s: %v", key, err)
624 transaction.Rollback()
625 return nil, err
626 }
627 defer transaction.Commit()
628
629 deleteQuery, args, err := s.statementBuilder.
630 Delete(sqlReleaseTableName).
631 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
632 Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace}).
633 ToSql()
634 if err != nil {
635 s.Log("failed to build delete query: %v", err)
636 return nil, err
637 }
638
639 _, err = transaction.Exec(deleteQuery, args...)
640 if err != nil {
641 s.Log("failed perform delete query: %v", err)
642 return release, err
643 }
644
645 if release.Labels, err = s.getReleaseCustomLabels(key, s.namespace); err != nil {
646 s.Log("failed to get release %s/%s custom labels: %v", s.namespace, key, err)
647 return nil, err
648 }
649
650 deleteCustomLabelsQuery, args, err := s.statementBuilder.
651 Delete(sqlCustomLabelsTableName).
652 Where(sq.Eq{sqlCustomLabelsTableReleaseKeyColumn: key}).
653 Where(sq.Eq{sqlCustomLabelsTableReleaseNamespaceColumn: s.namespace}).
654 ToSql()
655
656 if err != nil {
657 s.Log("failed to build delete Labels query: %v", err)
658 return nil, err
659 }
660 _, err = transaction.Exec(deleteCustomLabelsQuery, args...)
661 return release, err
662 }
663
664
665 func (s *SQL) getReleaseCustomLabels(key string, _ string) (map[string]string, error) {
666 query, args, err := s.statementBuilder.
667 Select(sqlCustomLabelsTableKeyColumn, sqlCustomLabelsTableValueColumn).
668 From(sqlCustomLabelsTableName).
669 Where(sq.Eq{sqlCustomLabelsTableReleaseKeyColumn: key,
670 sqlCustomLabelsTableReleaseNamespaceColumn: s.namespace}).
671 ToSql()
672 if err != nil {
673 return nil, err
674 }
675
676 var labelsList = []SQLReleaseCustomLabelWrapper{}
677 if err := s.db.Select(&labelsList, query, args...); err != nil {
678 return nil, err
679 }
680
681 labelsMap := make(map[string]string)
682 for _, i := range labelsList {
683 labelsMap[i.Key] = i.Value
684 }
685
686 return filterSystemLabels(labelsMap), nil
687 }
688
689
690 func getReleaseSystemLabels(rls *rspb.Release) map[string]string {
691 return map[string]string{
692 "name": rls.Name,
693 "owner": sqlReleaseDefaultOwner,
694 "status": rls.Info.Status.String(),
695 "version": strconv.Itoa(rls.Version),
696 }
697 }
698
View as plain text