...

Source file src/edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage/storage.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage

     1  // Package storage is concerned with storing and accessing job data in Google
     2  // Cloud Storage.
     3  package storage
     4  
     5  import (
     6  	"bytes"
     7  	"context"
     8  	"fmt"
     9  	"io"
    10  	"io/fs"
    11  	"log"
    12  	"os"
    13  	"path/filepath"
    14  	"strings"
    15  
    16  	goErrors "errors"
    17  
    18  	gcs "cloud.google.com/go/storage"
    19  	"google.golang.org/api/iterator"
    20  
    21  	"edge-infra.dev/pkg/lib/cli/rags"
    22  	"edge-infra.dev/pkg/lib/errors"
    23  	"edge-infra.dev/pkg/lib/fog"
    24  )
    25  
const (
	// DefaultBucket is the bucket name used when the caller does not supply
	// one: it is the default value of the flag bound by BindStorageBucketFlag
	// and the initial bucket name in Storage.NewBucket.
	DefaultBucket        = "edge-test-jobs"
	// DefaultTestingBucket is not referenced in this file; presumably the
	// bucket used by tests -- TODO confirm against callers.
	DefaultTestingBucket = "jack-testing-bucket"
	StartedFilename      = "started.json"  // name of the "job started" marker file
	FinishedFilename     = "finished.json" // name of the "job finished" marker file
	LogsFilename         = "logs.txt"      // name of the uploaded logs file
)
    33  
    34  // BindStorageBucketFlag binds a flag for the jobs storage bucket name to the
    35  // provided string pointer.  Use this instead of implementing your own flags
    36  func BindStorageBucketFlag(rs *rags.RagSet, d *string) {
    37  	rs.StringVar(d, "job-storage-bucket", DefaultBucket, "the storage bucket to upload job results to")
    38  }
    39  
// Artifact defines a singular test file (generally xml) stored in a bucket.
type Artifact struct {
	// Attributes holds the GCS object attributes returned when listing the
	// object (see RetrieveArtifacts).
	Attributes *gcs.ObjectAttrs
	// Contents is the raw content of the object.
	Contents   []byte
}
    45  
// Files is a collection of maps for all files found in a GCP bucket, grouped
// by kind. RetrieveAllFiles keys each map by the object's file name (the part
// after the last "/") and stores the object's raw content as the value.
type Files struct {
	XML  map[string][]byte // objects with a .xml extension
	JSON map[string][]byte // objects with a .json extension
	Logs map[string][]byte // objects with a .log, .logs or .txt extension
}
    52  
    53  func (a *Artifact) ToString() string {
    54  	return string(a.Contents)
    55  }
    56  
// Option allows customizing the Storage struct during instantiation (see New).
type Option func(*storageOpts)

