...

Source file src/edge-infra.dev/cmd/tools/art/argo/argo.go

Documentation: edge-infra.dev/cmd/tools/art/argo

     1  package argo
     2  
     3  import (
     4  	"context"
     5  	"encoding/base64"
     6  	"encoding/json"
     7  	"errors"
     8  	"flag"
     9  	"fmt"
    10  	"log"
    11  	"math/rand"
    12  	"os"
    13  	"path/filepath"
    14  	"strings"
    15  	"time"
    16  
    17  	"github.com/cli/go-gh/v2/pkg/api"
    18  	"github.com/peterbourgon/ff/v3"
    19  
    20  	"edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
    21  	"edge-infra.dev/pkg/f8n/devinfra/job"
    22  	"edge-infra.dev/pkg/lib/build"
    23  	"edge-infra.dev/pkg/lib/cli/rags"
    24  	"edge-infra.dev/pkg/lib/cli/sink"
    25  	"edge-infra.dev/pkg/lib/fog"
    26  )
    27  
    28  var (
    29  	// storage bucket to upload to
    30  	bucket string
    31  	dry    bool
    32  )
    33  
    34  // info needed for publishing artifacts for jobs ran on github actions
    35  type argo struct {
    36  	workflow string // workflow name
    37  	runID    string // uuid of the workflow
    38  	filepath string
    39  	branch   string
    40  
    41  	// started
    42  	startedTimestamp  string
    43  	machine           string
    44  	pull              string
    45  	commit            string
    46  	repo              string
    47  	rosaVersionString string
    48  
    49  	// finished
    50  	finishedTimestamp string
    51  	passed            string
    52  	metadata          string
    53  }
    54  
    55  func argoFlags(a *argo) []*rags.Rag {
    56  	rs := rags.New("argo", flag.ExitOnError)
    57  	storage.BindStorageBucketFlag(rs, &bucket)
    58  
    59  	rs.StringVar(&a.workflow, "workflow", "", "argo workflow name", rags.WithRequired())
    60  	rs.StringVar(&a.runID, "run-id", "", "unique run id", rags.WithRequired())
    61  	rs.StringVar(&a.filepath, "filepath", "", "filepath to upload", rags.WithRequired())
    62  	rs.StringVar(&a.branch, "branch", "", "branch associated with the run ")
    63  
    64  	// started
    65  	rs.StringVar(&a.commit, "commit", "", "hash of the commit associated with this workflow", rags.WithRequired())
    66  	rs.StringVar(&a.startedTimestamp, "started-time", "", "timestamp of when the workflow started", rags.WithRequired())
    67  	rs.StringVar(&a.machine, "machine", "", "name of the machine the job was ran on", rags.WithRequired())
    68  	rs.StringVar(&a.pull, "pull", "", "pull number associated with this workflow")
    69  	rs.StringVar(&a.repo, "repo", "", "name of the repo associated with this workflow", rags.WithRequired())
    70  	rs.StringVar(&a.rosaVersionString, "rosa-version", "", "rosa version associated with this run")
    71  
    72  	// finished
    73  	rs.StringVar(&a.passed, "passed", "", "whether or not the workflow passed", rags.WithRequired())
    74  	rs.StringVar(&a.finishedTimestamp, "finished-time", "", "timestamp of when the workflow ended", rags.WithRequired())
    75  	rs.StringVar(&a.metadata, "metadata", "", "JSON string with key value pairs of any extra relevant info")
    76  
    77  	rs.BoolVar(&dry, "dry-run", false, "print out paths being uploaded but dont upload them", rags.WithShort("d"))
    78  	return rs.Rags()
    79  }
    80  
    81  const letterBytes = "123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    82  
    83  func randStringBytes(n int) string {
    84  	b := make([]byte, n)
    85  	for i := range b {
    86  		timeNow := time.Now().UnixNano()
    87  		r := rand.New(rand.NewSource(timeNow)) // #nosec G404 - cosmetic id string only
    88  		b[i] = letterBytes[r.Intn(len(letterBytes))]
    89  	}
    90  	return string(b)
    91  }
    92  
    93  func New() *sink.Command {
    94  	argo := &argo{}
    95  
    96  	cmd := &sink.Command{
    97  		Use:        "argo [flags] <directories>",
    98  		Short:      "upload artifacts from Argo workflows to google cloud storage",
    99  		Extensions: []sink.Extension{},
   100  		Flags:      argoFlags(argo),
   101  		Options: []ff.Option{
   102  			ff.WithEnvVarNoPrefix(),
   103  		},
   104  		Exec: func(ctx context.Context, r sink.Run) error {
   105  			run := fmt.Sprintf("%s-%s", argo.runID, randStringBytes(6))
   106  			r.Log.V(0).Info("generated new runID", "runID", run)
   107  
   108  			err := os.WriteFile(filepath.Join(argo.filepath, "run-id.txt"), []byte(run), 0644)
   109  			if err != nil {
   110  				return err
   111  			}
   112  
   113  			storagePath := storage.ArgoBasePath(
   114  				argo.repo,
   115  				argo.workflow,
   116  				run,
   117  			)
   118  
   119  			started, err := argo.generateStarted()
   120  			if err != nil {
   121  				return err
   122  			}
   123  			r.Log.V(0).Info("created started.json", "file", started)
   124  
   125  			finished, err := argo.generateFinished(run)
   126  			if err != nil {
   127  				return err
   128  			}
   129  			r.Log.V(0).Info("created finished.json", "file", finished)
   130  
   131  			return upload(ctx, storagePath, r.Args())
   132  		},
   133  	}
   134  	return cmd
   135  }
   136  
   137  func (a *argo) generateStarted() (string, error) {
   138  	err := os.MkdirAll(filepath.Dir(a.filepath), 0644)
   139  	if err != nil {
   140  		return "", err
   141  	}
   142  
   143  	t, err := time.Parse(time.RFC3339, a.startedTimestamp)
   144  	if err != nil {
   145  		return "", err
   146  	}
   147  
   148  	version := a.rosaVersionString
   149  	if a.rosaVersionString == "" {
   150  		// check if a branch was given
   151  		if a.branch == "" {
   152  			return "", fmt.Errorf("if rosa version isnt supplied you must supply a branch name")
   153  		}
   154  
   155  		ver, err := a.fetchVersionString()
   156  		if err != nil {
   157  			return "", nil
   158  		}
   159  		version = ver
   160  	}
   161  
   162  	s, err := job.NewStarted(
   163  		job.WithCommit(a.commit),
   164  		job.WithMachine(a.machine),
   165  		job.WithRepo(a.repo),
   166  		job.WithVersion(version),
   167  		job.WithTimestamp(t.Unix()),
   168  		job.WithPull(a.pull),
   169  	)
   170  	if err != nil {
   171  		return "", err
   172  	}
   173  
   174  	sj, err := s.ToJSON()
   175  	if err != nil {
   176  		return "", err
   177  	}
   178  
   179  	if a.filepath != "" {
   180  		err = os.WriteFile(filepath.Join(a.filepath, "started.json"), sj, 0644)
   181  		if err != nil {
   182  			return "", err
   183  		}
   184  	}
   185  
   186  	return string(sj), nil
   187  }
   188  
   189  type Response struct {
   190  	Commit Commit `json:"commit"`
   191  }
   192  
   193  type Commit struct {
   194  	Committer Committer `json:"committer"`
   195  }
   196  
   197  type Committer struct {
   198  	Name  string `json:"name"`
   199  	Email string `json:"email"`
   200  	Date  string `json:"date"`
   201  }
   202  
   203  func (a *argo) fetchVersionString() (string, error) {
   204  	token := os.Getenv("TOKEN")
   205  	if token == "" {
   206  		return "", fmt.Errorf("github auth token was not found in env vars")
   207  	}
   208  
   209  	opts := api.ClientOptions{
   210  		AuthToken: token,
   211  	}
   212  	client, err := api.NewRESTClient(opts)
   213  	if err != nil {
   214  		return "", err
   215  	}
   216  
   217  	response := &Response{}
   218  	err = client.Get(
   219  		fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/commits/%s", a.repo, a.commit),
   220  		&response,
   221  	)
   222  	if err != nil {
   223  		return "", err
   224  	}
   225  
   226  	version := struct {
   227  		Content string `json:"content"`
   228  	}{}
   229  	err = client.Get(
   230  		fmt.Sprintf("repos/ncrvoyix-swt-retail/%s/contents/.version?ref=%s", a.repo, a.commit),
   231  		&version,
   232  	)
   233  	if err != nil {
   234  		return "", err
   235  	}
   236  
   237  	data, err := base64.StdEncoding.DecodeString(version.Content)
   238  	if err != nil {
   239  		log.Fatal("error:", err)
   240  	}
   241  
   242  	ts, err := time.Parse(time.RFC3339, response.Commit.Committer.Date)
   243  	if err != nil {
   244  		return "", err
   245  	}
   246  
   247  	ver := &build.Version{
   248  		SemVer:           strings.TrimSpace(string(data)),
   249  		Commit:           a.commit,
   250  		Timestamp:        ts.UTC().Unix(),
   251  		ReleaseCandidate: a.branch == "master" || strings.HasPrefix(a.branch, "release/"),
   252  	}
   253  
   254  	return ver.String(), nil
   255  }
   256  
   257  func (a *argo) generateFinished(run string) (string, error) {
   258  	result := make(map[string]string)
   259  	err := json.Unmarshal([]byte(a.metadata), &result)
   260  	if err != nil {
   261  		return "", err
   262  	}
   263  
   264  	// add some extra metadata
   265  	result["run_id"] = a.runID
   266  	result["number"] = run
   267  	result["platform"] = "argo"
   268  	result["machine"] = a.machine
   269  	result["workflow_name"] = a.workflow
   270  
   271  	err = os.MkdirAll(filepath.Dir(a.filepath), 0644)
   272  	if err != nil {
   273  		return "", err
   274  	}
   275  
   276  	t, err := time.Parse(time.RFC3339, a.finishedTimestamp)
   277  	if err != nil {
   278  		return "", err
   279  	}
   280  
   281  	var p bool
   282  	switch a.passed {
   283  	case "Failed", "Error":
   284  		p = false
   285  	case "Succeeded":
   286  		p = true
   287  	}
   288  
   289  	f, err := job.NewFinished(
   290  		job.WithFinishedTimestamp(t.Unix()),
   291  		job.WithPassed(p),
   292  		job.WithMetadata(result),
   293  	)
   294  	if err != nil {
   295  		return "", err
   296  	}
   297  
   298  	fj, err := f.ToJSON()
   299  	if err != nil {
   300  		return "", err
   301  	}
   302  
   303  	if a.filepath != "" {
   304  		err = os.WriteFile(filepath.Join(a.filepath, "finished.json"), fj, 0644)
   305  		if err != nil {
   306  			return "", err
   307  		}
   308  	}
   309  
   310  	return string(fj), nil
   311  }
   312  
   313  func upload(ctx context.Context, storagePath string, dirs []string) error {
   314  	log := fog.FromContext(ctx)
   315  
   316  	s, err := storage.New(ctx)
   317  	if err != nil {
   318  		return fmt.Errorf("failed to create storage client: %w", err)
   319  	}
   320  
   321  	b, err := s.NewBucket(ctx, storage.WithBucket(bucket))
   322  	if err != nil {
   323  		return fmt.Errorf("failed to create bucket handler: %w", err)
   324  	}
   325  
   326  	log.Info("uploading job artifacts", "bucket", bucket, "storagePath", storagePath)
   327  	for _, dir := range dirs {
   328  		// dont upload results when dry running
   329  		if dry {
   330  			log.Info("[dry run] uploading", "dir", dir)
   331  			continue
   332  		}
   333  
   334  		log.Info("uploading", "dir", dir)
   335  		err := b.UploadArtifacts(ctx, storagePath, dir)
   336  		if !errors.Is(err, storage.ErrEmptyDir) && err != nil {
   337  			return fmt.Errorf("failed to upload artifacts: %w", err)
   338  		}
   339  	}
   340  
   341  	return nil
   342  }
   343  

View as plain text