...

Source file src/edge-infra.dev/pkg/edge/chariot/storage.go

Documentation: edge-infra.dev/pkg/edge/chariot

     1  package chariot
     2  
     3  import (
     4  	"context"
     5  	"crypto/md5" //nolint:gosec // md5 is trusted.
     6  	"errors"
     7  	"fmt"
     8  	"io"
     9  	"os"
    10  	"regexp"
    11  	"strings"
    12  
    13  	"cloud.google.com/go/storage"
    14  	"google.golang.org/api/googleapi"
    15  
    16  	"edge-infra.dev/pkg/edge/chariot/errorsnode"
    17  )
    18  
// StorageObject is a simple structure for creating objects in storage.
type StorageObject struct {
	// Location is the object's URL. GoogleCloudStorage parses it as
	// "gs://<bucket>/<path>" (see getGcsBucket and getGcsPath).
	Location string `json:"location"`

	// Content is the object's data. Put writes it verbatim; Delete only uses
	// Location to find the object.
	Content string `json:"content,omitempty"`
}
    24  
// StorageInfo is a struct that contains information about storage operations.
// Each Put/Delete call returns one StorageInfo that reports the outcome of every
// object it was given.
type StorageInfo struct {
	// ObjectsEmpty is true when the Put/Delete operation wasn't provided any objects to Put/Delete.
	ObjectsEmpty bool `json:"objects_empty,omitempty"`

	// ObjectsPut contains successfully put objects (filled by Put).
	ObjectsPut []StorageObject `json:"objects_put,omitempty"`

	// ObjectsDeleted contains successfully deleted objects (filled by Delete).
	ObjectsDeleted []StorageObject `json:"objects_deleted,omitempty"`

	// ObjectsDoNotExist contains objects that did not exist when deleted (filled
	// by Delete). These are not reported as errors.
	ObjectsDoNotExist []StorageObject `json:"objects_do_not_exist,omitempty"`

	// Errors contains any errors that occur when putting/deleting an object,
	// one entry per failed object.
	Errors []StorageObjectError `json:"errors,omitempty"`
}
    42  
// StorageObjectError records the failure of a Put or Delete for a single object.
type StorageObjectError struct {
	// Object is the object whose operation failed.
	Object StorageObject `json:"object"`

	// Error is the failure's message, i.e. the result of err.Error().
	Error string `json:"error"`
}
    47  
// Storer is a simple interface for putting and deleting storage objects.
//
// Both methods accept any number of objects. They return a StorageInfo with
// per-object outcomes together with an error.
// NOTE(review): GoogleCloudStorage is the implementation visible here; other
// implementations may fill StorageInfo differently.
type Storer interface {
	// Put is expected to write each object's Content to its Location.
	Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error)

	// Delete is expected to remove the object at each object's Location.
	Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error)
}
    53  
// GoogleCloudStorage wraps a storage.Client object to satisfy the Storer interface.
// Create one with NewGoogleCloudStorage. Object Locations are interpreted as
// "gs://<bucket>/<path>".
type GoogleCloudStorage struct {
	// c is the Cloud Storage client used for every bucket/object operation.
	c *storage.Client
}
    58  
    59  var reTrimPathSuffix = regexp.MustCompile("/.*$")
    60  
    61  func (so *StorageObject) getGcsBucket() string {
    62  	var b = strings.TrimPrefix(so.Location, "gs://")
    63  	return reTrimPathSuffix.ReplaceAllString(b, "")
    64  }
    65  
    66  var reTrimBucketPrefix = regexp.MustCompile("^gs://[^/]+/")
    67  
    68  func (so *StorageObject) getGcsPath() string {
    69  	return reTrimBucketPrefix.ReplaceAllString(so.Location, "")
    70  }
    71  
    72  // NewGoogleCloudStorage wraps the provided client in order to satisfy the Storer interface.
    73  func NewGoogleCloudStorage(client *storage.Client) *GoogleCloudStorage {
    74  	return &GoogleCloudStorage{
    75  		c: client,
    76  	}
    77  }
    78  
    79  // IsStorerErrorRetryable returns true if the storer operation should be retried.
    80  func IsStorerErrorRetryable(err error) bool {
    81  	return isErrBucketNotExist(err) || errors.Is(err, context.DeadlineExceeded)
    82  }
    83  
    84  // isErrBucketNotExist returns true when the storage error indicates the bucket does not exist.
    85  func isErrBucketNotExist(err error) bool {
    86  	if err == nil {
    87  		return false
    88  	}
    89  
    90  	if err == storage.ErrBucketNotExist {
    91  		return true
    92  	}
    93  
    94  	var gerr *googleapi.Error
    95  	if ok := errors.As(err, &gerr); ok {
    96  		switch gerr.Code {
    97  		case 0:
    98  			// no code provided?
    99  		case 404:
   100  			// The fake-gcs-server emulator sends a 404 response when buckets do not exist.
   101  			return true
   102  		case 503:
   103  			// Seeing many 503 Service Unavailable errors when buckets do not exist yet.
   104  			// TODO check the `e.Header` for `Retry-After`.
   105  			return true
   106  		default:
   107  			// Some other HTTP Status Code?
   108  		}
   109  	}
   110  	return false
   111  }
   112  