// BucketOption allows customizing how a Bucket is resolved by
// Storage.NewBucket, e.g. overriding the default bucket name.
type BucketOption func(*bucketOpts)
    61  
    62  // New creates a Storage struct with the provided options.  If no bucket name
    63  // is provided via WithBucket or WithFlags, the default bucket name will be used.
    64  func New(ctx context.Context, opts ...Option) (*Storage, error) {
    65  	o := &storageOpts{}
    66  	for _, opt := range opts {
    67  		opt(o)
    68  	}
    69  
    70  	if o.client == nil {
    71  		// if client wasnt provided via options, create one using default app creds
    72  		client, err := gcs.NewClient(ctx)
    73  		if err != nil {
    74  			return nil, errors.Wrap(err)
    75  		}
    76  		o.client = client
    77  	}
    78  
    79  	return &Storage{Client: o.client}, nil
    80  }
    81  
    82  func (s *Storage) NewBucket(ctx context.Context, opts ...BucketOption) (*Bucket, error) {
    83  	o := &bucketOpts{bucketName: DefaultBucket}
    84  	for _, opt := range opts {
    85  		opt(o)
    86  	}
    87  
    88  	// create bucket handle
    89  	bucket := s.Client.Bucket(o.bucketName)
    90  	// check that the bucket exists
    91  	_, err := bucket.Attrs(ctx)
    92  	// provide specific error message when the bucket doesn't exist
    93  	if err == gcs.ErrBucketNotExist {
    94  		return nil, errors.New(fmt.Sprintf("bucket %s does not exist, please create first", o.bucketName), err)
    95  	}
    96  	// handle other errors generically
    97  	if err != nil {
    98  		return nil, errors.New("failed to check if bucket exists", err)
    99  	}
   100  	return &Bucket{bucket}, nil
   101  }
   102  
   103  // BasePath creates the base path for a job ran on GitHub Actions, the result
   104  // should be passed to Storage.UploadArtifacts as the path to use for storing
   105  // artifacts, logs, etc
   106  //
   107  // e.g. actions/repo/workflow/runid/jobname
   108  //
   109  // actions/edge-infra/presubmit/4823894443/ci
   110  func BasePath(repo, workflow, runid, jobname string) string {
   111  	return fmt.Sprintf("actions/%s/%s/%s/%s", repo, workflow, runid, jobname)
   112  }
   113  
   114  // TODO: just merge this with BasePath eventually
   115  //
   116  // e.g. argo/repo/workflow/runid
   117  //
   118  // argo/edge-infra/hourly-integration/3c7a2367-acbd-428a-8706-c9738a8f46b9
   119  func ArgoBasePath(repo, wf, runid string) string {
   120  	return fmt.Sprintf("argo/%s/%s/%s", repo, wf, runid)
   121  }
   122  
   123  // ArtifactsPath returns the path artifacts should be uploaded to within a
   124  // given job storage path.  This function exists to ensure that the content
   125  // _within_ a specific job storage directory is consistently structured regardless
   126  // of how the specific job storage directory path was constructed.
   127  func ArtifactsPath(storagePath string) string {
   128  	return filepath.Join(storagePath, "artifacts")
   129  }
   130  
   131  // LogsPath takes in the BasePath and returns the correct path to the logs
   132  func LogsPath(storagePath string) string {
   133  	return filepath.Join(storagePath, "logs")
   134  }
   135  
