1 package chariot
2
3 import (
4 "context"
5 "crypto/md5"
6 "errors"
7 "fmt"
8 "io"
9 "os"
10 "regexp"
11 "strings"
12
13 "cloud.google.com/go/storage"
14 "google.golang.org/api/googleapi"
15
16 "edge-infra.dev/pkg/edge/chariot/errorsnode"
17 )
18
19
20 type StorageObject struct {
21 Location string `json:"location"`
22 Content string `json:"content,omitempty"`
23 }
24
25
26 type StorageInfo struct {
27
28 ObjectsEmpty bool `json:"objects_empty,omitempty"`
29
30
31 ObjectsPut []StorageObject `json:"objects_put,omitempty"`
32
33
34 ObjectsDeleted []StorageObject `json:"objects_deleted,omitempty"`
35
36
37 ObjectsDoNotExist []StorageObject `json:"objects_do_not_exist,omitempty"`
38
39
40 Errors []StorageObjectError `json:"errors,omitempty"`
41 }
42
43 type StorageObjectError struct {
44 Object StorageObject `json:"object"`
45 Error string `json:"error"`
46 }
47
48
49 type Storer interface {
50 Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error)
51 Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error)
52 }
53
54
55 type GoogleCloudStorage struct {
56 c *storage.Client
57 }
58
59 var reTrimPathSuffix = regexp.MustCompile("/.*$")
60
61 func (so *StorageObject) getGcsBucket() string {
62 var b = strings.TrimPrefix(so.Location, "gs://")
63 return reTrimPathSuffix.ReplaceAllString(b, "")
64 }
65
66 var reTrimBucketPrefix = regexp.MustCompile("^gs://[^/]+/")
67
68 func (so *StorageObject) getGcsPath() string {
69 return reTrimBucketPrefix.ReplaceAllString(so.Location, "")
70 }
71
72
73 func NewGoogleCloudStorage(client *storage.Client) *GoogleCloudStorage {
74 return &GoogleCloudStorage{
75 c: client,
76 }
77 }
78
79
80 func IsStorerErrorRetryable(err error) bool {
81 return isErrBucketNotExist(err) || errors.Is(err, context.DeadlineExceeded)
82 }
83
84
85 func isErrBucketNotExist(err error) bool {
86 if err == nil {
87 return false
88 }
89
90 if err == storage.ErrBucketNotExist {
91 return true
92 }
93
94 var gerr *googleapi.Error
95 if ok := errors.As(err, &gerr); ok {
96 switch gerr.Code {
97 case 0:
98
99 case 404:
100
101 return true
102 case 503:
103
104
105 return true
106 default:
107
108 }
109 }
110 return false
111 }
112
113 const MetadataKeyWriterHostnameAndChecksumMD5 = "writerHostnameAndChecksumMD5"
114
115 var metadataHostname = os.Getenv("HOSTNAME")
116
117 func (s *GoogleCloudStorage) Put(ctx context.Context, objects ...StorageObject) (StorageInfo, error) {
118 var si StorageInfo
119 if len(objects) == 0 {
120 si.ObjectsEmpty = true
121 return si, nil
122 }
123
124 var hostname = "chariot2"
125 if metadataHostname != "" {
126 hostname = metadataHostname
127 }
128
129 var errnode errorsnode.Node
130 for _, object := range objects {
131 var bucket = object.getGcsBucket()
132 var path = object.getGcsPath()
133 var w = s.c.Bucket(bucket).Object(path).NewWriter(ctx)
134
135
136 var md5sum = calcContentMD5(object)
137 w.ObjectAttrs.MD5 = md5sum
138
139
140 var metaValue = fmt.Sprintf("%s-%x", hostname, md5sum)
141 if w.ObjectAttrs.Metadata == nil {
142 w.ObjectAttrs.Metadata = make(map[string]string)
143 }
144 w.ObjectAttrs.Metadata[MetadataKeyWriterHostnameAndChecksumMD5] = metaValue
145
146
147 var err error
148 if _, err = io.Copy(w, strings.NewReader(object.Content)); err != nil {
149 errnode.Errs = append(errnode.Errs, err)
150 si.Errors = append(si.Errors, StorageObjectError{
151 Object: object,
152 Error: err.Error(),
153 })
154 } else if err = w.Close(); err != nil {
155 errnode.Errs = append(errnode.Errs, err)
156 si.Errors = append(si.Errors, StorageObjectError{
157 Object: object,
158 Error: err.Error(),
159 })
160 } else {
161 si.ObjectsPut = append(si.ObjectsPut, object)
162 }
163
164
165 if isErrBucketNotExist(err) {
166
167 return si, err
168 }
169 }
170 var reterr error
171 if len(errnode.Errs) > 0 {
172 errnode.Message = fmt.Sprintf("Got errors putting objects: count=%d", len(errnode.Errs))
173 reterr = &errnode
174 }
175 return si, reterr
176 }
177
178 func calcContentMD5(object StorageObject) []byte {
179 var sum = md5.Sum([]byte(object.Content))
180 return sum[:]
181 }
182
183
184 func (s *GoogleCloudStorage) Delete(ctx context.Context, objects ...StorageObject) (StorageInfo, error) {
185 if len(objects) == 0 {
186 return StorageInfo{ObjectsEmpty: true}, nil
187 }
188
189 var errnode errorsnode.Node
190 var si StorageInfo
191 for _, o := range objects {
192 var object = o
193 var bucket = object.getGcsBucket()
194 var path = object.getGcsPath()
195 err := s.c.Bucket(bucket).Object(path).Delete(ctx)
196 if err != nil && err == storage.ErrObjectNotExist {
197
198 si.ObjectsDoNotExist = append(si.ObjectsDoNotExist, object)
199 } else if err != nil {
200 errnode.Errs = append(errnode.Errs, err)
201 si.Errors = append(si.Errors, StorageObjectError{Object: object, Error: err.Error()})
202 } else {
203 si.ObjectsDeleted = append(si.ObjectsDeleted, object)
204 }
205 }
206 var reterr error
207 if len(errnode.Errs) > 0 {
208 errnode.Message = fmt.Sprintf("Got errors deleting objects: count=%d", len(errnode.Errs))
209 reterr = &errnode
210 }
211 return si, reterr
212 }
213
View as plain text