// Package uploadjob is a jack plugin that reacts to GitHub Actions webhook
// events. When a job starts it uploads a "started" document, and when a
// workflow run completes it uploads a "finished" document plus the job logs
// to every bucket listed in the plugin parameters. For runs linked to a pull
// request it also maintains a result row per job in a bot comment on that PR.
package uploadjob

import (
	"archive/zip"
	"bufio"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"

	"github.com/google/go-github/v47/github"

	"edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
	"edge-infra.dev/pkg/f8n/devinfra/jack/constants"
	"edge-infra.dev/pkg/f8n/devinfra/jack/plugin"
	"edge-infra.dev/pkg/f8n/devinfra/job"
	"edge-infra.dev/pkg/lib/build"
)

// init registers the plugin's handlers for workflow job and workflow run
// events under the uploadjob plugin name.
func init() {
	plugin.RegisterWorkflowEventHandler(constants.PluginUploadJob, handleWorkflowEvent)
	plugin.RegisterWorkflowRunEventHandler(constants.PluginUploadJob, handleWorkflowRunEvent)
}

// handleWorkflowEvent handles workflow job events. Only the "in_progress"
// action is processed: it gathers the run, commit and repository details for
// the job, builds the started document and uploads it to every bucket in
// hp.Params.Uploadjob. Failures are logged and abort the handler.
func handleWorkflowEvent(hp plugin.HandlerParams, wf github.WorkflowJobEvent) {
	// NOTE(review): the value returned by WithName is discarded here; if it
	// returns a derived logger rather than mutating hp.Log this call has no
	// effect - confirm against the logger type.
	hp.Log.WithName(constants.PluginUploadJob)

	// exit if the job action isnt in_progress
	if wf.GetAction() != "in_progress" {
		return
	}

	ctx := hp.Ctx
	log := hp.Log
	client := hp.Client
	org := hp.Org
	repo := hp.Repo

	log.Info("Running uploadjob plugin")
	log.Info("in progress job detected")

	// the machine name is taken from the job's labels, so a payload without a
	// job or without labels cannot be processed (indexing would panic)
	wfJob := wf.WorkflowJob
	if wfJob == nil || len(wfJob.Labels) == 0 {
		log.Error(fmt.Errorf("workflow job event has no labels"), "Failed to determine machine for workflow job")
		return
	}

	// get the details of this workflow run
	run, _, err := client.Actions().GetWorkflowRunByID(ctx, org, repo, wfJob.GetRunID())
	if err != nil {
		log.Error(err, "Failed to get workflow run by ID")
		return
	}

	// check if theres a pr linked to this workflow run; pr stays 0 otherwise
	var pr int
	if len(run.PullRequests) > 0 {
		pr = run.PullRequests[0].GetNumber()
	}

	timestamp := wfJob.GetStartedAt().UTC().Unix()
	machine := wfJob.Labels[0] // TODO dont just grab the first label
	headsha := wfJob.GetHeadSHA()
	number := fmt.Sprintf("%d-%d", wfJob.GetRunID(), run.GetRunAttempt())
	jobName := wfJob.GetName()
	// nil-safe getters: a missing name or branch yields "" instead of a panic
	workflow := run.GetName()
	branch := run.GetHeadBranch()

	// grab the version from the repo
	// NOTE(review): on failure the error is only logged and the handler
	// carries on with an empty base version - looks like deliberate
	// best-effort, confirm.
	base, err := getVersion(hp, build.VersionFile)
	if err != nil {
		log.Error(err, "Failed to get version")
	}

	// get the details of the head commit
	opts := &github.ListOptions{}
	commit, _, err := client.Repositories().GetCommit(ctx, org, repo, headsha, opts)
	if err != nil {
		log.Error(err, "Failed to get commit from repo")
		return
	}

	ghrepo, _, err := client.Repositories().Get(ctx, org, repo)
	if err != nil {
		log.Error(err, "failed to get default branch for repo")
		return
	}

	ver := &build.Version{
		SemVer: base,
		Commit: headsha,
		// the commit time lives on the git commit's committer; the
		// top-level committer is a user account, whose CreatedAt is the
		// account creation time rather than anything about this commit
		Timestamp:        commit.GetCommit().GetCommitter().GetDate().UTC().Unix(),
		ReleaseCandidate: branch == ghrepo.GetDefaultBranch() || strings.HasPrefix(branch, "release/"),
	}

	// create the started struct
	started, err := job.NewStarted(
		job.WithCommit(headsha),
		job.WithMachine(machine),
		job.WithRepo(repo),
		job.WithTimestamp(timestamp),
		job.WithVersion(ver.String()),
		job.WithPull(fmt.Sprint(pr)),
	)
	if err != nil {
		log.Error(err, "failed to create started struct")
		return
	}

	// generate json
	startedjson, err := started.ToJSON()
	if err != nil {
		log.Error(err, "failed to parse started to json")
		return
	}

	log.Info("creating storage client")

	// send started to all configured buckets; the loop variable is not named
	// "job" so that it does not shadow the imported job package
	for _, target := range hp.Params.Uploadjob {
		log.Info(fmt.Sprintf("uploading to the %s bucket", target.Bucket))

		s, err := storage.New(ctx)
		if err != nil {
			log.Error(err, "failed to create storage client")
			return
		}

		b, err := s.NewBucket(ctx, storage.WithBucket(target.Bucket))
		if err != nil {
			log.Error(err, "failed to create bucket handler")
			return
		}

		log.Info(fmt.Sprintf("attempting to upload started file to %s", storage.BasePath(repo, workflow, number, jobName)))

		// attempt to upload the started json
		if err := b.UploadStartedJSON(ctx, repo, workflow, number, jobName, startedjson); err != nil {
			log.Error(err, "failed to upload started json")
			return
		}
	}
}