var (
	// ErrEmptyDir is the sentinel returned (wrapped) by Bucket.UploadArtifacts
	// when the directory it was asked to upload contains no entries.
	ErrEmptyDir = goErrors.New("directory is empty")
)
   139  
   140  // UploadArtifacts uploads the files in dir to the provided job storage path
   141  // It does not recurse into subdirectories.
   142  func (b *Bucket) UploadArtifacts(ctx context.Context, storagePath, dir string) error {
   143  	files, err := os.ReadDir(dir)
   144  	if err != nil {
   145  		return errors.New(fmt.Sprintf("failed to read dir %s", dir), err)
   146  	}
   147  	if len(files) == 0 {
   148  		return errors.Wrap(ErrEmptyDir)
   149  	}
   150  
   151  	// upload everything except started and finished files
   152  	// so xml files are in place before finished.json notifies middlechild
   153  	var finished fs.DirEntry
   154  	for _, file := range files {
   155  		name := file.Name()
   156  
   157  		if name == StartedFilename {
   158  			err = b.write(ctx, file.Name(), storagePath, dir, file)
   159  			if err != nil {
   160  				return err
   161  			}
   162  			continue
   163  		}
   164  
   165  		// store the finished file for later
   166  		if name == FinishedFilename {
   167  			finished = file
   168  			continue
   169  		}
   170  
   171  		err = b.write(ctx, name, ArtifactsPath(storagePath), dir, file)
   172  		if err != nil {
   173  			return err
   174  		}
   175  	}
   176  
   177  	// upload finished last to be sure all files are in place before pubsub is notified
   178  	if finished != nil {
   179  		err = b.write(ctx, finished.Name(), storagePath, dir, finished)
   180  		if err != nil {
   181  			return err
   182  		}
   183  	}
   184  
   185  	return nil
   186  }
   187  
   188  func (b *Bucket) write(ctx context.Context, name, path, dir string, file fs.DirEntry) error {
   189  	// create writer for writing data to GCS for the current file
   190  	w := b.Object(filepath.Join(path, name)).NewWriter(ctx)
   191  	// create reader for reading local file
   192  	r, err := os.Open(filepath.Join(dir, file.Name()))
   193  	if err != nil {
   194  		return errors.New(fmt.Sprintf("failed to open file %s", name), err)
   195  	}
   196  	defer r.Close()
   197  	// stream local data to GCS
   198  	if _, err := io.Copy(w, r); err != nil {
   199  		return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
   200  	}
   201  	if err := w.Close(); err != nil {
   202  		return errors.New(fmt.Sprintf("failed to close file %s", name), err)
   203  	}
   204  	return nil
   205  }
   206  
   207  // UploadStartedJSON creates a storagepath and uploads started.json to a gcp bucket
   208  func (b *Bucket) UploadStartedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
   209  	storagePath := BasePath(repo, workflow, id, job)
   210  	return b.upload(ctx, storagePath, StartedFilename, file)
   211  }
   212  
   213  // UploadFinishedJSON creates a storagepath and uploads finished.json to a gcp bucket
   214  func (b *Bucket) UploadFinishedJSON(ctx context.Context, repo, workflow, id, job string, file []byte) error {
   215  	storagePath := BasePath(repo, workflow, id, job)
   216  	return b.upload(ctx, storagePath, FinishedFilename, file)
   217  }
   218  
   219  // UploadLogs creates a storagepath and uploads logs.txt to a gcp bucket
   220  func (b *Bucket) UploadLogs(ctx context.Context, repo, workflow, id, job string, file []byte) error {
   221  	storagePath := BasePath(repo, workflow, id, job)
   222  	return b.upload(ctx, LogsPath(storagePath), LogsFilename, file)
   223  }
   224  
   225  // UploadJSON uploads files to the provided storagePath
   226  func (b *Bucket) upload(ctx context.Context, storagePath, name string, file []byte) error {
   227  	// create writer for writing data to GCS for the current file
   228  	w := b.Object(filepath.Join(storagePath, name)).NewWriter(ctx)
   229  	// create reader for reading byte[]
   230  	r := bytes.NewReader(file)
   231  
   232  	// stream local data to GCS
   233  	if _, err := io.Copy(w, r); err != nil {
   234  		return errors.New(fmt.Sprintf("failed to upload file %s", name), err)
   235  	}
   236  	if err := w.Close(); err != nil {
   237  		return errors.New(fmt.Sprintf("failed to close file %s", name), err)
   238  	}
   239  	return nil
   240  }
   241  
   242  // RetrieveArtifacts gets all (generally xml) files within a given bucket storagepath/artifacts
   243  func (b *Bucket) RetrieveArtifacts(ctx context.Context, storagePath string) ([]Artifact, error) {
   244  	query := &gcs.Query{Prefix: ArtifactsPath(storagePath)}
   245  
   246  	var result []Artifact
   247  	it := b.Objects(ctx, query)
   248  
   249  	for {
   250  		attrs, err := it.Next()
   251  		if err == iterator.Done {
   252  			break
   253  		}
   254  		if err != nil {
   255  			log.Fatal(err)
   256  		}
   257  
   258  		artifact := Artifact{Attributes: attrs}
   259  
   260  		data, err := b.readFile(ctx, attrs.Name) // data content of a file, can be type casted to string and printed
   261  		if err != nil {
   262  			return nil, fmt.Errorf("storage.RetrieveArtifacts: unable to retrieve artifact: %w", err)
   263  		}
   264  		artifact.Contents = data
   265  		result = append(result, artifact)
   266  	}
   267  
   268  	return result, nil
   269  }
   270  
   271  // RetrieveLogs grabs all the logs from a given path
   272  func (b *Bucket) RetrieveLogs(ctx context.Context, storagePath string) (map[string][]byte, error) {
   273  	query := &gcs.Query{Prefix: LogsPath(storagePath)}
   274  	logs := make(map[string][]byte)
   275  
   276  	it := b.Objects(ctx, query)
   277  
   278  	for {
   279  		attrs, err := it.Next()
   280  		if err == iterator.Done {
   281  			break
   282  		}
   283  		if err != nil {
   284  			log.Fatal(err)
   285  		}
   286  
   287  		data, err := b.readFile(ctx, attrs.Name)
   288  		if err != nil {
   289  			return logs, fmt.Errorf("storage.RetrieveArtifacts: unable to retrieve artifact: %w", err)
   290  		}
   291  
   292  		// break the path down to the file name and the extension
   293  		nameArr := strings.SplitAfter(attrs.Name, "/")
   294  		name := nameArr[len(nameArr)-1]
   295  		logs[name] = data
   296  	}
   297  
   298  	return logs, nil
   299  }
   300  
   301  // RetrieveAllFiles scans a given path and sub directories and returns all files found
   302  func (b *Bucket) RetrieveAllFiles(ctx context.Context, storagePath string) (Files, error) {
   303  	query := &gcs.Query{Prefix: storagePath}
   304  
   305  	files := Files{}
   306  	files.JSON = make(map[string][]byte)
   307  	files.XML = make(map[string][]byte)
   308  	files.Logs = make(map[string][]byte)
   309  
   310  	log := fog.FromContext(ctx)
   311  	it := b.Objects(ctx, query)
   312  
   313  	names := []string{}
   314  
   315  	for {
   316  		attrs, err := it.Next()
   317  		if err == iterator.Done {
   318  			break
   319  		}
   320  		if err != nil {
   321  			return files, err
   322  		}
   323  
   324  		// if the current item doesnt have an extension we care about continue
   325  		ext := filepath.Ext(attrs.Name)
   326  		if ext != ".json" && ext != ".xml" && ext != ".txt" && ext != ".logs" && ext != ".log" {
   327  			continue
   328  		}
   329  
   330  		data, err := b.readFile(ctx, attrs.Name)
   331  		if err != nil {
   332  			log.Error(err, "storage.RetrieveArtifacts: unable to retrieve artifact")
   333  			continue
   334  		}
   335  
   336  		// break the path down to the file name and the extension
   337  		nameArr := strings.SplitAfter(attrs.Name, "/")
   338  		name := nameArr[len(nameArr)-1]
   339  		names = append(names, name)
   340  
   341  		switch ext {
   342  		case ".json":
   343  			files.JSON[name] = data
   344  		case ".log", ".logs", ".txt":
   345  			files.Logs[name] = data
   346  		case ".xml":
   347  			files.XML[name] = data
   348  		}
   349  	}
   350  
   351  	log.Info("files found", "files", names)
   352  
   353  	return files, nil
   354  }
   355  
   356  // readFile reads the contents of a given file and returns it as a byte array
   357  func (b *Bucket) readFile(ctx context.Context, fileName string) ([]byte, error) {
   358  	rc, err := b.Object(fileName).NewReader(ctx)
   359  	if err != nil {
   360  		return nil, fmt.Errorf("failed to get handle on object %s: %w", fileName, err)
   361  	}
   362  	defer rc.Close()
   363  
   364  	slurp, err := io.ReadAll(rc)
   365  	if err != nil {
   366  		fmt.Printf("%v", err)
   367  		return nil, fmt.Errorf("failed to read file %s: %w", fileName, err)
   368  	}
   369  
   370  	return slurp, nil
   371  }
   372  

View as plain text