1 package uploadjob
2
3 import (
4 "archive/zip"
5 "bufio"
6 "encoding/base64"
7 "fmt"
8 "io"
9 "net/http"
10 "os"
11 "strings"
12
13 "github.com/google/go-github/v47/github"
14
15 "edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
16 "edge-infra.dev/pkg/f8n/devinfra/jack/constants"
17 "edge-infra.dev/pkg/f8n/devinfra/jack/plugin"
18 "edge-infra.dev/pkg/f8n/devinfra/job"
19 "edge-infra.dev/pkg/lib/build"
20 )
21
22 func init() {
23 plugin.RegisterWorkflowEventHandler(constants.PluginUploadJob, handleWorkflowEvent)
24 plugin.RegisterWorkflowRunEventHandler(constants.PluginUploadJob, handleWorkflowRunEvent)
25 }
26
27
28 func handleWorkflowEvent(hp plugin.HandlerParams, wf github.WorkflowJobEvent) {
29 hp.Log.WithName(constants.PluginUploadJob)
30
31 if wf.GetAction() != "in_progress" {
32 return
33 }
34
35 ctx := hp.Ctx
36 log := hp.Log
37 client := hp.Client
38 org := hp.Org
39 repo := hp.Repo
40
41 log.Info("Running uploadjob plugin")
42 log.Info("in progress job detected")
43
44
45 runs, _, err := client.Actions().GetWorkflowRunByID(ctx, org, repo, wf.WorkflowJob.GetRunID())
46 if err != nil {
47 log.Error(err, "Failed to get workflow run by ID")
48 return
49 }
50
51
52 var pr int
53 if len(runs.PullRequests) > 0 {
54 pr = runs.PullRequests[0].GetNumber()
55 }
56
57 timestamp := wf.WorkflowJob.GetStartedAt().UTC().Unix()
58 machine := wf.WorkflowJob.Labels[0]
59 headsha := wf.WorkflowJob.GetHeadSHA()
60 number := fmt.Sprintf("%d-%d", wf.WorkflowJob.GetRunID(), runs.GetRunAttempt())
61 jobName := wf.WorkflowJob.GetName()
62 workflow := *runs.Name
63 branch := *runs.HeadBranch
64
65
66 base, err := getVersion(hp, build.VersionFile)
67 if err != nil {
68 log.Error(err, "Failed to get version")
69 }
70
71
72 opts := &github.ListOptions{}
73 commit, _, err := client.Repositories().GetCommit(ctx, org, repo, headsha, opts)
74 if err != nil {
75 log.Error(err, "Failed to get commit from repo")
76 return
77 }
78
79 ghrepo, _, err := client.Repositories().Get(ctx, org, repo)
80 if err != nil {
81 log.Error(err, "failed to get default branch for repo")
82 return
83 }
84
85 ver := &build.Version{
86 SemVer: base,
87 Commit: headsha,
88 Timestamp: commit.GetCommitter().GetCreatedAt().UTC().Unix(),
89 ReleaseCandidate: branch == *ghrepo.DefaultBranch || strings.HasPrefix(branch, "release/"),
90 }
91
92
93 started, err := job.NewStarted(
94 job.WithCommit(headsha),
95 job.WithMachine(machine),
96 job.WithRepo(repo),
97 job.WithTimestamp(timestamp),
98 job.WithVersion(ver.String()),
99 job.WithPull(fmt.Sprint(pr)),
100 )
101 if err != nil {
102 log.Error(err, "failed to create started struct")
103 return
104 }
105
106
107 startedjson, err := started.ToJSON()
108 if err != nil {
109 log.Error(err, "failed to parse started to json")
110 return
111 }
112
113 log.Info("creating storage client")
114
115
116 for _, job := range hp.Params.Uploadjob {
117 log.Info(fmt.Sprintf("uploading to the %s bucket", job.Bucket))
118 s, err := storage.New(ctx)
119 if err != nil {
120 log.Error(err, "failed to create storage client")
121 return
122 }
123
124 b, err := s.NewBucket(ctx, storage.WithBucket(job.Bucket))
125 if err != nil {
126 log.Error(err, "failed to create bucket handler")
127 return
128 }
129
130 log.Info(fmt.Sprintf("attempting to upload started file to %s", storage.BasePath(repo, workflow, number, jobName)))
131
132 err = b.UploadStartedJSON(ctx, repo, workflow, number, jobName, startedjson)
133 if err != nil {
134 log.Error(err, "failed to upload started json")
135 return
136 }
137 }
138 }
139
140
141 func handleWorkflowRunEvent(hp plugin.HandlerParams, wfr github.WorkflowRunEvent) {
142 hp.Log.WithName(constants.PluginUploadJob)
143
144 if wfr.GetAction() != "completed" {
145 return
146 }
147
148 client := hp.Client
149 ctx := hp.Ctx
150 log := hp.Log
151 org := hp.Org
152 repo := hp.Repo
153 suiteID := wfr.WorkflowRun.GetCheckSuiteID()
154
155 log.Info("completed run detected")
156
157
158 opts := &github.ListCheckRunsOptions{}
159 checkrun, _, err := client.Checks().ListCheckRunsCheckSuite(ctx, org, repo, suiteID, opts)
160 if err != nil {
161 log.Error(err, "Failed to lists check runs for a check suite")
162 return
163 }
164
165
166 for _, run := range checkrun.CheckRuns {
167 jobName := run.GetName()
168 log.Info(fmt.Sprintf("attempting to upload finished data for job %+v", jobName))
169
170
171 logs, err := getWorkflowRunLogs(hp, wfr, jobName)
172 if err != nil {
173 log.Error(err, "failed to get workflow logs")
174 }
175
176
177
178 passed := false
179 outcome := "failed"
180 if run.GetConclusion() == "success" {
181 passed = true
182 outcome = "passed"
183 }
184
185 workflow := wfr.WorkflowRun.GetName()
186 runID := fmt.Sprint(wfr.WorkflowRun.GetID())
187 attempt := fmt.Sprint(wfr.WorkflowRun.GetRunAttempt())
188 number := fmt.Sprintf("%s-%s", runID, attempt)
189
190 if len(wfr.WorkflowRun.PullRequests) > 0 {
191 pr := wfr.WorkflowRun.PullRequests[0].Number
192 nameJobName := fmt.Sprintf("%s/%s", workflow, jobName)
193 nameJobNameID := fmt.Sprintf("%s/%s/%s", workflow, jobName, number)
194 msg := fmt.Sprintf("| %s | [%s](https://gridbug.edge-infra.dev/%s/%s)|", nameJobName, outcome, repo, nameJobNameID)
195
196 err := addCommentOnPR(hp, msg, constants.PrCommentIdentifier, nameJobName, *pr)
197 if err != nil {
198 log.Error(err, "Failed to add a comment on the PR")
199 }
200 }
201
202 timestamp := run.GetCompletedAt().UTC().Unix()
203
204 metadata := make(map[string]string)
205 metadata["job_name"] = jobName
206 metadata["workflow_name"] = workflow
207 metadata["run_id"] = runID
208 metadata["number"] = number
209 metadata["attempt"] = attempt
210 metadata["platform"] = "actions"
211
212
213 finished, err := job.NewFinished(
214 job.WithFinishedTimestamp(timestamp),
215 job.WithPassed(passed),
216 job.WithMetadata(metadata),
217 )
218 if err != nil {
219 log.Error(err, "failed to create finished struct")
220 return
221 }
222
223
224 finishedJSON, err := finished.ToJSON()
225 if err != nil {
226 log.Error(err, "failed to generate finished json")
227 return
228 }
229
230 log.Info("creating storage client")
231
232
233 for _, job := range hp.Params.Uploadjob {
234 log.Info(fmt.Sprintf("uploading to the %s bucket", job.Bucket))
235 s, err := storage.New(ctx)
236 if err != nil {
237 log.Error(err, "failed to create storage client")
238 return
239 }
240
241 b, err := s.NewBucket(ctx, storage.WithBucket(job.Bucket))
242 if err != nil {
243 log.Error(err, "failed to create bucket handler")
244 return
245 }
246
247 base := storage.BasePath(repo, workflow, number, jobName)
248 log.Info(fmt.Sprintf("attempting to upload finished file to %s", base))
249
250 err = b.UploadFinishedJSON(ctx, repo, workflow, number, jobName, finishedJSON)
251 if err != nil {
252 log.Error(err, "failed to upload finished json")
253 return
254 }
255 log.Info(fmt.Sprintf("attempting to upload logs to %s", storage.LogsPath(base)))
256
257 err = b.UploadLogs(ctx, repo, workflow, number, jobName, logs)
258 if err != nil {
259 log.Error(err, "failed to upload logs")
260 return
261 }
262 }
263 }
264 }
265
266 func getWorkflowRunLogs(hp plugin.HandlerParams, wfr github.WorkflowRunEvent, jobName string) ([]byte, error) {
267
268 nurl, _, err := hp.Client.Actions().GetWorkflowRunLogs(hp.Ctx, hp.Org, hp.Repo, wfr.WorkflowRun.GetID(), true)
269 if err != nil {
270 return nil, err
271 }
272
273
274 getresp, err := http.Get(nurl.String())
275 if err != nil {
276 return nil, err
277 }
278 defer getresp.Body.Close()
279
280
281 body, err := io.ReadAll(getresp.Body)
282 if err != nil {
283 return nil, err
284 }
285
286
287 tmpfile, err := os.CreateTemp("", "logs-*.zip")
288 if err != nil {
289 return nil, err
290 }
291 defer os.Remove(tmpfile.Name())
292
293
294 if _, err := tmpfile.Write(body); err != nil {
295 return nil, err
296 }
297 if err := tmpfile.Close(); err != nil {
298 return nil, err
299 }
300
301
302 logs, files, err := unzip(tmpfile.Name(), jobName)
303 hp.Log.Logger.Info("unzipped log file", "logs", string(logs), "files", files)
304 return logs, err
305 }
306
307
308
309 func unzip(path string, jobName string) ([]byte, []string, error) {
310
311 z, err := zip.OpenReader(path)
312 if err != nil {
313 return nil, nil, err
314 }
315 defer z.Close()
316
317 filename := jobName + ".txt"
318
319
320 var logs []byte
321 var files []string
322 for _, f := range z.File {
323 files = append(files, f.Name)
324 if strings.HasSuffix(f.Name, filename) {
325 rc, err := f.Open()
326 if err != nil {
327 return nil, files, err
328 }
329 b, err := io.ReadAll(rc)
330 if err != nil {
331 return nil, files, err
332 }
333 rc.Close()
334 logs = b
335 }
336 }
337 return logs, files, nil
338 }
339
340
341 func getVersion(hp plugin.HandlerParams, fileName string) (string, error) {
342 return GetFile(hp, fileName, "")
343 }
344
345 func GetFile(hp plugin.HandlerParams, fileName string, commit string) (string, error) {
346
347 opts := &github.RepositoryContentGetOptions{Ref: commit}
348 file, _, _, err := hp.Client.Repositories().GetContents(hp.Ctx, hp.Org, hp.Repo, fileName, opts)
349 if err != nil {
350 return "", err
351 }
352
353 data, err := base64.StdEncoding.DecodeString(*file.Content)
354 if err != nil {
355 return "", err
356 }
357 return strings.TrimSpace(string(data)), nil
358 }
359
360
361 func addCommentOnPR(hp plugin.HandlerParams, msg, id, namejob string, number int) error {
362 client := hp.Client
363 ctx := hp.Ctx
364 org := hp.Org
365 repo := hp.Repo
366
367
368 opts := &github.IssueListCommentsOptions{}
369 comments, _, err := client.Issues().ListComments(ctx, org, repo, number, opts)
370 if err != nil {
371 return err
372 }
373
374 for _, comment := range comments {
375
376 if !strings.Contains(comment.GetBody(), id) || !strings.HasSuffix(comment.GetUser().GetLogin(), "[bot]") {
377 continue
378 }
379
380
381 if !strings.Contains(comment.GetBody(), namejob) {
382 nmsg := comment.GetBody() + "\n" + msg
383 issueComment := github.IssueComment{Body: &nmsg}
384 if _, _, err := client.Issues().EditComment(ctx, org, repo, comment.GetID(), &issueComment); err != nil {
385 return err
386 }
387 return nil
388 }
389
390
391 updatedBody := []string{}
392 scanner := bufio.NewScanner(strings.NewReader(comment.GetBody()))
393 for scanner.Scan() {
394 text := scanner.Text()
395 if strings.Contains(text, namejob) {
396 updatedBody = append(updatedBody, msg)
397 continue
398 }
399 updatedBody = append(updatedBody, text)
400 }
401 nmsg := strings.Join(updatedBody[:], "\n")
402 issueComment := github.IssueComment{Body: &nmsg}
403 if _, _, err := client.Issues().EditComment(ctx, org, repo, comment.GetID(), &issueComment); err != nil {
404 return err
405 }
406 return nil
407 }
408
409
410 msg = id + msg
411 issueComment := github.IssueComment{Body: &msg}
412 if _, _, err := client.Issues().CreateComment(ctx, org, repo, number, &issueComment); err != nil {
413 return err
414 }
415 return nil
416 }
417
View as plain text