1 package argo
2
3 import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "errors"
8 "flag"
9 "fmt"
10 "log"
11 "math/rand"
12 "os"
13 "path/filepath"
14 "strings"
15 "time"
16
17 "github.com/cli/go-gh/v2/pkg/api"
18 "github.com/peterbourgon/ff/v3"
19
20 "edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
21 "edge-infra.dev/pkg/f8n/devinfra/job"
22 "edge-infra.dev/pkg/lib/build"
23 "edge-infra.dev/pkg/lib/cli/rags"
24 "edge-infra.dev/pkg/lib/cli/sink"
25 "edge-infra.dev/pkg/lib/fog"
26 )
27
// Package-level configuration shared across the command.
var (
	// bucket is the GCS bucket artifacts are uploaded to; bound via
	// storage.BindStorageBucketFlag in argoFlags.
	bucket string
	// dry, when true, logs the directories that would be uploaded
	// without performing any uploads.
	dry bool
)
33
34
// argo holds the flag-bound inputs used to build the started.json and
// finished.json job records and the storage path for uploads.
type argo struct {
	workflow string // argo workflow name
	runID    string // caller-supplied run id (a random suffix is appended at exec time)
	filepath string // directory where run-id.txt / started.json / finished.json are written
	branch   string // branch for the run; used to derive the version when rosaVersionString is empty

	// Inputs for started.json.
	startedTimestamp  string // RFC3339 timestamp of when the workflow started
	machine           string // machine the job ran on
	pull              string // pull request number, if any
	commit            string // commit hash associated with this workflow
	repo              string // repository name
	rosaVersionString string // explicit version string; fetched from GitHub when empty

	// Inputs for finished.json.
	finishedTimestamp string // RFC3339 timestamp of when the workflow ended
	passed            string // workflow phase string (e.g. "Succeeded", "Failed", "Error")
	metadata          string // JSON object string of extra key/value metadata
}
54
55 func argoFlags(a *argo) []*rags.Rag {
56 rs := rags.New("argo", flag.ExitOnError)
57 storage.BindStorageBucketFlag(rs, &bucket)
58
59 rs.StringVar(&a.workflow, "workflow", "", "argo workflow name", rags.WithRequired())
60 rs.StringVar(&a.runID, "run-id", "", "unique run id", rags.WithRequired())
61 rs.StringVar(&a.filepath, "filepath", "", "filepath to upload", rags.WithRequired())
62 rs.StringVar(&a.branch, "branch", "", "branch associated with the run ")
63
64
65 rs.StringVar(&a.commit, "commit", "", "hash of the commit associated with this workflow", rags.WithRequired())
66 rs.StringVar(&a.startedTimestamp, "started-time", "", "timestamp of when the workflow started", rags.WithRequired())
67 rs.StringVar(&a.machine, "machine", "", "name of the machine the job was ran on", rags.WithRequired())
68 rs.StringVar(&a.pull, "pull", "", "pull number associated with this workflow")
69 rs.StringVar(&a.repo, "repo", "", "name of the repo associated with this workflow", rags.WithRequired())
70 rs.StringVar(&a.rosaVersionString, "rosa-version", "", "rosa version associated with this run")
71
72
73 rs.StringVar(&a.passed, "passed", "", "whether or not the workflow passed", rags.WithRequired())
74 rs.StringVar(&a.finishedTimestamp, "finished-time", "", "timestamp of when the workflow ended", rags.WithRequired())
75 rs.StringVar(&a.metadata, "metadata", "", "JSON string with key value pairs of any extra relevant info")
76
77 rs.BoolVar(&dry, "dry-run", false, "print out paths being uploaded but dont upload them", rags.WithShort("d"))
78 return rs.Rags()
79 }
80
// letterBytes is the alphabet used for random run-id suffixes
// (digits 1-9 plus upper- and lowercase ASCII letters).
const letterBytes = "123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randStringBytes returns a random string of length n drawn from letterBytes.
//
// The RNG is created and seeded once per call. The previous implementation
// built a new source seeded with time.Now().UnixNano() on every loop
// iteration, so whenever consecutive iterations observed the same clock
// reading they produced the same character — suffixes like "aaaaaa".
func randStringBytes(n int) string {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[r.Intn(len(letterBytes))]
	}
	return string(b)
}
92
// New builds the "argo" sink command. On execution it generates a unique run
// id, writes run-id.txt into the configured filepath, produces started.json
// and finished.json, and uploads the artifact directories passed as
// positional arguments to cloud storage.
func New() *sink.Command {
	argo := &argo{}

	cmd := &sink.Command{
		Use:        "argo [flags] <directories>",
		Short:      "upload artifacts from Argo workflows to google cloud storage",
		Extensions: []sink.Extension{},
		Flags:      argoFlags(argo),
		Options: []ff.Option{
			// Allow every flag to also be supplied via environment variables.
			ff.WithEnvVarNoPrefix(),
		},
		Exec: func(ctx context.Context, r sink.Run) error {
			// Append a random 6-char suffix so repeated runs with the same
			// -run-id do not collide in storage.
			run := fmt.Sprintf("%s-%s", argo.runID, randStringBytes(6))
			r.Log.V(0).Info("generated new runID", "runID", run)

			// Persist the generated run id alongside the other artifacts.
			// NOTE(review): assumes argo.filepath already exists here; the
			// directory is only created later inside generateStarted — confirm.
			err := os.WriteFile(filepath.Join(argo.filepath, "run-id.txt"), []byte(run), 0644)
			if err != nil {
				return err
			}

			// Base path in the bucket under which this run's artifacts live.
			storagePath := storage.ArgoBasePath(
				argo.repo,
				argo.workflow,
				run,
			)

			started, err := argo.generateStarted()
			if err != nil {
				return err
			}
			r.Log.V(0).Info("created started.json", "file", started)

			finished, err := argo.generateFinished(run)
			if err != nil {
				return err
			}
			r.Log.V(0).Info("created finished.json", "file", finished)

			// Positional arguments are the directories to upload.
			return upload(ctx, storagePath, r.Args())
		},
	}
	return cmd
}
136
137 func (a *argo) generateStarted() (string, error) {
138 err := os.MkdirAll(filepath.Dir(a.filepath), 0644)
139 if err != nil {
140 return "", err
141 }
142
143 t, err := time.Parse(time.RFC3339, a.startedTimestamp)
144 if err != nil {
145 return "", err
146 }
147
148 version := a.rosaVersionString
149 if a.rosaVersionString == "" {
150
151 if a.branch == "" {
152 return "", fmt.Errorf("if rosa version isnt supplied you must supply a branch name")
153 }
154
155 ver, err := a.fetchVersionString()
156 if err != nil {
157 return "", nil
158 }
159 version = ver
160 }
161
162 s, err := job.NewStarted(
163 job.WithCommit(a.commit),
164 job.WithMachine(a.machine),
165 job.WithRepo(a.repo),
166 job.WithVersion(version),
167 job.WithTimestamp(t.Unix()),
168 job.WithPull(a.pull),
169 )
170 if err != nil {
171 return "", err
172 }
173
174 sj, err := s.ToJSON()
175 if err != nil {
176 return "", err
177 }
178
179 if a.filepath != "" {
180 err = os.WriteFile(filepath.Join(a.filepath, "started.json"), sj, 0644)
181 if err != nil {
182 return "", err
183 }
184 }
185
186 return string(sj), nil
187 }
188
// Response models the subset of the GitHub "get a commit" REST response
// consumed by fetchVersionString.
type Response struct {
	Commit Commit `json:"commit"`
}

