1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package gcsfs
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "log"
24 "os"
25 "path/filepath"
26 "sort"
27 "syscall"
28
29 "github.com/googleapis/google-cloud-go-testing/storage/stiface"
30
31 "cloud.google.com/go/storage"
32
33 "google.golang.org/api/iterator"
34 )
35
36
// GcsFile is a file handle onto an object addressed through a gcsFileResource.
// The handle carries its own open flags, offset and closed state, while the
// resource it points to may be shared with other handles (see
// NewGcsFileFromOldFH).
type GcsFile struct {
	// openFlags holds the os.O_* flags the handle was created with; WriteAt
	// and Truncate consult them.
	openFlags int
	// fhOffset is the handle's current position, used by Read and Write and
	// moved by Seek, ReadAt and WriteAt.
	fhOffset int64
	// closed is set by Close; most operations refuse to run once it is true.
	closed bool
	// ReadDirIt is the directory-listing iterator used by Readdir. It is nil
	// when no listing is in progress.
	ReadDirIt stiface.ObjectIterator
	// resource holds the object handle, name, file mode and I/O state.
	resource *gcsFileResource
}
44
45 func NewGcsFile(
46 ctx context.Context,
47 fs *Fs,
48 obj stiface.ObjectHandle,
49 openFlags int,
50
51 fileMode os.FileMode,
52 name string,
53 ) *GcsFile {
54 return &GcsFile{
55 openFlags: openFlags,
56 fhOffset: 0,
57 closed: false,
58 ReadDirIt: nil,
59 resource: &gcsFileResource{
60 ctx: ctx,
61 fs: fs,
62
63 obj: obj,
64 name: name,
65 fileMode: fileMode,
66
67 currentGcsSize: 0,
68
69 offset: 0,
70 reader: nil,
71 writer: nil,
72 },
73 }
74 }
75
76 func NewGcsFileFromOldFH(
77 openFlags int,
78 fileMode os.FileMode,
79 oldFile *gcsFileResource,
80 ) *GcsFile {
81 res := &GcsFile{
82 openFlags: openFlags,
83 fhOffset: 0,
84 closed: false,
85 ReadDirIt: nil,
86
87 resource: oldFile,
88 }
89 res.resource.fileMode = fileMode
90
91 return res
92 }
93
94 func (o *GcsFile) Close() error {
95 if o.closed {
96
97 return ErrFileClosed
98 }
99 o.closed = true
100 return o.resource.Close()
101 }
102
103 func (o *GcsFile) Seek(newOffset int64, whence int) (int64, error) {
104 if o.closed {
105 return 0, ErrFileClosed
106 }
107
108
109 if (whence == 0 && newOffset == o.fhOffset) || (whence == 1 && newOffset == 0) {
110 return o.fhOffset, nil
111 }
112 log.Printf("WARNING: Seek behavior triggered, highly inefficent. Offset before seek is at %d\n", o.fhOffset)
113
114
115 err := o.Sync()
116 if err != nil {
117 return 0, err
118 }
119 stat, err := o.Stat()
120 if err != nil {
121 return 0, nil
122 }
123
124 switch whence {
125 case 0:
126 o.fhOffset = newOffset
127 case 1:
128 o.fhOffset += newOffset
129 case 2:
130 o.fhOffset = stat.Size() + newOffset
131 }
132 return o.fhOffset, nil
133 }
134
135 func (o *GcsFile) Read(p []byte) (n int, err error) {
136 return o.ReadAt(p, o.fhOffset)
137 }
138
139 func (o *GcsFile) ReadAt(p []byte, off int64) (n int, err error) {
140 if o.closed {
141 return 0, ErrFileClosed
142 }
143
144 read, err := o.resource.ReadAt(p, off)
145 o.fhOffset += int64(read)
146 return read, err
147 }
148
149 func (o *GcsFile) Write(p []byte) (n int, err error) {
150 return o.WriteAt(p, o.fhOffset)
151 }
152
153 func (o *GcsFile) WriteAt(b []byte, off int64) (n int, err error) {
154 if o.closed {
155 return 0, ErrFileClosed
156 }
157
158 if o.openFlags&os.O_RDONLY != 0 {
159 return 0, fmt.Errorf("file is opend as read only")
160 }
161
162 _, err = o.resource.obj.Attrs(o.resource.ctx)
163 if err != nil {
164 if err == storage.ErrObjectNotExist {
165 if o.openFlags&os.O_CREATE == 0 {
166 return 0, ErrFileNotFound
167 }
168 } else {
169 return 0, fmt.Errorf("error getting file attributes: %v", err)
170 }
171 }
172
173 written, err := o.resource.WriteAt(b, off)
174 o.fhOffset += int64(written)
175 return written, err
176 }
177
178 func (o *GcsFile) Name() string {
179 return filepath.FromSlash(o.resource.name)
180 }
181
182 func (o *GcsFile) readdirImpl(count int) ([]*FileInfo, error) {
183 err := o.Sync()
184 if err != nil {
185 return nil, err
186 }
187
188 var ownInfo os.FileInfo
189 ownInfo, err = o.Stat()
190 if err != nil {
191 return nil, err
192 }
193
194 if !ownInfo.IsDir() {
195 return nil, syscall.ENOTDIR
196 }
197
198 path := o.resource.fs.ensureTrailingSeparator(o.resource.name)
199 if o.ReadDirIt == nil {
200
201 bucketName, bucketPath := o.resource.fs.splitName(path)
202
203 o.ReadDirIt = o.resource.fs.client.Bucket(bucketName).Objects(
204 o.resource.ctx, &storage.Query{Delimiter: o.resource.fs.separator, Prefix: bucketPath, Versions: false})
205 }
206 var res []*FileInfo
207 for {
208 object, err := o.ReadDirIt.Next()
209 if err == iterator.Done {
210
211 o.ReadDirIt = nil
212
213 if len(res) > 0 || count <= 0 {
214 return res, nil
215 }
216
217 return res, io.EOF
218 }
219 if err != nil {
220 return res, err
221 }
222
223 tmp := newFileInfoFromAttrs(object, o.resource.fileMode)
224
225 if tmp.Name() == "" {
226
227 continue
228 }
229
230 if object.Name == "" && object.Prefix == "" {
231 continue
232 }
233
234 if tmp.Name() == ownInfo.Name() {
235
236 continue
237 }
238
239 res = append(res, tmp)
240
241
242
243
244
245
246
247
248 }
249
250 }
251
252 func (o *GcsFile) Readdir(count int) ([]os.FileInfo, error) {
253 fi, err := o.readdirImpl(count)
254 if len(fi) > 0 {
255 sort.Sort(ByName(fi))
256 }
257
258 if count > 0 {
259 fi = fi[:count]
260 }
261
262 var res []os.FileInfo
263 for _, f := range fi {
264 res = append(res, f)
265 }
266 return res, err
267 }
268
269 func (o *GcsFile) Readdirnames(n int) ([]string, error) {
270 fi, err := o.Readdir(n)
271 if err != nil && err != io.EOF {
272 return nil, err
273 }
274 names := make([]string, len(fi))
275
276 for i, f := range fi {
277 names[i] = f.Name()
278 }
279 return names, err
280 }
281
282 func (o *GcsFile) Stat() (os.FileInfo, error) {
283 err := o.Sync()
284 if err != nil {
285 return nil, err
286 }
287
288 return newFileInfo(o.resource.name, o.resource.fs, o.resource.fileMode)
289 }
290
291 func (o *GcsFile) Sync() error {
292 return o.resource.maybeCloseIo()
293 }
294
295 func (o *GcsFile) Truncate(wantedSize int64) error {
296 if o.closed {
297 return ErrFileClosed
298 }
299 if o.openFlags == os.O_RDONLY {
300 return fmt.Errorf("file was opened as read only")
301 }
302 return o.resource.Truncate(wantedSize)
303 }
304
305 func (o *GcsFile) WriteString(s string) (ret int, err error) {
306 return o.Write([]byte(s))
307 }
308
View as plain text