1
2
3 package storage
4
5 import (
6 "bytes"
7 "context"
8 "fmt"
9 "io"
10 "io/fs"
11 "log"
12 "os"
13 "path/filepath"
14 "strings"
15
16 goErrors "errors"
17
18 gcs "cloud.google.com/go/storage"
19 "google.golang.org/api/iterator"
20
21 "edge-infra.dev/pkg/lib/cli/rags"
22 "edge-infra.dev/pkg/lib/errors"
23 "edge-infra.dev/pkg/lib/fog"
24 )
25
26 const (
27 DefaultBucket = "edge-test-jobs"
28 DefaultTestingBucket = "jack-testing-bucket"
29 StartedFilename = "started.json"
30 FinishedFilename = "finished.json"
31 LogsFilename = "logs.txt"
32 )
33
34
35
36 func BindStorageBucketFlag(rs *rags.RagSet, d *string) {
37 rs.StringVar(d, "job-storage-bucket", DefaultBucket, "the storage bucket to upload job results to")
38 }
39
40
41 type Artifact struct {
42 Attributes *gcs.ObjectAttrs
43 Contents []byte
44 }
45
46
47 type Files struct {
48 XML map[string][]byte
49 JSON map[string][]byte
50 Logs map[string][]byte
51 }
52
53 func (a *Artifact) ToString() string {
54 return string(a.Contents)
55 }
56
57
58 type Option func(*storageOpts)
59
60 type BucketOption func(*bucketOpts)
61
62
63
64 func New(ctx context.Context, opts ...Option) (*Storage, error) {
65 o := &storageOpts{}
66 for _, opt := range opts {
67 opt(o)
68 }
69
70 if o.client == nil {
71
72 client, err := gcs.NewClient(ctx)
73 if err != nil {
74 return nil, errors.Wrap(err)
75 }
76 o.client = client
77 }
78
79 return &Storage{Client: o.client}, nil
80 }
81
82 func (s *Storage) NewBucket(ctx context.Context, opts ...BucketOption) (*Bucket, error) {
83 o := &bucketOpts{bucketName: DefaultBucket}
84 for _, opt := range opts {
85 opt(o)
86 }
87
88
89 bucket := s.Client.Bucket(o.bucketName)
90
91 _, err := bucket.Attrs(ctx)
92
93 if err == gcs.ErrBucketNotExist {
94 return nil, errors.New(fmt.Sprintf("bucket %s does not exist, please create first", o.bucketName), err)
95 }
96
97 if err != nil {
98 return nil, errors.New("failed to check if bucket exists", err)
99 }
100 return &Bucket{bucket}, nil
101 }
102
103
104
105
106
107
108
109
110 func BasePath(repo, workflow, runid, jobname string) string {
111 return fmt.Sprintf("actions/%s/%s/%s/%s", repo, workflow, runid, jobname)
112 }
113
114
115
116
117
118
119 func ArgoBasePath(repo, wf, runid string) string {
120 return fmt.Sprintf("argo/%s/%s/%s", repo, wf, runid)
121 }
122
123
124
125
126
127 func ArtifactsPath(storagePath string) string {
128 return filepath.Join(storagePath, "artifacts")
129 }
130
131
132 func LogsPath(storagePath string) string {
133 return filepath.Join(storagePath, "logs")
134 }
135
136 var (
137 ErrEmptyDir = goErrors.New("directory is empty")
138 )
139
140
141
142 func (b *Bucket) UploadArtifacts(ctx context.Context, storagePath, dir string) error {
143 files, err := os.ReadDir(dir)
144 if err != nil {
145 return errors.New(fmt.Sprintf("failed to read dir %s", dir), err)
146 }
147 if len(files) == 0 {
148 return errors.Wrap(ErrEmptyDir)
149 }
150
151
152
153 var finished fs.DirEntry
154 for _, file := range files {
155 name := file.Name()
156
157 if name == StartedFilename {
158 err = b.write(ctx, file.Name(), storagePath, dir, file)
159 if err != nil {
160 return err
161 }
162 continue
163 }
164
165
166 if name == FinishedFilename {
167 finished = file
168 continue
169 }
170
171 err = b.write(ctx, name, ArtifactsPath(storagePath), dir, file)
172 if err != nil {
173 return err
174 }
175 }
176
177
178 if finished != nil {
179 err = b.write(ctx, finished.Name(), storagePath, dir, finished)
180 if err != nil {
181 return err
182 }
183 }
184
185 return nil
186 }
187
188 func (b *Bucket) write(ctx context.Context, name, path, dir string, file fs.DirEntry) error {
189
190 w := b.Object(filepath.Join(path, name)).NewWriter(ctx)
191
192 r, err := os.Open(filepath.Join(dir, file.Name()))
193 if err != nil {
194 return errors.New(fmt.Sprintf("failed to open file %s", name), err)
195 }
196 defer r.Close()
197
198 if _, err := io.Copy(w, r); err != nil {
199 return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
200 }
201 if err := w.Close(); err != nil {
202 return errors.New(fmt.Sprintf("failed to close file %s", name), err)
203 }
204 return nil
205 }
206
207
208 func (b *Bucket) UploadStartedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
209 storagePath := BasePath(repo, workflow, id, job)
210 return b.upload(ctx, storagePath, StartedFilename, file)
211 }
212
213
214 func (b *Bucket) UploadFinishedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
215 storagePath := BasePath(repo, workflow, id, job)
216 return b.upload(ctx, storagePath, FinishedFilename, file)
217 }
218
219
220 func (b *Bucket) UploadLogs(ctx context.Context, repo, workflow, id, job string, file []byte) error {
221 storagePath := BasePath(repo, workflow, id, job)
222 return b.upload(ctx, LogsPath(storagePath), LogsFilename, file)
223 }
224
225
226 func (b *Bucket) upload(ctx context.Context, storagePath, name string, file []byte) error {
227
228 w := b.Object(filepath.Join(storagePath, name)).NewWriter(ctx)
229
230 r := bytes.NewReader(file)
231
232
233 if _, err := io.Copy(w, r); err != nil {
234 return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
235 }
236 if err := w.Close(); err != nil {
237 return errors.New(fmt.Sprintf("failed to close file %s", name), err)
238 }
239 return nil
240 }
241
242
243 func (b *Bucket) RetrieveArtifacts(ctx context.Context, storagePath string) ([]Artifact, error) {
244 query := &gcs.Query{Prefix: ArtifactsPath(storagePath)}
245
246 var result []Artifact
247 it := b.Objects(ctx, query)
248
249 for {
250 attrs, err := it.Next()
251 if err == iterator.Done {
252 break
253 }
254 if err != nil {
255 log.Fatal(err)
256 }
257
258 artifact := Artifact{Attributes: attrs}
259
260 data, err := b.readFile(ctx, attrs.Name)
261 if err != nil {
262 return nil, fmt.Errorf("storage.RetrieveArtifacts: unable to retrieve artifact: %w", err)
263 }
264 artifact.Contents = data
265 result = append(result, artifact)
266 }
267
268 return result, nil
269 }
270
271
272 func (b *Bucket) RetrieveLogs(ctx context.Context, storagePath string) (map[string][]byte, error) {
273 query := &gcs.Query{Prefix: LogsPath(storagePath)}
274 logs := make(map[string][]byte)
275
276 it := b.Objects(ctx, query)
277
278 for {
279 attrs, err := it.Next()
280 if err == iterator.Done {
281 break
282 }
283 if err != nil {
284 log.Fatal(err)
285 }
286
287 data, err := b.readFile(ctx, attrs.Name)
288 if err != nil {
289 return logs, fmt.Errorf("storage.RetrieveArtifacts: unable to retrieve artifact: %w", err)
290 }
291
292
293 nameArr := strings.SplitAfter(attrs.Name, "/")
294 name := nameArr[len(nameArr)-1]
295 logs[name] = data
296 }
297
298 return logs, nil
299 }
300
301
302 func (b *Bucket) RetrieveAllFiles(ctx context.Context, storagePath string) (Files, error) {
303 query := &gcs.Query{Prefix: storagePath}
304
305 files := Files{}
306 files.JSON = make(map[string][]byte)
307 files.XML = make(map[string][]byte)
308 files.Logs = make(map[string][]byte)
309
310 log := fog.FromContext(ctx)
311 it := b.Objects(ctx, query)
312
313 names := []string{}
314
315 for {
316 attrs, err := it.Next()
317 if err == iterator.Done {
318 break
319 }
320 if err != nil {
321 return files, err
322 }
323
324
325 ext := filepath.Ext(attrs.Name)
326 if ext != ".json" && ext != ".xml" && ext != ".txt" && ext != ".logs" && ext != ".log" {
327 continue
328 }
329
330 data, err := b.readFile(ctx, attrs.Name)
331 if err != nil {
332 log.Error(err, "storage.RetrieveArtifacts: unable to retrieve artifact")
333 continue
334 }
335
336
337 nameArr := strings.SplitAfter(attrs.Name, "/")
338 name := nameArr[len(nameArr)-1]
339 names = append(names, name)
340
341 switch ext {
342 case ".json":
343 files.JSON[name] = data
344 case ".log", ".logs", ".txt":
345 files.Logs[name] = data
346 case ".xml":
347 files.XML[name] = data
348 }
349 }
350
351 log.Info("files found", "files", names)
352
353 return files, nil
354 }
355
356
357 func (b *Bucket) readFile(ctx context.Context, fileName string) ([]byte, error) {
358 rc, err := b.Object(fileName).NewReader(ctx)
359 if err != nil {
360 return nil, fmt.Errorf("failed to get handle on object %s: %w", fileName, err)
361 }
362 defer rc.Close()
363
364 slurp, err := io.ReadAll(rc)
365 if err != nil {
366 fmt.Printf("%v", err)
367 return nil, fmt.Errorf("failed to read file %s: %w", fileName, err)
368 }
369
370 return slurp, nil
371 }
372
View as plain text