// handleWorkflowRunEvent handles workflow run events. Only the "completed"
// action is processed: for every check run in the run's check suite it
// fetches the job log, updates the PR comment when the run is linked to a
// pull request, and uploads the finished document and the log to every bucket
// in hp.Params.Uploadjob. A failure to build or upload the finished data is
// logged and aborts the handler; log download and PR comment failures are
// logged and processing continues.
func handleWorkflowRunEvent(hp plugin.HandlerParams, wfr github.WorkflowRunEvent) {
	// NOTE(review): the value returned by WithName is discarded here; if it
	// returns a derived logger rather than mutating hp.Log this call has no
	// effect - confirm against the logger type.
	hp.Log.WithName(constants.PluginUploadJob)

	// exit if the action isnt completed
	if wfr.GetAction() != "completed" {
		return
	}

	client := hp.Client
	ctx := hp.Ctx
	log := hp.Log
	org := hp.Org
	repo := hp.Repo

	// the run's pull request list is read below, which would panic on a
	// payload that carries no workflow run
	wfRun := wfr.WorkflowRun
	if wfRun == nil {
		log.Error(fmt.Errorf("workflow run event has no workflow run"), "Failed to read workflow run from event")
		return
	}

	suiteID := wfRun.GetCheckSuiteID()

	log.Info("completed run detected")

	// fetch the check suite run to get the jobID
	// NOTE(review): the response (and its paging info) is discarded, so only
	// the first page of check runs is processed - confirm that is enough for
	// workflows with many jobs.
	opts := &github.ListCheckRunsOptions{}
	checkrun, _, err := client.Checks().ListCheckRunsCheckSuite(ctx, org, repo, suiteID, opts)
	if err != nil {
		log.Error(err, "Failed to lists check runs for a check suite")
		return
	}

	// these identify the workflow run and are the same for every job in it
	workflow := wfRun.GetName()
	runID := fmt.Sprint(wfRun.GetID())
	attempt := fmt.Sprint(wfRun.GetRunAttempt())
	number := fmt.Sprintf("%s-%s", runID, attempt)

	// go through each job in the checkrun
	for _, run := range checkrun.CheckRuns {
		jobName := run.GetName()
		log.Info(fmt.Sprintf("attempting to upload finished data for job %+v", jobName))

		// get the workflow logs; on failure the (nil) logs are still uploaded
		logs, err := getWorkflowRunLogs(hp, wfr, jobName)
		if err != nil {
			log.Error(err, "failed to get workflow logs")
		}

		// Determine if the run passed or failed
		// possible states: action_required, cancelled, failure, neutral, success, skipped, stale, or timed_out
		// anything other than success is reported as failed
		passed := run.GetConclusion() == "success"
		outcome := "failed"
		if passed {
			outcome = "passed"
		}

		// add or refresh this job's row in the PR comment
		if len(wfRun.PullRequests) > 0 {
			prNumber := wfRun.PullRequests[0].GetNumber()
			nameJobName := fmt.Sprintf("%s/%s", workflow, jobName)
			nameJobNameID := fmt.Sprintf("%s/%s/%s", workflow, jobName, number)
			msg := fmt.Sprintf("| %s | [%s](https://gridbug.edge-infra.dev/%s/%s)|", nameJobName, outcome, repo, nameJobNameID)
			if err := addCommentOnPR(hp, msg, constants.PrCommentIdentifier, nameJobName, prNumber); err != nil {
				log.Error(err, "Failed to add a comment on the PR")
			}
		}

		timestamp := run.GetCompletedAt().UTC().Unix()

		metadata := map[string]string{
			"job_name":      jobName,
			"workflow_name": workflow,
			"run_id":        runID,
			"number":        number,
			"attempt":       attempt,
			"platform":      "actions",
		}

		// create the finished struct
		finished, err := job.NewFinished(
			job.WithFinishedTimestamp(timestamp),
			job.WithPassed(passed),
			job.WithMetadata(metadata),
		)
		if err != nil {
			log.Error(err, "failed to create finished struct")
			return
		}

		// generate json
		finishedJSON, err := finished.ToJSON()
		if err != nil {
			log.Error(err, "failed to generate finished json")
			return
		}

		log.Info("creating storage client")

		// send finished and logs to all configured buckets; the loop variable
		// is not named "job" so that it does not shadow the job package
		for _, target := range hp.Params.Uploadjob {
			log.Info(fmt.Sprintf("uploading to the %s bucket", target.Bucket))

			s, err := storage.New(ctx)
			if err != nil {
				log.Error(err, "failed to create storage client")
				return
			}

			b, err := s.NewBucket(ctx, storage.WithBucket(target.Bucket))
			if err != nil {
				log.Error(err, "failed to create bucket handler")
				return
			}

			base := storage.BasePath(repo, workflow, number, jobName)

			log.Info(fmt.Sprintf("attempting to upload finished file to %s", base))
			if err := b.UploadFinishedJSON(ctx, repo, workflow, number, jobName, finishedJSON); err != nil {
				log.Error(err, "failed to upload finished json")
				return
			}

			log.Info(fmt.Sprintf("attempting to upload logs to %s", storage.LogsPath(base)))
			if err := b.UploadLogs(ctx, repo, workflow, number, jobName, logs); err != nil {
				log.Error(err, "failed to upload logs")
				return
			}
		}
	}
}

