...

Source file src/edge-infra.dev/pkg/f8n/devinfra/jack/plugin/uploadjob/handler.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/jack/plugin/uploadjob

     1  package uploadjob
     2  
     3  import (
     4  	"archive/zip"
     5  	"bufio"
     6  	"encoding/base64"
     7  	"fmt"
     8  	"io"
     9  	"net/http"
    10  	"os"
    11  	"strings"
    12  
    13  	"github.com/google/go-github/v47/github"
    14  
    15  	"edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
    16  	"edge-infra.dev/pkg/f8n/devinfra/jack/constants"
    17  	"edge-infra.dev/pkg/f8n/devinfra/jack/plugin"
    18  	"edge-infra.dev/pkg/f8n/devinfra/job"
    19  	"edge-infra.dev/pkg/lib/build"
    20  )
    21  
    22  func init() {
    23  	plugin.RegisterWorkflowEventHandler(constants.PluginUploadJob, handleWorkflowEvent)
    24  	plugin.RegisterWorkflowRunEventHandler(constants.PluginUploadJob, handleWorkflowRunEvent)
    25  }
    26  
    27  // handleWorkflowEvent handles events when a workflow has started
    28  func handleWorkflowEvent(hp plugin.HandlerParams, wf github.WorkflowJobEvent) {
    29  	hp.Log.WithName(constants.PluginUploadJob)
    30  	// exit if the job action isnt in_progress
    31  	if wf.GetAction() != "in_progress" {
    32  		return
    33  	}
    34  
    35  	ctx := hp.Ctx
    36  	log := hp.Log
    37  	client := hp.Client
    38  	org := hp.Org
    39  	repo := hp.Repo
    40  
    41  	log.Info("Running uploadjob plugin")
    42  	log.Info("in progress job detected")
    43  
    44  	// get the details of this workflow run
    45  	runs, _, err := client.Actions().GetWorkflowRunByID(ctx, org, repo, wf.WorkflowJob.GetRunID())
    46  	if err != nil {
    47  		log.Error(err, "Failed to get workflow run by ID")
    48  		return
    49  	}
    50  
    51  	// check if theres a pr linked to this workflow run
    52  	var pr int
    53  	if len(runs.PullRequests) > 0 {
    54  		pr = runs.PullRequests[0].GetNumber()
    55  	}
    56  
    57  	timestamp := wf.WorkflowJob.GetStartedAt().UTC().Unix()
    58  	machine := wf.WorkflowJob.Labels[0] // TODO dont just grab the first label
    59  	headsha := wf.WorkflowJob.GetHeadSHA()
    60  	number := fmt.Sprintf("%d-%d", wf.WorkflowJob.GetRunID(), runs.GetRunAttempt())
    61  	jobName := wf.WorkflowJob.GetName()
    62  	workflow := *runs.Name
    63  	branch := *runs.HeadBranch
    64  
    65  	// grab the version from the repo
    66  	base, err := getVersion(hp, build.VersionFile)
    67  	if err != nil {
    68  		log.Error(err, "Failed to get version")
    69  	}
    70  
    71  	// get the details of the head commit
    72  	opts := &github.ListOptions{}
    73  	commit, _, err := client.Repositories().GetCommit(ctx, org, repo, headsha, opts)
    74  	if err != nil {
    75  		log.Error(err, "Failed to get commit from repo")
    76  		return
    77  	}
    78  
    79  	ghrepo, _, err := client.Repositories().Get(ctx, org, repo)
    80  	if err != nil {
    81  		log.Error(err, "failed to get default branch for repo")
    82  		return
    83  	}
    84  
    85  	ver := &build.Version{
    86  		SemVer:           base,
    87  		Commit:           headsha,
    88  		Timestamp:        commit.GetCommitter().GetCreatedAt().UTC().Unix(),
    89  		ReleaseCandidate: branch == *ghrepo.DefaultBranch || strings.HasPrefix(branch, "release/"),
    90  	}
    91  
    92  	// create the started struct
    93  	started, err := job.NewStarted(
    94  		job.WithCommit(headsha),
    95  		job.WithMachine(machine),
    96  		job.WithRepo(repo),
    97  		job.WithTimestamp(timestamp),
    98  		job.WithVersion(ver.String()),
    99  		job.WithPull(fmt.Sprint(pr)),
   100  	)
   101  	if err != nil {
   102  		log.Error(err, "failed to create started struct")
   103  		return
   104  	}
   105  
   106  	// generate json
   107  	startedjson, err := started.ToJSON()
   108  	if err != nil {
   109  		log.Error(err, "failed to parse started to json")
   110  		return
   111  	}
   112  
   113  	log.Info("creating storage client")
   114  
   115  	// send started to all configured buckets
   116  	for _, job := range hp.Params.Uploadjob {
   117  		log.Info(fmt.Sprintf("uploading to the %s bucket", job.Bucket))
   118  		s, err := storage.New(ctx)
   119  		if err != nil {
   120  			log.Error(err, "failed to create storage client")
   121  			return
   122  		}
   123  
   124  		b, err := s.NewBucket(ctx, storage.WithBucket(job.Bucket))
   125  		if err != nil {
   126  			log.Error(err, "failed to create bucket handler")
   127  			return
   128  		}
   129  
   130  		log.Info(fmt.Sprintf("attempting to upload started file to %s", storage.BasePath(repo, workflow, number, jobName)))
   131  		// attempt to upload the tmp file
   132  		err = b.UploadStartedJSON(ctx, repo, workflow, number, jobName, startedjson)
   133  		if err != nil {
   134  			log.Error(err, "failed to upload started json")
   135  			return
   136  		}
   137  	}
   138  }
   139  
   140  // handleWorkflowRunEvent handles workflow events when they have finished
   141  func handleWorkflowRunEvent(hp plugin.HandlerParams, wfr github.WorkflowRunEvent) {
   142  	hp.Log.WithName(constants.PluginUploadJob)
   143  	// exit if the action isnt completed
   144  	if wfr.GetAction() != "completed" {
   145  		return
   146  	}
   147  
   148  	client := hp.Client
   149  	ctx := hp.Ctx
   150  	log := hp.Log
   151  	org := hp.Org
   152  	repo := hp.Repo
   153  	suiteID := wfr.WorkflowRun.GetCheckSuiteID()
   154  
   155  	log.Info("completed run detected")
   156  
   157  	// fetch the check suite run to get the jobID
   158  	opts := &github.ListCheckRunsOptions{}
   159  	checkrun, _, err := client.Checks().ListCheckRunsCheckSuite(ctx, org, repo, suiteID, opts)
   160  	if err != nil {
   161  		log.Error(err, "Failed to lists check runs for a check suite")
   162  		return
   163  	}
   164  
   165  	// go through each job in the checkrun
   166  	for _, run := range checkrun.CheckRuns {
   167  		jobName := run.GetName()
   168  		log.Info(fmt.Sprintf("attempting to upload finished data for job %+v", jobName))
   169  
   170  		// get the workflow logs
   171  		logs, err := getWorkflowRunLogs(hp, wfr, jobName)
   172  		if err != nil {
   173  			log.Error(err, "failed to get workflow logs")
   174  		}
   175  
   176  		// Determine if the run passed or failed
   177  		// possible states: action_required, cancelled, failure, neutral, success, skipped, stale, or timed_out
   178  		passed := false
   179  		outcome := "failed"
   180  		if run.GetConclusion() == "success" {
   181  			passed = true
   182  			outcome = "passed"
   183  		}
   184  
   185  		workflow := wfr.WorkflowRun.GetName()
   186  		runID := fmt.Sprint(wfr.WorkflowRun.GetID())
   187  		attempt := fmt.Sprint(wfr.WorkflowRun.GetRunAttempt())
   188  		number := fmt.Sprintf("%s-%s", runID, attempt)
   189  
   190  		if len(wfr.WorkflowRun.PullRequests) > 0 {
   191  			pr := wfr.WorkflowRun.PullRequests[0].Number
   192  			nameJobName := fmt.Sprintf("%s/%s", workflow, jobName)
   193  			nameJobNameID := fmt.Sprintf("%s/%s/%s", workflow, jobName, number)
   194  			msg := fmt.Sprintf("| %s | [%s](https://gridbug.edge-infra.dev/%s/%s)|", nameJobName, outcome, repo, nameJobNameID)
   195  
   196  			err := addCommentOnPR(hp, msg, constants.PrCommentIdentifier, nameJobName, *pr)
   197  			if err != nil {
   198  				log.Error(err, "Failed to add a comment on the PR")
   199  			}
   200  		}
   201  
   202  		timestamp := run.GetCompletedAt().UTC().Unix()
   203  
   204  		metadata := make(map[string]string)
   205  		metadata["job_name"] = jobName
   206  		metadata["workflow_name"] = workflow
   207  		metadata["run_id"] = runID
   208  		metadata["number"] = number
   209  		metadata["attempt"] = attempt
   210  		metadata["platform"] = "actions"
   211  
   212  		// create the finished struct
   213  		finished, err := job.NewFinished(
   214  			job.WithFinishedTimestamp(timestamp),
   215  			job.WithPassed(passed),
   216  			job.WithMetadata(metadata),
   217  		)
   218  		if err != nil {
   219  			log.Error(err, "failed to create finished struct")
   220  			return
   221  		}
   222  
   223  		// generate json
   224  		finishedJSON, err := finished.ToJSON()
   225  		if err != nil {
   226  			log.Error(err, "failed to generate finished json")
   227  			return
   228  		}
   229  
   230  		log.Info("creating storage client")
   231  
   232  		// send started to all configured buckets
   233  		for _, job := range hp.Params.Uploadjob {
   234  			log.Info(fmt.Sprintf("uploading to the %s bucket", job.Bucket))
   235  			s, err := storage.New(ctx)
   236  			if err != nil {
   237  				log.Error(err, "failed to create storage client")
   238  				return
   239  			}
   240  
   241  			b, err := s.NewBucket(ctx, storage.WithBucket(job.Bucket))
   242  			if err != nil {
   243  				log.Error(err, "failed to create bucket handler")
   244  				return
   245  			}
   246  
   247  			base := storage.BasePath(repo, workflow, number, jobName)
   248  			log.Info(fmt.Sprintf("attempting to upload finished file to %s", base))
   249  
   250  			err = b.UploadFinishedJSON(ctx, repo, workflow, number, jobName, finishedJSON)
   251  			if err != nil {
   252  				log.Error(err, "failed to upload finished json")
   253  				return
   254  			}
   255  			log.Info(fmt.Sprintf("attempting to upload logs to %s", storage.LogsPath(base)))
   256  
   257  			err = b.UploadLogs(ctx, repo, workflow, number, jobName, logs)
   258  			if err != nil {
   259  				log.Error(err, "failed to upload logs")
   260  				return
   261  			}
   262  		}
   263  	}
   264  }
   265  
   266  func getWorkflowRunLogs(hp plugin.HandlerParams, wfr github.WorkflowRunEvent, jobName string) ([]byte, error) {
   267  	// get the url for the logs
   268  	nurl, _, err := hp.Client.Actions().GetWorkflowRunLogs(hp.Ctx, hp.Org, hp.Repo, wfr.WorkflowRun.GetID(), true)
   269  	if err != nil {
   270  		return nil, err
   271  	}
   272  
   273  	// get the logs
   274  	getresp, err := http.Get(nurl.String())
   275  	if err != nil {
   276  		return nil, err
   277  	}
   278  	defer getresp.Body.Close()
   279  
   280  	// read the body into a byte array
   281  	body, err := io.ReadAll(getresp.Body)
   282  	if err != nil {
   283  		return nil, err
   284  	}
   285  
   286  	//  create a temp file and defer its removal
   287  	tmpfile, err := os.CreateTemp("", "logs-*.zip")
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  	defer os.Remove(tmpfile.Name())
   292  
   293  	// write the byte array to the temp file and close it
   294  	if _, err := tmpfile.Write(body); err != nil {
   295  		return nil, err
   296  	}
   297  	if err := tmpfile.Close(); err != nil {
   298  		return nil, err
   299  	}
   300  
   301  	// unzip the file and get the consolidated logs
   302  	logs, files, err := unzip(tmpfile.Name(), jobName)
   303  	hp.Log.Logger.Info("unzipped log file", "logs", string(logs), "files", files)
   304  	return logs, err
   305  }
   306  
   307  // unzip takes in a file path and unzips it
   308  // returns the body of a specific file within the zipped logs
   309  func unzip(path string, jobName string) ([]byte, []string, error) {
   310  	// open the zip file
   311  	z, err := zip.OpenReader(path)
   312  	if err != nil {
   313  		return nil, nil, err
   314  	}
   315  	defer z.Close()
   316  
   317  	filename := jobName + ".txt"
   318  
   319  	// search for the {jobname}.txt which holds the consolidated logs
   320  	var logs []byte
   321  	var files []string
   322  	for _, f := range z.File {
   323  		files = append(files, f.Name)
   324  		if strings.HasSuffix(f.Name, filename) {
   325  			rc, err := f.Open()
   326  			if err != nil {
   327  				return nil, files, err
   328  			}
   329  			b, err := io.ReadAll(rc)
   330  			if err != nil {
   331  				return nil, files, err
   332  			}
   333  			rc.Close()
   334  			logs = b
   335  		}
   336  	}
   337  	return logs, files, nil
   338  }
   339  
   340  // find the .version file in a given org/repo
   341  func getVersion(hp plugin.HandlerParams, fileName string) (string, error) {
   342  	return GetFile(hp, fileName, "")
   343  }
   344  
   345  func GetFile(hp plugin.HandlerParams, fileName string, commit string) (string, error) {
   346  	// fetch the given file name from the org/repo
   347  	opts := &github.RepositoryContentGetOptions{Ref: commit}
   348  	file, _, _, err := hp.Client.Repositories().GetContents(hp.Ctx, hp.Org, hp.Repo, fileName, opts)
   349  	if err != nil {
   350  		return "", err
   351  	}
   352  	// decode the base64 encoded file contents
   353  	data, err := base64.StdEncoding.DecodeString(*file.Content)
   354  	if err != nil {
   355  		return "", err
   356  	}
   357  	return strings.TrimSpace(string(data)), nil
   358  }
   359  
   360  // add a comment to a given pr
   361  func addCommentOnPR(hp plugin.HandlerParams, msg, id, namejob string, number int) error {
   362  	client := hp.Client
   363  	ctx := hp.Ctx
   364  	org := hp.Org
   365  	repo := hp.Repo
   366  
   367  	// get comments for the pr
   368  	opts := &github.IssueListCommentsOptions{}
   369  	comments, _, err := client.Issues().ListComments(ctx, org, repo, number, opts)
   370  	if err != nil {
   371  		return err
   372  	}
   373  
   374  	for _, comment := range comments {
   375  		// if the comment was not made by a bot or it doesnt have the jack link header continue
   376  		if !strings.Contains(comment.GetBody(), id) || !strings.HasSuffix(comment.GetUser().GetLogin(), "[bot]") {
   377  			continue
   378  		}
   379  
   380  		// if the comment doesnt contain a name/job pair append to the comment and update it
   381  		if !strings.Contains(comment.GetBody(), namejob) {
   382  			nmsg := comment.GetBody() + "\n" + msg
   383  			issueComment := github.IssueComment{Body: &nmsg}
   384  			if _, _, err := client.Issues().EditComment(ctx, org, repo, comment.GetID(), &issueComment); err != nil {
   385  				return err
   386  			}
   387  			return nil
   388  		}
   389  
   390  		// if the body contains the name/job combo then replace that line and update comment
   391  		updatedBody := []string{}
   392  		scanner := bufio.NewScanner(strings.NewReader(comment.GetBody()))
   393  		for scanner.Scan() {
   394  			text := scanner.Text()
   395  			if strings.Contains(text, namejob) {
   396  				updatedBody = append(updatedBody, msg)
   397  				continue
   398  			}
   399  			updatedBody = append(updatedBody, text)
   400  		}
   401  		nmsg := strings.Join(updatedBody[:], "\n")
   402  		issueComment := github.IssueComment{Body: &nmsg}
   403  		if _, _, err := client.Issues().EditComment(ctx, org, repo, comment.GetID(), &issueComment); err != nil {
   404  			return err
   405  		}
   406  		return nil
   407  	}
   408  
   409  	// if no comment was found create a new comment
   410  	msg = id + msg
   411  	issueComment := github.IssueComment{Body: &msg}
   412  	if _, _, err := client.Issues().CreateComment(ctx, org, repo, number, &issueComment); err != nil {
   413  		return err
   414  	}
   415  	return nil
   416  }
   417  

View as plain text