// Package storage is concerned with storing and accessing job data in Google
// Cloud Storage.
package storage

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"strings"

	goErrors "errors"

	gcs "cloud.google.com/go/storage"
	"google.golang.org/api/iterator"

	"edge-infra.dev/pkg/lib/cli/rags"
	"edge-infra.dev/pkg/lib/errors"
	"edge-infra.dev/pkg/lib/fog"
)

const (
	// DefaultBucket is the bucket job results are uploaded to when no other
	// bucket name is provided.
	DefaultBucket = "edge-test-jobs"
	// DefaultTestingBucket is a bucket for manual testing.
	DefaultTestingBucket = "jack-testing-bucket"
	// StartedFilename is the name of the metadata file uploaded when a job starts.
	StartedFilename = "started.json"
	// FinishedFilename is the name of the metadata file uploaded when a job finishes.
	FinishedFilename = "finished.json"
	// LogsFilename is the name of the file job logs are uploaded as.
	LogsFilename = "logs.txt"
)

// BindStorageBucketFlag binds a flag for the jobs storage bucket name to the
// provided string pointer. Use this instead of implementing your own flag.
func BindStorageBucketFlag(rs *rags.RagSet, d *string) {
	rs.StringVar(d, "job-storage-bucket", DefaultBucket,
		"the storage bucket to upload job results to")
}

// Artifact defines a single test file (generally XML).
type Artifact struct {
	Attributes *gcs.ObjectAttrs
	Contents   []byte
}

// Files is a collection of maps, keyed by file name, for all files found in a
// GCP bucket, grouped by file type.
type Files struct {
	XML  map[string][]byte
	JSON map[string][]byte
	Logs map[string][]byte
}

// ToString returns the artifact contents as a string.
func (a *Artifact) ToString() string {
	return string(a.Contents)
}

// Option allows customizing the Storage struct during instantiation.
type Option func(*storageOpts)

// BucketOption allows customizing the Bucket handle during instantiation.
type BucketOption func(*bucketOpts)

// New creates a Storage struct with the provided options. If no client is
// provided via options, one is created using default application credentials.
func New(ctx context.Context, opts ...Option) (*Storage, error) {
	o := &storageOpts{}
	for _, opt := range opts {
		opt(o)
	}
	if o.client == nil {
		// The client wasn't provided via options, so create one using
		// default application credentials.
		client, err := gcs.NewClient(ctx)
		if err != nil {
			return nil, errors.Wrap(err)
		}
		o.client = client
	}
	return &Storage{Client: o.client}, nil
}

// NewBucket returns a handle for the job storage bucket, verifying that the
// bucket exists. If no bucket name is provided via WithBucket or WithFlags,
// the default bucket name is used.
func (s *Storage) NewBucket(ctx context.Context, opts ...BucketOption) (*Bucket, error) {
	o := &bucketOpts{bucketName: DefaultBucket}
	for _, opt := range opts {
		opt(o)
	}
	// Create the bucket handle.
	bucket := s.Client.Bucket(o.bucketName)
	// Check that the bucket exists, providing a specific error message when
	// it does not.
	_, err := bucket.Attrs(ctx)
	if goErrors.Is(err, gcs.ErrBucketNotExist) {
		return nil, errors.New(fmt.Sprintf("bucket %s does not exist, please create it first", o.bucketName), err)
	}
	// Handle other errors generically.
	if err != nil {
		return nil, errors.New("failed to check if bucket exists", err)
	}
	return &Bucket{bucket}, nil
}

// BasePath creates the base path for a job run on GitHub Actions. The result
// should be passed to Bucket.UploadArtifacts as the path to use for storing
// artifacts, logs, etc.
//
// e.g. actions/repo/workflow/runid/jobname
//
//	actions/edge-infra/presubmit/4823894443/ci
func BasePath(repo, workflow, runid, jobname string) string {
	return fmt.Sprintf("actions/%s/%s/%s/%s", repo, workflow, runid, jobname)
}

// ArgoBasePath creates the base path for a job run on Argo Workflows.
//
// TODO: just merge this with BasePath eventually.
//
// e.g. argo/repo/workflow/runid
//
//	argo/edge-infra/hourly-integration/3c7a2367-acbd-428a-8706-c9738a8f46b9
func ArgoBasePath(repo, wf, runid string) string {
	return fmt.Sprintf("argo/%s/%s/%s", repo, wf, runid)
}
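
// Putting the constructors and path helpers together, a caller typically
// builds a client, resolves a bucket handle, and derives a storage path
// before uploading (UploadArtifacts is defined below). This is an
// illustrative sketch only, relying on the defaults and on the Storage,
// Bucket, and option plumbing defined elsewhere in this package:
//
//	func uploadJobResults(ctx context.Context, dir string) error {
//		s, err := New(ctx) // default application credentials
//		if err != nil {
//			return err
//		}
//		b, err := s.NewBucket(ctx) // DefaultBucket, verified to exist
//		if err != nil {
//			return err
//		}
//		path := BasePath("edge-infra", "presubmit", "4823894443", "ci")
//		return b.UploadArtifacts(ctx, path, dir)
//	}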
// ArtifactsPath returns the path artifacts should be uploaded to within a
// given job storage path. This function exists to ensure that the content
// _within_ a specific job storage directory is consistently structured,
// regardless of how the specific job storage directory path was constructed.
func ArtifactsPath(storagePath string) string {
	return filepath.Join(storagePath, "artifacts")
}

// LogsPath takes in the base path and returns the correct path to the logs.
func LogsPath(storagePath string) string {
	return filepath.Join(storagePath, "logs")
}

// ErrEmptyDir is returned when the directory to upload contains no files.
var ErrEmptyDir = goErrors.New("directory is empty")

// UploadArtifacts uploads the files in dir to the provided job storage path.
// It does not recurse into subdirectories.
func (b *Bucket) UploadArtifacts(ctx context.Context, storagePath, dir string) error {
	files, err := os.ReadDir(dir)
	if err != nil {
		return errors.New(fmt.Sprintf("failed to read dir %s", dir), err)
	}
	if len(files) == 0 {
		return errors.Wrap(ErrEmptyDir)
	}

	// Upload everything except the started and finished files, so XML files
	// are in place before finished.json notifies middlechild.
	var finished fs.DirEntry
	for _, file := range files {
		name := file.Name()
		if name == StartedFilename {
			if err := b.write(ctx, name, storagePath, dir, file); err != nil {
				return err
			}
			continue
		}
		// Store the finished file for later.
		if name == FinishedFilename {
			finished = file
			continue
		}
		if err := b.write(ctx, name, ArtifactsPath(storagePath), dir, file); err != nil {
			return err
		}
	}

	// Upload the finished file last to be sure all files are in place before
	// pubsub is notified.
	if finished != nil {
		if err := b.write(ctx, finished.Name(), storagePath, dir, finished); err != nil {
			return err
		}
	}
	return nil
}

// write streams a single local file in dir to the given path in the bucket.
func (b *Bucket) write(ctx context.Context, name, path, dir string, file fs.DirEntry) error {
	// Create a writer for writing data to GCS for the current file.
	w := b.Object(filepath.Join(path, name)).NewWriter(ctx)
	// Create a reader for reading the local file.
	r, err := os.Open(filepath.Join(dir, file.Name()))
	if err != nil {
		return errors.New(fmt.Sprintf("failed to open file %s", name), err)
	}
	defer r.Close()
	// Stream the local data to GCS.
	if _, err := io.Copy(w, r); err != nil {
		return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
	}
	if err := w.Close(); err != nil {
		return errors.New(fmt.Sprintf("failed to close file %s", name), err)
	}
	return nil
}

// UploadStartedJSON builds a storage path and uploads started.json to the
// GCP bucket.
func (b *Bucket) UploadStartedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
	storagePath := BasePath(repo, workflow, id, job)
	return b.upload(ctx, storagePath, StartedFilename, file)
}

// UploadFinishedJSON builds a storage path and uploads finished.json to the
// GCP bucket.
func (b *Bucket) UploadFinishedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
	storagePath := BasePath(repo, workflow, id, job)
	return b.upload(ctx, storagePath, FinishedFilename, file)
}

// UploadLogs builds a storage path and uploads logs.txt to the GCP bucket.
func (b *Bucket) UploadLogs(ctx context.Context, repo, workflow, id, job string, file []byte) error {
	storagePath := BasePath(repo, workflow, id, job)
	return b.upload(ctx, LogsPath(storagePath), LogsFilename, file)
}
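
// The Upload*JSON helpers above take pre-marshaled bytes. An illustrative
// sketch of a caller follows, with a hypothetical metadata struct; the real
// started.json schema is defined by its consumers (e.g. middlechild), not
// by this package:
//
//	type startedMeta struct {
//		Timestamp int64 `json:"timestamp"`
//	}
//
//	func markStarted(ctx context.Context, b *Bucket) error {
//		data, err := json.Marshal(startedMeta{Timestamp: time.Now().Unix()})
//		if err != nil {
//			return err
//		}
//		return b.UploadStartedJSON(ctx, "edge-infra", "presubmit", "4823894443", "ci", data)
//	}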
// upload writes the provided bytes to the named file under storagePath.
func (b *Bucket) upload(ctx context.Context, storagePath, name string, file []byte) error {
	// Create a writer for writing data to GCS for the current file.
	w := b.Object(filepath.Join(storagePath, name)).NewWriter(ctx)
	// Create a reader over the provided bytes.
	r := bytes.NewReader(file)
	// Stream the data to GCS.
	if _, err := io.Copy(w, r); err != nil {
		return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
	}
	if err := w.Close(); err != nil {
		return errors.New(fmt.Sprintf("failed to close file %s", name), err)
	}
	return nil
}

// RetrieveArtifacts gets all (generally XML) files within a given bucket
// storagePath/artifacts.
func (b *Bucket) RetrieveArtifacts(ctx context.Context, storagePath string) ([]Artifact, error) {
	query := &gcs.Query{Prefix: ArtifactsPath(storagePath)}
	var result []Artifact
	it := b.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("storage.RetrieveArtifacts: failed to iterate objects: %w", err)
		}
		artifact := Artifact{Attributes: attrs}
		// The raw contents of the file; can be converted to a string and printed.
		data, err := b.readFile(ctx, attrs.Name)
		if err != nil {
			return nil, fmt.Errorf("storage.RetrieveArtifacts: unable to retrieve artifact: %w", err)
		}
		artifact.Contents = data
		result = append(result, artifact)
	}
	return result, nil
}

// RetrieveLogs grabs all the logs from a given path.
func (b *Bucket) RetrieveLogs(ctx context.Context, storagePath string) (map[string][]byte, error) {
	query := &gcs.Query{Prefix: LogsPath(storagePath)}
	logs := make(map[string][]byte)
	it := b.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return logs, fmt.Errorf("storage.RetrieveLogs: failed to iterate objects: %w", err)
		}
		data, err := b.readFile(ctx, attrs.Name)
		if err != nil {
			return logs, fmt.Errorf("storage.RetrieveLogs: unable to retrieve log: %w", err)
		}
		// Reduce the object path down to just the file name.
		nameArr := strings.SplitAfter(attrs.Name, "/")
		name := nameArr[len(nameArr)-1]
		logs[name] = data
	}
	return logs, nil
}

// RetrieveAllFiles scans a given path and its subdirectories and returns all
// files found.
func (b *Bucket) RetrieveAllFiles(ctx context.Context, storagePath string) (Files, error) {
	query := &gcs.Query{Prefix: storagePath}
	files := Files{
		JSON: make(map[string][]byte),
		XML:  make(map[string][]byte),
		Logs: make(map[string][]byte),
	}
	log := fog.FromContext(ctx)
	it := b.Objects(ctx, query)
	names := []string{}
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return files, err
		}
		// If the current item doesn't have an extension we care about, skip it.
		ext := filepath.Ext(attrs.Name)
		if ext != ".json" && ext != ".xml" && ext != ".txt" && ext != ".logs" && ext != ".log" {
			continue
		}
		data, err := b.readFile(ctx, attrs.Name)
		if err != nil {
			log.Error(err, "storage.RetrieveAllFiles: unable to retrieve file")
			continue
		}
		// Reduce the object path down to just the file name.
		nameArr := strings.SplitAfter(attrs.Name, "/")
		name := nameArr[len(nameArr)-1]
		names = append(names, name)
		switch ext {
		case ".json":
			files.JSON[name] = data
		case ".log", ".logs", ".txt":
			files.Logs[name] = data
		case ".xml":
			files.XML[name] = data
		}
	}
	log.Info("files found", "files", names)
	return files, nil
}

// readFile reads the contents of a given object and returns it as a byte slice.
func (b *Bucket) readFile(ctx context.Context, fileName string) ([]byte, error) {
	rc, err := b.Object(fileName).NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get handle on object %s: %w", fileName, err)
	}
	defer rc.Close()
	slurp, err := io.ReadAll(rc)
	if err != nil {
		return nil, fmt.Errorf("failed to read file %s: %w", fileName, err)
	}
	return slurp, nil
}
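
// An illustrative sketch of reading results back out with the retrieval
// helpers above; the path components and the summary output are placeholders:
//
//	func summarizeJob(ctx context.Context, b *Bucket) error {
//		path := BasePath("edge-infra", "presubmit", "4823894443", "ci")
//		files, err := b.RetrieveAllFiles(ctx, path)
//		if err != nil {
//			return err
//		}
//		for name, data := range files.XML {
//			fmt.Printf("artifact %s: %d bytes\n", name, len(data))
//		}
//		for name, data := range files.Logs {
//			fmt.Printf("log %s: %d bytes\n", name, len(data))
//		}
//		return nil
//	}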