package argo

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/cli/go-gh/v2/pkg/api"
	"github.com/peterbourgon/ff/v3"

	"edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
	"edge-infra.dev/pkg/f8n/devinfra/job"
	"edge-infra.dev/pkg/lib/build"
	"edge-infra.dev/pkg/lib/cli/rags"
	"edge-infra.dev/pkg/lib/cli/sink"
	"edge-infra.dev/pkg/lib/fog"
)

var (
	// storage bucket to upload to
	bucket string
	// dry prints what would be uploaded without uploading.
	dry bool
)

// argo holds the info needed for publishing artifacts for jobs ran on
// Argo workflows (started.json / finished.json metadata plus upload target).
type argo struct {
	workflow string // workflow name
	runID    string // uuid of the workflow
	filepath string
	branch   string
	// started
	startedTimestamp  string
	machine           string
	pull              string
	commit            string
	repo              string
	rosaVersionString string
	// finished
	finishedTimestamp string
	passed            string
	metadata          string
}

// argoFlags binds all CLI flags for the argo command onto a and returns the
// flag set as rags for the sink command.
func argoFlags(a *argo) []*rags.Rag {
	rs := rags.New("argo", flag.ExitOnError)
	storage.BindStorageBucketFlag(rs, &bucket)
	rs.StringVar(&a.workflow, "workflow", "", "argo workflow name", rags.WithRequired())
	rs.StringVar(&a.runID, "run-id", "", "unique run id", rags.WithRequired())
	rs.StringVar(&a.filepath, "filepath", "", "filepath to upload", rags.WithRequired())
	rs.StringVar(&a.branch, "branch", "", "branch associated with the run ")
	// started
	rs.StringVar(&a.commit, "commit", "", "hash of the commit associated with this workflow", rags.WithRequired())
	rs.StringVar(&a.startedTimestamp, "started-time", "", "timestamp of when the workflow started", rags.WithRequired())
	rs.StringVar(&a.machine, "machine", "", "name of the machine the job was ran on", rags.WithRequired())
	rs.StringVar(&a.pull, "pull", "", "pull number associated with this workflow")
	rs.StringVar(&a.repo, "repo", "", "name of the repo associated with this workflow", rags.WithRequired())
	rs.StringVar(&a.rosaVersionString, "rosa-version", "", "rosa version associated with this run")
	// finished
	rs.StringVar(&a.passed, "passed", "", "whether or not the workflow passed", rags.WithRequired())
	rs.StringVar(&a.finishedTimestamp, "finished-time", "", "timestamp of when the workflow ended", rags.WithRequired())
	rs.StringVar(&a.metadata, "metadata", "", "JSON string with key value pairs of any extra relevant info")
	rs.BoolVar(&dry, "dry-run", false, "print out paths being uploaded but dont upload them", rags.WithShort("d"))
	return rs.Rags()
}

const letterBytes = "123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randStringBytes returns a random n-character string used as a cosmetic
// run-id suffix (not security sensitive).
func randStringBytes(n int) string {
	// Seed once, outside the loop. The previous implementation re-seeded with
	// time.Now().UnixNano() on every iteration, which yields identical
	// characters whenever the clock does not advance between iterations.
	r := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404 - cosmetic id string only
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[r.Intn(len(letterBytes))]
	}
	return string(b)
}

// New returns the sink command that uploads artifacts from Argo workflows to
// google cloud storage.
func New() *sink.Command {
	argo := &argo{}
	cmd := &sink.Command{
		Use:        "argo [flags] ",
		Short:      "upload artifacts from Argo workflows to google cloud storage",
		Extensions: []sink.Extension{},
		Flags:      argoFlags(argo),
		Options: []ff.Option{
			ff.WithEnvVarNoPrefix(),
		},
		Exec: func(ctx context.Context, r sink.Run) error {
			// Suffix the user-supplied run id so repeated runs get unique
			// storage paths.
			run := fmt.Sprintf("%s-%s", argo.runID, randStringBytes(6))
			r.Log.V(0).Info("generated new runID", "runID", run)
			err := os.WriteFile(filepath.Join(argo.filepath, "run-id.txt"), []byte(run), 0644)
			if err != nil {
				return err
			}
			storagePath := storage.ArgoBasePath(
				argo.repo,
				argo.workflow,
				run,
			)
			started, err := argo.generateStarted()
			if err != nil {
				return err
			}
			r.Log.V(0).Info("created started.json", "file", started)
			finished, err := argo.generateFinished(run)
			if err != nil {
				return err
			}
			r.Log.V(0).Info("created finished.json", "file", finished)
			return upload(ctx, storagePath, r.Args())
		},
	}
	return cmd
}

// generateStarted builds the started.json job metadata, writes it under
// a.filepath, and returns its JSON content.
func (a *argo) generateStarted() (string, error) {
	// Create the directory the json files are written into. Directories need
	// the execute bit, so 0755 rather than 0644; and the files below are
	// written into a.filepath itself, not its parent.
	err := os.MkdirAll(a.filepath, 0o755)
	if err != nil {
		return "", err
	}
	t, err := time.Parse(time.RFC3339, a.startedTimestamp)
	if err != nil {
		return "", err
	}
	version := a.rosaVersionString
	if a.rosaVersionString == "" {
		// without an explicit version we derive one from git, which requires
		// knowing the branch
		if a.branch == "" {
			return "", errors.New("if rosa version isnt supplied you must supply a branch name")
		}
		ver, err := a.fetchVersionString()
		if err != nil {
			// bug fix: previously returned nil here, silently swallowing the
			// fetch error
			return "", err
		}
		version = ver
	}
	s, err := job.NewStarted(
		job.WithCommit(a.commit),
		job.WithMachine(a.machine),
		job.WithRepo(a.repo),
		job.WithVersion(version),
		job.WithTimestamp(t.Unix()),
		job.WithPull(a.pull),
	)
	if err != nil {
		return "", err
	}
	sj, err := s.ToJSON()
	if err != nil {
		return "", err
	}
	if a.filepath != "" {
		err = os.WriteFile(filepath.Join(a.filepath, "started.json"), sj, 0644)
		if err != nil {
			return "", err
		}
	}
	return string(sj), nil
}