// MetadataKeyWriterHostnameAndChecksumMD5 is the object metadata key that
// GoogleCloudStorage.Put sets on every object it writes. The value is
// "<hostname>-<hex MD5 of the content>".
const MetadataKeyWriterHostnameAndChecksumMD5 = "writerHostnameAndChecksumMD5"

// metadataHostname is read from the HOSTNAME environment variable once, when
// the package is initialized. Put falls back to "chariot2" when it is empty.
var metadataHostname = os.Getenv("HOSTNAME")
   116  
   117  func (s *GoogleCloudStorage) Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error) {
   118  	var si StorageInfo
   119  	if len(objects) == 0 {
   120  		si.ObjectsEmpty = true
   121  		return si, nil
   122  	}
   123  
   124  	var hostname = "chariot2"
   125  	if metadataHostname != "" {
   126  		hostname = metadataHostname
   127  	}
   128  
   129  	var errnode errorsnode.Node
   130  	for _, object := range objects {
   131  		var bucket = object.getGcsBucket()
   132  		var path = object.getGcsPath()
   133  		var w = s.c.Bucket(bucket).Object(path).NewWriter(ctx)
   134  
   135  		// md5 hash is verified by google on writes. Very neat.
   136  		var md5sum = calcContentMD5(object)
   137  		w.ObjectAttrs.MD5 = md5sum
   138  
   139  		// Set the metadata on the object.
   140  		var metaValue = fmt.Sprintf("%s-%x", hostname, md5sum)
   141  		if w.ObjectAttrs.Metadata == nil {
   142  			w.ObjectAttrs.Metadata = make(map[string]string)
   143  		}
   144  		w.ObjectAttrs.Metadata[MetadataKeyWriterHostnameAndChecksumMD5] = metaValue
   145  
   146  		// Writes are applied upon close.
   147  		var err error
   148  		if _, err = io.Copy(w, strings.NewReader(object.Content)); err != nil {
   149  			errnode.Errs = append(errnode.Errs, err)
   150  			si.Errors = append(si.Errors, StorageObjectError{
   151  				Object: object,
   152  				Error:  err.Error(),
   153  			})
   154  		} else if err = w.Close(); err != nil {
   155  			errnode.Errs = append(errnode.Errs, err)
   156  			si.Errors = append(si.Errors, StorageObjectError{
   157  				Object: object,
   158  				Error:  err.Error(),
   159  			})
   160  		} else {
   161  			si.ObjectsPut = append(si.ObjectsPut, object)
   162  		}
   163  
   164  		// Check the error to see if chariot should retry.
   165  		if isErrBucketNotExist(err) {
   166  			// Exit early since we need to retry.
   167  			return si, err
   168  		}
   169  	}
   170  	var reterr error
   171  	if len(errnode.Errs) > 0 {
   172  		errnode.Message = fmt.Sprintf("Got errors putting objects: count=%d", len(errnode.Errs))
   173  		reterr = &errnode
   174  	}
   175  	return si, reterr
   176  }
   177  
   178  func calcContentMD5(object StorageObject) []byte {
   179  	var sum = md5.Sum([]byte(object.Content)) //nolint:gosec // google expects md5
   180  	return sum[:]
   181  }
   182  
   183  // Delete deletes objects at the provided StorageObject.Location fields.
   184  func (s *GoogleCloudStorage) Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error) {
   185  	if len(objects) == 0 {
   186  		return StorageInfo{ObjectsEmpty: true}, nil
   187  	}
   188  
   189  	var errnode errorsnode.Node
   190  	var si StorageInfo
   191  	for _, o := range objects {
   192  		var object = o // recast for loop paranoia
   193  		var bucket = object.getGcsBucket()
   194  		var path = object.getGcsPath()
   195  		err := s.c.Bucket(bucket).Object(path).Delete(ctx)
   196  		if err != nil && err == storage.ErrObjectNotExist {
   197  			// Do not return an error when objects do not exist.
   198  			si.ObjectsDoNotExist = append(si.ObjectsDoNotExist, object)
   199  		} else if err != nil {
   200  			errnode.Errs = append(errnode.Errs, err)
   201  			si.Errors = append(si.Errors, StorageObjectError{Object: object, Error: err.Error()})
   202  		} else {
   203  			si.ObjectsDeleted = append(si.ObjectsDeleted, object)
   204  		}
   205  	}
   206  	var reterr error
   207  	if len(errnode.Errs) > 0 {
   208  		errnode.Message = fmt.Sprintf("Got errors deleting objects: count=%d", len(errnode.Errs))
   209  		reterr = &errnode
   210  	}
   211  	return si, reterr
   212  }
   213  

View as plain text