package chariot

import (
	"context"
	"crypto/md5" //nolint:gosec // md5 is trusted.
	"errors"
	"fmt"
	"io"
	"os"
	"regexp"
	"strings"

	"cloud.google.com/go/storage"
	"google.golang.org/api/googleapi"

	"edge-infra.dev/pkg/edge/chariot/errorsnode"
)

// StorageObject is a simple structure for creating objects in storage.
type StorageObject struct {
	// Location is the address of the object. The helpers in this package
	// parse it as "gs://<bucket>/<path>".
	Location string `json:"location"`
	// Content is the data written to the object by Put. It is omitted from
	// JSON output when empty.
	Content string `json:"content,omitempty"`
}

// StorageInfo is a struct that contains information about storage operations.
type StorageInfo struct {
	// ObjectsEmpty is true when the Put/Delete operation wasn't provided any objects to Put/Delete.
	ObjectsEmpty bool `json:"objects_empty,omitempty"`

	// ObjectsPut contains successfully put objects.
	ObjectsPut []StorageObject `json:"objects_put,omitempty"`

	// ObjectsDeleted contains successfully deleted objects.
	ObjectsDeleted []StorageObject `json:"objects_deleted,omitempty"`

	// ObjectsDoNotExist contains objects that did not exist when deleted.
	ObjectsDoNotExist []StorageObject `json:"objects_do_not_exist,omitempty"`

	// Errors contains any errors that occur when putting/deleting an object.
	Errors []StorageObjectError `json:"errors,omitempty"`
}

// StorageObjectError pairs an object with the message of the error that
// occurred while putting or deleting it.
type StorageObjectError struct {
	// Object is the object the failed operation was applied to.
	Object StorageObject `json:"object"`
	// Error is the text of the underlying error (the result of err.Error()).
	Error string `json:"error"`
}

// Storer is a simple interface for putting and deleting storage objects.
type Storer interface {
	// Put stores the provided objects and reports the per-object outcome in
	// the returned StorageInfo.
	Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error)
	// Delete removes the provided objects and reports the per-object outcome
	// in the returned StorageInfo.
	Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error)
}

// GoogleCloudStorage wraps a storage.Client object to satisfy the Storer interface.
type GoogleCloudStorage struct { c *storage.Client } var reTrimPathSuffix = regexp.MustCompile("/.*$") func (so *StorageObject) getGcsBucket() string { var b = strings.TrimPrefix(so.Location, "gs://") return reTrimPathSuffix.ReplaceAllString(b, "") } var reTrimBucketPrefix = regexp.MustCompile("^gs://[^/]+/") func (so *StorageObject) getGcsPath() string { return reTrimBucketPrefix.ReplaceAllString(so.Location, "") } // NewGoogleCloudStorage wraps the provided client in order to satisfy the Storer interface. func NewGoogleCloudStorage(client *storage.Client) *GoogleCloudStorage { return &GoogleCloudStorage{ c: client, } } // IsStorerErrorRetryable returns true if the storer operation should be retried. func IsStorerErrorRetryable(err error) bool { return isErrBucketNotExist(err) || errors.Is(err, context.DeadlineExceeded) } // isErrBucketNotExist returns true when the storage error indicates the bucket does not exist. func isErrBucketNotExist(err error) bool { if err == nil { return false } if err == storage.ErrBucketNotExist { return true } var gerr *googleapi.Error if ok := errors.As(err, &gerr); ok { switch gerr.Code { case 0: // no code provided? case 404: // The fake-gcs-server emulator sends a 404 response when buckets do not exist. return true case 503: // Seeing many 503 Service Unavailable errors when buckets do not exist yet. // TODO check the `e.Header` for `Retry-After`. return true default: // Some other HTTP Status Code? 
} } return false } const MetadataKeyWriterHostnameAndChecksumMD5 = "writerHostnameAndChecksumMD5" var metadataHostname = os.Getenv("HOSTNAME") func (s *GoogleCloudStorage) Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error) { var si StorageInfo if len(objects) == 0 { si.ObjectsEmpty = true return si, nil } var hostname = "chariot2" if metadataHostname != "" { hostname = metadataHostname } var errnode errorsnode.Node for _, object := range objects { var bucket = object.getGcsBucket() var path = object.getGcsPath() var w = s.c.Bucket(bucket).Object(path).NewWriter(ctx) // md5 hash is verified by google on writes. Very neat. var md5sum = calcContentMD5(object) w.ObjectAttrs.MD5 = md5sum // Set the metadata on the object. var metaValue = fmt.Sprintf("%s-%x", hostname, md5sum) if w.ObjectAttrs.Metadata == nil { w.ObjectAttrs.Metadata = make(map[string]string) } w.ObjectAttrs.Metadata[MetadataKeyWriterHostnameAndChecksumMD5] = metaValue // Writes are applied upon close. var err error if _, err = io.Copy(w, strings.NewReader(object.Content)); err != nil { errnode.Errs = append(errnode.Errs, err) si.Errors = append(si.Errors, StorageObjectError{ Object: object, Error: err.Error(), }) } else if err = w.Close(); err != nil { errnode.Errs = append(errnode.Errs, err) si.Errors = append(si.Errors, StorageObjectError{ Object: object, Error: err.Error(), }) } else { si.ObjectsPut = append(si.ObjectsPut, object) } // Check the error to see if chariot should retry. if isErrBucketNotExist(err) { // Exit early since we need to retry. return si, err } } var reterr error if len(errnode.Errs) > 0 { errnode.Message = fmt.Sprintf("Got errors putting objects: count=%d", len(errnode.Errs)) reterr = &errnode } return si, reterr } func calcContentMD5(object StorageObject) []byte { var sum = md5.Sum([]byte(object.Content)) //nolint:gosec // google expects md5 return sum[:] } // Delete deletes objects at the provided StorageObject.Location fields. 
func (s *GoogleCloudStorage) Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error) { if len(objects) == 0 { return StorageInfo{ObjectsEmpty: true}, nil } var errnode errorsnode.Node var si StorageInfo for _, o := range objects { var object = o // recast for loop paranoia var bucket = object.getGcsBucket() var path = object.getGcsPath() err := s.c.Bucket(bucket).Object(path).Delete(ctx) if err != nil && err == storage.ErrObjectNotExist { // Do not return an error when objects do not exist. si.ObjectsDoNotExist = append(si.ObjectsDoNotExist, object) } else if err != nil { errnode.Errs = append(errnode.Errs, err) si.Errors = append(si.Errors, StorageObjectError{Object: object, Error: err.Error()}) } else { si.ObjectsDeleted = append(si.ObjectsDeleted, object) } } var reterr error if len(errnode.Errs) > 0 { errnode.Message = fmt.Sprintf("Got errors deleting objects: count=%d", len(errnode.Errs)) reterr = &errnode } return si, reterr }