// Response is the subset of the GitHub commits API payload we read.
type Response struct {
	Commit Commit `json:"commit"`
}

// Commit carries the committer details of a GitHub commit.
type Commit struct {
	Committer Committer `json:"committer"`
}

// Committer identifies who committed and when.
type Committer struct {
	Name  string `json:"name"`
	Email string `json:"email"`
	Date  string `json:"date"`
}

// fetchVersionString derives a build version string from GitHub: the commit
// timestamp plus the repo's .version file content at that commit. It requires
// a GitHub auth token in the TOKEN env var.
func (a *argo) fetchVersionString() (string, error) {
	token := os.Getenv("TOKEN")
	if token == "" {
		return "", fmt.Errorf("github auth token was not found in env vars")
	}
	opts := api.ClientOptions{
		AuthToken: token,
	}
	client, err := api.NewRESTClient(opts)
	if err != nil {
		return "", err
	}
	response := &Response{}
	err = client.Get(
		fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/commits/%s", a.repo, a.commit),
		response,
	)
	if err != nil {
		return "", err
	}
	version := struct {
		Content string `json:"content"`
	}{}
	err = client.Get(
		fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/contents/.version?ref=%s", a.repo, a.commit),
		&version,
	)
	if err != nil {
		return "", err
	}
	// The contents API returns base64 with embedded newlines, which
	// StdEncoding rejects — strip them before decoding. Return the error
	// instead of log.Fatal so callers can handle it.
	data, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(version.Content, "\n", ""))
	if err != nil {
		return "", fmt.Errorf("decoding .version content: %w", err)
	}
	ts, err := time.Parse(time.RFC3339, response.Commit.Committer.Date)
	if err != nil {
		return "", err
	}
	ver := &build.Version{
		SemVer:    strings.TrimSpace(string(data)),
		Commit:    a.commit,
		Timestamp: ts.UTC().Unix(),
		// release candidates come from master or release/* branches
		ReleaseCandidate: a.branch == "master" || strings.HasPrefix(a.branch, "release/"),
	}
	return ver.String(), nil
}

// generateFinished builds the finished.json job metadata (pass/fail, end
// time, and extra metadata), writes it under a.filepath, and returns its
// JSON content. run is the suffixed run id generated in New.
func (a *argo) generateFinished(run string) (string, error) {
	result := make(map[string]string)
	// -metadata is optional; json.Unmarshal("") always errors, so only parse
	// when something was supplied.
	if a.metadata != "" {
		if err := json.Unmarshal([]byte(a.metadata), &result); err != nil {
			return "", err
		}
	}
	// add some extra metadata
	result["run_id"] = a.runID
	result["number"] = run
	result["platform"] = "argo"
	result["machine"] = a.machine
	result["workflow_name"] = a.workflow
	// see generateStarted: create the directory files are written into, with
	// an executable directory mode
	err := os.MkdirAll(a.filepath, 0o755)
	if err != nil {
		return "", err
	}
	t, err := time.Parse(time.RFC3339, a.finishedTimestamp)
	if err != nil {
		return "", err
	}
	var p bool
	switch a.passed {
	case "Failed", "Error":
		p = false
	case "Succeeded":
		p = true
		// any other status is treated as not passed (p's zero value)
	}
	f, err := job.NewFinished(
		job.WithFinishedTimestamp(t.Unix()),
		job.WithPassed(p),
		job.WithMetadata(result),
	)
	if err != nil {
		return "", err
	}
	fj, err := f.ToJSON()
	if err != nil {
		return "", err
	}
	if a.filepath != "" {
		err = os.WriteFile(filepath.Join(a.filepath, "finished.json"), fj, 0644)
		if err != nil {
			return "", err
		}
	}
	return string(fj), nil
}

// upload pushes each directory in dirs to the configured bucket under
// storagePath, skipping the actual upload when -dry-run is set. Empty
// directories are tolerated.
func upload(ctx context.Context, storagePath string, dirs []string) error {
	log := fog.FromContext(ctx)
	s, err := storage.New(ctx)
	if err != nil {
		return fmt.Errorf("failed to create storage client: %w", err)
	}
	b, err := s.NewBucket(ctx, storage.WithBucket(bucket))
	if err != nil {
		return fmt.Errorf("failed to create bucket handler: %w", err)
	}
	log.Info("uploading job artifacts", "bucket", bucket, "storagePath", storagePath)
	for _, dir := range dirs {
		// dont upload results when dry running
		if dry {
			log.Info("[dry run] uploading", "dir", dir)
			continue
		}
		log.Info("uploading", "dir", dir)
		err := b.UploadArtifacts(ctx, storagePath, dir)
		if err != nil && !errors.Is(err, storage.ErrEmptyDir) {
			return fmt.Errorf("failed to upload artifacts: %w", err)
		}
	}
	return nil
}