1 package storage
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "sync"
9 "time"
10
11 gstorage "cloud.google.com/go/storage"
12 "golang.org/x/oauth2/google"
13 "google.golang.org/api/googleapi"
14 "google.golang.org/api/iterator"
15 "google.golang.org/api/option"
16 )
17
18
19
20 func NewCloudStorageFS(bucket string, credentials *google.Credentials) FS {
21 return &cloudStorageFS{
22 bucketName: bucket,
23 credentials: credentials,
24 }
25 }
26
27
28
29 type cloudStorageFS struct {
30 bucketName string
31 credentials *google.Credentials
32
33 bucketLock sync.RWMutex
34 bucket *gstorage.BucketHandle
35 bucketScopes Scope
36 }
37
38 func (c *cloudStorageFS) URL(ctx context.Context, path string, options *SignedURLOptions) (string, error) {
39 if options == nil {
40 options = &SignedURLOptions{}
41 }
42 options.applyDefaults()
43
44 b, err := c.bucketHandle(ctx, ScopeSignURL)
45 if err != nil {
46 return "", err
47 }
48
49 return b.SignedURL(path, &gstorage.SignedURLOptions{
50 Method: options.Method,
51 Expires: time.Now().Add(options.Expiry),
52 })
53 }
54
55
56 func (c *cloudStorageFS) Open(ctx context.Context, path string, options *ReaderOptions) (*File, error) {
57 b, err := c.bucketHandle(ctx, ScopeRead)
58 if err != nil {
59 return nil, err
60 }
61
62 obj := b.Object(path)
63 if options != nil {
64 obj = obj.ReadCompressed(options.ReadCompressed)
65 }
66
67 f, err := obj.NewReader(ctx)
68 if err != nil {
69 if errors.Is(err, gstorage.ErrObjectNotExist) {
70 return nil, ¬ExistError{
71 Path: path,
72 }
73 }
74
75 return nil, err
76 }
77
78 return &File{
79 ReadCloser: f,
80 Attributes: Attributes{
81 ContentType: f.Attrs.ContentType,
82 ContentEncoding: f.Attrs.ContentEncoding,
83 ModTime: f.Attrs.LastModified,
84 Size: f.Attrs.Size,
85 },
86 }, nil
87 }
88
89
90 func (c *cloudStorageFS) Attributes(ctx context.Context, path string, _ *ReaderOptions) (*Attributes, error) {
91 b, err := c.bucketHandle(ctx, ScopeRead)
92 if err != nil {
93 return nil, err
94 }
95
96 a, err := b.Object(path).Attrs(ctx)
97 if err != nil {
98 return nil, err
99 }
100
101 return &Attributes{
102 ContentType: a.ContentType,
103 ContentEncoding: a.ContentEncoding,
104 Metadata: a.Metadata,
105 ModTime: a.Updated,
106 CreationTime: a.Created,
107 Size: a.Size,
108 }, nil
109 }
110
111
112 func (c *cloudStorageFS) Create(ctx context.Context, path string, options *WriterOptions) (io.WriteCloser, error) {
113 b, err := c.bucketHandle(ctx, ScopeWrite)
114 if err != nil {
115 return nil, err
116 }
117
118 w := b.Object(path).NewWriter(ctx)
119
120 if options != nil {
121 w.Metadata = options.Attributes.Metadata
122 w.ContentType = options.Attributes.ContentType
123 w.ContentEncoding = options.Attributes.ContentEncoding
124 w.ChunkSize = options.BufferSize
125 }
126 w.ChunkSize = c.chunkSize(w.ChunkSize)
127
128 return w, nil
129 }
130
131 func (c *cloudStorageFS) chunkSize(size int) int {
132 if size == 0 {
133 return googleapi.DefaultUploadChunkSize
134 } else if size > 0 {
135 return size
136 }
137
138 return 0
139 }
140
141
142 func (c *cloudStorageFS) Delete(ctx context.Context, path string) error {
143 b, err := c.bucketHandle(ctx, ScopeDelete)
144 if err != nil {
145 return err
146 }
147
148 return b.Object(path).Delete(ctx)
149 }
150
151
152 func (c *cloudStorageFS) Walk(ctx context.Context, path string, fn WalkFn) error {
153 bh, err := c.bucketHandle(ctx, ScopeRead)
154 if err != nil {
155 return err
156 }
157
158 it := bh.Objects(ctx, &gstorage.Query{
159 Prefix: path,
160 })
161
162 for {
163 r, err := it.Next()
164 if errors.Is(err, iterator.Done) {
165 break
166 }
167 if err != nil {
168
169 return err
170 }
171
172 if err = fn(r.Name); err != nil {
173 return err
174 }
175 }
176
177 return nil
178 }
179
180 func cloudStorageScope(scope Scope) string {
181 switch {
182 case scope.Has(ScopeDelete):
183 return gstorage.ScopeFullControl
184 case scope.Has(ScopeWrite):
185 return gstorage.ScopeReadWrite
186 case scope.Has(ScopeRead), scope.Has(ScopeSignURL):
187 return gstorage.ScopeReadOnly
188 default:
189 panic(fmt.Sprintf("unknown scope: '%s'", scope))
190 }
191 }
192
193 func ResolveCloudStorageScope(scope Scope) Scope {
194 switch cloudStorageScope(scope) {
195 case gstorage.ScopeFullControl:
196 return ScopeRWD | scope
197 case gstorage.ScopeReadWrite:
198 return ScopeRW | scope
199 case gstorage.ScopeReadOnly:
200 return ScopeRead | scope
201 default:
202 panic(fmt.Sprintf("unknown scope: '%s'", scope))
203 }
204 }
205
206 func (c *cloudStorageFS) findCredentials(ctx context.Context, scope string) (*google.Credentials, error) {
207 if c.credentials != nil {
208 return c.credentials, nil
209 }
210
211 return google.FindDefaultCredentials(ctx, scope)
212 }
213
214 func (c *cloudStorageFS) client(ctx context.Context, scope Scope) (*gstorage.Client, error) {
215 creds, err := c.findCredentials(ctx, cloudStorageScope(scope))
216 if err != nil {
217 return nil, fmt.Errorf("finding credentials: %w", err)
218 }
219
220 var options []option.ClientOption
221 options = append(options, option.WithCredentials(creds))
222 options = append(options, option.WithScopes(cloudStorageScope(scope)))
223
224 client, err := gstorage.NewClient(ctx, options...)
225 if err != nil {
226 return nil, fmt.Errorf("building client: %w", err)
227 }
228
229 return client, nil
230 }
231
232 func (c *cloudStorageFS) bucketHandle(ctx context.Context, scope Scope) (*gstorage.BucketHandle, error) {
233 c.bucketLock.RLock()
234 scope |= c.bucketScopes
235 if bucket := c.bucket; bucket != nil && c.bucketScopes.Has(scope) {
236 c.bucketLock.RUnlock()
237
238 return bucket, nil
239 }
240 c.bucketLock.RUnlock()
241
242 c.bucketLock.Lock()
243 defer c.bucketLock.Unlock()
244 if c.bucket != nil && c.bucketScopes.Has(scope) {
245 return c.bucket, nil
246 }
247
248
249
250
251 scope = ResolveCloudStorageScope(c.bucketScopes | scope)
252
253 client, err := c.client(ctx, scope)
254 if err != nil {
255 return nil, err
256 }
257
258 c.bucket = client.Bucket(c.bucketName)
259 c.bucketScopes = scope
260
261 return c.bucket, nil
262 }
263
View as plain text