// getWorkflowRunLogs downloads the zipped logs of the workflow run in wfr and
// returns the contents of the archive entry belonging to jobName (see unzip).
// The returned slice is nil when the archive has no matching entry.
func getWorkflowRunLogs(hp plugin.HandlerParams, wfr github.WorkflowRunEvent, jobName string) ([]byte, error) {
	runID := wfr.WorkflowRun.GetID()

	// get the url for the logs
	nurl, _, err := hp.Client.Actions().GetWorkflowRunLogs(hp.Ctx, hp.Org, hp.Repo, runID, true)
	if err != nil {
		return nil, err
	}

	// get the logs, bound to the handler context so the download is abandoned
	// when the context is cancelled
	req, err := http.NewRequestWithContext(hp.Ctx, http.MethodGet, nurl.String(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// an error page is not a zip archive; report the status instead of
	// letting the unzip step fail with a misleading message
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("downloading logs for workflow run %d: unexpected status %s", runID, resp.Status)
	}

	// create a temp file and defer its removal
	tmpfile, err := os.CreateTemp("", "logs-*.zip")
	if err != nil {
		return nil, err
	}
	defer os.Remove(tmpfile.Name())

	// stream the archive to the temp file and close it before reading it back
	if _, err := io.Copy(tmpfile, resp.Body); err != nil {
		tmpfile.Close() // the copy error is the one worth reporting
		return nil, err
	}
	if err := tmpfile.Close(); err != nil {
		return nil, err
	}

	// unzip the file and get the consolidated logs
	logs, files, err := unzip(tmpfile.Name(), jobName)
	if err != nil {
		return nil, err
	}

	// log the size rather than the log text itself to keep the entry small
	hp.Log.Logger.Info("unzipped log file", "bytes", len(logs), "files", files)
	return logs, nil
}

// unzip opens the zip archive at path and looks for entries whose name ends
// in "{jobName}.txt", which hold the consolidated logs of that job.
//
// It returns the contents of the last matching entry (nil when nothing
// matches) together with the names of the entries visited. On a read error
// the names collected so far are still returned alongside the error.
func unzip(path string, jobName string) ([]byte, []string, error) {
	// open the zip file
	z, err := zip.OpenReader(path)
	if err != nil {
		return nil, nil, err
	}
	defer z.Close()

	filename := jobName + ".txt"

	// search for the {jobname}.txt which holds the consolidated logs
	var logs []byte
	var files []string
	for _, f := range z.File {
		files = append(files, f.Name)
		if !strings.HasSuffix(f.Name, filename) {
			continue
		}
		b, err := readZipEntry(f)
		if err != nil {
			return nil, files, err
		}
		logs = b
	}
	return logs, files, nil
}

// readZipEntry returns the full contents of a single zip entry, closing the
// entry reader on every path.
func readZipEntry(f *zip.File) ([]byte, error) {
	rc, err := f.Open()
	if err != nil {
		return nil, err
	}
	defer rc.Close()
	return io.ReadAll(rc)
}

// getVersion fetches fileName from the repo in hp with an empty ref and
// returns its trimmed contents.
func getVersion(hp plugin.HandlerParams, fileName string) (string, error) {
	return GetFile(hp, fileName, "")
}

// GetFile fetches fileName from the org/repo in hp at the given commit ref
// and returns its base64-decoded contents with surrounding whitespace
// trimmed. An empty commit is passed through as an empty ref. An error is
// returned when the lookup fails, when the path yields no file content (for
// example a directory), or when the content is not valid base64.
func GetFile(hp plugin.HandlerParams, fileName string, commit string) (string, error) {
	// fetch the given file name from the org/repo
	opts := &github.RepositoryContentGetOptions{Ref: commit}
	file, _, _, err := hp.Client.Repositories().GetContents(hp.Ctx, hp.Org, hp.Repo, fileName, opts)
	if err != nil {
		return "", err
	}

	// no file is returned for directories, and the content may be absent
	if file == nil || file.Content == nil {
		return "", fmt.Errorf("no file content returned for %q", fileName)
	}

	// decode the base64 encoded file contents
	data, err := base64.StdEncoding.DecodeString(*file.Content)
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(data)), nil
}