// Commit is the commit object within a GitHub commit response.
type Commit struct {
	Committer Committer `json:"committer"`
}

// Committer records who committed and when, per the GitHub API; only Date
// is read by fetchVersionString.
type Committer struct {
	Name  string `json:"name"`
	Email string `json:"email"`
	Date  string `json:"date"`
}
202
203 func (a *argo) fetchVersionString() (string, error) {
204 token := os.Getenv("TOKEN")
205 if token == "" {
206 return "", fmt.Errorf("github auth token was not found in env vars")
207 }
208
209 opts := api.ClientOptions{
210 AuthToken: token,
211 }
212 client, err := api.NewRESTClient(opts)
213 if err != nil {
214 return "", err
215 }
216
217 response := &Response{}
218 err = client.Get(
219 fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/commits/%s", a.repo, a.commit),
220 &response,
221 )
222 if err != nil {
223 return "", err
224 }
225
226 version := struct {
227 Content string `json:"content"`
228 }{}
229 err = client.Get(
230 fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/contents/.version?ref=%s", a.repo, a.commit),
231 &version,
232 )
233 if err != nil {
234 return "", err
235 }
236
237 data, err := base64.StdEncoding.DecodeString(version.Content)
238 if err != nil {
239 log.Fatal("error:", err)
240 }
241
242 ts, err := time.Parse(time.RFC3339, response.Commit.Committer.Date)
243 if err != nil {
244 return "", err
245 }
246
247 ver := &build.Version{
248 SemVer: strings.TrimSpace(string(data)),
249 Commit: a.commit,
250 Timestamp: ts.UTC().Unix(),
251 ReleaseCandidate: a.branch == "master" || strings.HasPrefix(a.branch, "release/"),
252 }
253
254 return ver.String(), nil
255 }
256
257 func (a *argo) generateFinished(run string) (string, error) {
258 result := make(map[string]string)
259 err := json.Unmarshal([]byte(a.metadata), &result)
260 if err != nil {
261 return "", err
262 }
263
264
265 result["run_id"] = a.runID
266 result["number"] = run
267 result["platform"] = "argo"
268 result["machine"] = a.machine
269 result["workflow_name"] = a.workflow
270
271 err = os.MkdirAll(filepath.Dir(a.filepath), 0644)
272 if err != nil {
273 return "", err
274 }
275
276 t, err := time.Parse(time.RFC3339, a.finishedTimestamp)
277 if err != nil {
278 return "", err
279 }
280
281 var p bool
282 switch a.passed {
283 case "Failed", "Error":
284 p = false
285 case "Succeeded":
286 p = true
287 }
288
289 f, err := job.NewFinished(
290 job.WithFinishedTimestamp(t.Unix()),
291 job.WithPassed(p),
292 job.WithMetadata(result),
293 )
294 if err != nil {
295 return "", err
296 }
297
298 fj, err := f.ToJSON()
299 if err != nil {
300 return "", err
301 }
302
303 if a.filepath != "" {
304 err = os.WriteFile(filepath.Join(a.filepath, "finished.json"), fj, 0644)
305 if err != nil {
306 return "", err
307 }
308 }
309
310 return string(fj), nil
311 }
312
313 func upload(ctx context.Context, storagePath string, dirs []string) error {
314 log := fog.FromContext(ctx)
315
316 s, err := storage.New(ctx)
317 if err != nil {
318 return fmt.Errorf("failed to create storage client: %w", err)
319 }
320
321 b, err := s.NewBucket(ctx, storage.WithBucket(bucket))
322 if err != nil {
323 return fmt.Errorf("failed to create bucket handler: %w", err)
324 }
325
326 log.Info("uploading job artifacts", "bucket", bucket, "storagePath", storagePath)
327 for _, dir := range dirs {
328
329 if dry {
330 log.Info("[dry run] uploading", "dir", dir)
331 continue
332 }
333
334 log.Info("uploading", "dir", dir)
335 err := b.UploadArtifacts(ctx, storagePath, dir)
336 if !errors.Is(err, storage.ErrEmptyDir) && err != nil {
337 return fmt.Errorf("failed to upload artifacts: %w", err)
338 }
339 }
340
341 return nil
342 }
343