1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mysql
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21
22 "github.com/jmoiron/sqlx"
23 "github.com/sigstore/rekor/pkg/log"
24
25
26 _ "github.com/go-sql-driver/mysql"
27 )
28
// ProviderType is the driver name handed to sqlx.Open when NewProvider opens
// the database.
const ProviderType = "mysql"

// SQL statements issued against the EntryIndex table.
const (
	// lookupStmt selects the EntryUUID of every row whose EntryKey appears in
	// a list; the single "?" placeholder is expanded by sqlx.In before the
	// query is executed.
	lookupStmt = "SELECT EntryUUID FROM EntryIndex WHERE EntryKey IN (?)"

	// writeStmt inserts one (EntryKey, EntryUUID) row per bound :key/:uuid
	// pair. INSERT IGNORE is used together with the table's
	// UNIQUE(EntryKey, EntryUUID) constraint so that re-inserting an existing
	// pair is skipped instead of failing the statement.
	writeStmt = "INSERT IGNORE INTO EntryIndex (EntryKey, EntryUUID) VALUES (:key, :uuid)"

	// createTableStmt creates the EntryIndex table if it is not already
	// present: an auto-incrementing primary key plus the key/UUID pair, which
	// must be unique as a combination.
	createTableStmt = `CREATE TABLE IF NOT EXISTS EntryIndex (
		PK BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
		EntryKey varchar(512) NOT NULL,
		EntryUUID char(80) NOT NULL,
		PRIMARY KEY(PK),
		UNIQUE(EntryKey, EntryUUID)
	)`
)
42
43
// IndexStorageProvider reads and writes index entries in a SQL database
// through an sqlx handle.
type IndexStorageProvider struct {
	// db is the database handle set by NewProvider. LookupIndices and
	// WriteIndex return an error when it is nil; Shutdown treats nil as
	// nothing to close.
	db *sqlx.DB
}
47
48 func NewProvider(dsn string, opts ...Options) (*IndexStorageProvider, error) {
49 var err error
50 provider := &IndexStorageProvider{}
51 provider.db, err = sqlx.Open(ProviderType, dsn)
52 if err != nil {
53 return nil, err
54 }
55 if err = provider.db.Ping(); err != nil {
56 return nil, err
57 }
58
59 for _, o := range opts {
60 o.applyConnMaxIdleTime(provider.db)
61 o.applyConnMaxLifetime(provider.db)
62 o.applyMaxIdleConns(provider.db)
63 o.applyMaxOpenConns(provider.db)
64 }
65
66 if _, err := provider.db.Exec(createTableStmt); err != nil {
67 return nil, fmt.Errorf("create table if not exists failed: %w", err)
68 }
69
70 return provider, nil
71 }
72
73
74
75 func (isp *IndexStorageProvider) LookupIndices(ctx context.Context, keys []string) ([]string, error) {
76 if isp.db == nil {
77 return []string{}, errors.New("sql client has not been initialized")
78 }
79
80 query, args, err := sqlx.In(lookupStmt, keys)
81 if err != nil {
82 return []string{}, fmt.Errorf("error preparing statement: %w", err)
83 }
84 rows, err := isp.db.QueryContext(ctx, isp.db.Rebind(query), args...)
85 if err != nil {
86 return []string{}, fmt.Errorf("error looking up indices from mysql: %w", err)
87 }
88 defer rows.Close()
89
90 var entryUUIDs []string
91 for rows.Next() {
92 var result string
93 if err := rows.Scan(&result); err != nil {
94 return []string{}, fmt.Errorf("error parsing results from mysql: %w", err)
95 }
96 entryUUIDs = append(entryUUIDs, result)
97 }
98
99 if err := rows.Err(); err != nil {
100 return []string{}, fmt.Errorf("error processing results from mysql: %w", err)
101 }
102 return entryUUIDs, nil
103 }
104
105
106
107 func (isp *IndexStorageProvider) WriteIndex(ctx context.Context, keys []string, index string) error {
108 if isp.db == nil {
109 return errors.New("sql client has not been initialized")
110 }
111
112 var valueMaps []map[string]interface{}
113 for _, key := range keys {
114 valueMaps = append(valueMaps, map[string]interface{}{"key": key, "uuid": index})
115 }
116 result, err := isp.db.NamedExecContext(ctx, writeStmt, valueMaps)
117 if err != nil {
118 return fmt.Errorf("mysql write error: %w", err)
119 }
120 rowsAffected, err := result.RowsAffected()
121 if err != nil {
122 return fmt.Errorf("mysql error getting rowsAffected: %w", err)
123 }
124 log.ContextLogger(ctx).Debugf("WriteIndex affected %d rows", rowsAffected)
125 return nil
126 }
127
128
129 func (isp *IndexStorageProvider) Shutdown() error {
130 if isp.db == nil {
131 return nil
132 }
133
134 return isp.db.Close()
135 }
136
View as plain text