// addCommentOnPR records msg, the result row for namejob, in a comment on
// pull request number.
//
// It looks for the first comment that contains id and was written by a user
// whose login ends in "[bot]". If that comment has no row for namejob yet,
// msg is appended on a new line; otherwise every line holding that row is
// replaced by msg. When no such comment exists a new one is created with the
// body id+msg.
func addCommentOnPR(hp plugin.HandlerParams, msg, id, namejob string, number int) error {
	client := hp.Client
	ctx := hp.Ctx
	org := hp.Org
	repo := hp.Repo

	// Rows are matched on the whole first table cell when msg has that shape,
	// so that "wf/test" does not also match (and overwrite) the row of
	// "wf/test-e2e". For any other msg shape fall back to the bare name.
	rowKey := namejob
	if cell := "| " + namejob + " |"; strings.Contains(msg, cell) {
		rowKey = cell
	}

	// get comments for the pr
	// NOTE(review): the response (and its paging info) is discarded, so only
	// the first page of comments is searched - confirm that is acceptable for
	// long PR threads.
	opts := &github.IssueListCommentsOptions{}
	comments, _, err := client.Issues().ListComments(ctx, org, repo, number, opts)
	if err != nil {
		return err
	}

	for _, comment := range comments {
		body := comment.GetBody()

		// if the comment was not made by a bot or it doesnt have the jack link header continue
		if !strings.Contains(body, id) || !strings.HasSuffix(comment.GetUser().GetLogin(), "[bot]") {
			continue
		}

		var nmsg string
		if !strings.Contains(body, rowKey) {
			// no row for this name/job pair yet: append one
			nmsg = body + "\n" + msg
		} else {
			// replace the existing row(s) for this name/job pair
			var lines []string
			scanner := bufio.NewScanner(strings.NewReader(body))
			for scanner.Scan() {
				line := scanner.Text()
				if strings.Contains(line, rowKey) {
					line = msg
				}
				lines = append(lines, line)
			}
			// a scan error (e.g. an over-long line) would otherwise silently
			// truncate the comment body
			if err := scanner.Err(); err != nil {
				return err
			}
			nmsg = strings.Join(lines, "\n")
		}

		_, _, err := client.Issues().EditComment(ctx, org, repo, comment.GetID(), &github.IssueComment{Body: &nmsg})
		return err
	}

	// if no comment was found create a new comment
	msg = id + msg
	_, _, err = client.Issues().CreateComment(ctx, org, repo, number, &github.IssueComment{Body: &msg})
	return err
}