1 package filesystem
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "io"
9 "io/ioutil"
10 "os"
11 "path"
12 "time"
13
14 storagedriver "github.com/docker/distribution/registry/storage/driver"
15 "github.com/docker/distribution/registry/storage/driver/base"
16 "github.com/docker/distribution/registry/storage/driver/factory"
17 )
18
// Names, defaults and limits used when constructing the filesystem driver.
const (
	// driverName is the name this driver is registered under with the
	// factory package and the value reported by (*driver).Name.
	driverName = "filesystem"

	// defaultRootDirectory is the root directory used when the
	// "rootdirectory" parameter is not supplied.
	defaultRootDirectory = "/var/lib/registry"

	// defaultMaxThreads is the thread limit used when no parameter map is
	// supplied. It is also handed to base.GetLimitFromParameter as the
	// default for the "maxthreads" parameter.
	defaultMaxThreads uint64 = 100

	// minThreads is handed to base.GetLimitFromParameter alongside the
	// "maxthreads" parameter; presumably it is the lower bound enforced
	// there (confirm against the base package).
	minThreads uint64 = 25
)
29
30
31
// DriverParameters holds the settings consumed by New when constructing a
// Driver.
type DriverParameters struct {
	// RootDirectory is the local directory that every storage path is
	// joined onto.
	RootDirectory string

	// MaxThreads is the limit passed to base.NewRegulator by New.
	MaxThreads uint64
}
36
37 func init() {
38 factory.Register(driverName, &filesystemDriverFactory{})
39 }
40
41
42 type filesystemDriverFactory struct{}
43
44 func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
45 return FromParameters(parameters)
46 }
47
// driver is the unexported filesystem-backed storage implementation. Every
// path handed to its methods is resolved relative to rootDirectory.
type driver struct {
	rootDirectory string
}
51
52 type baseEmbed struct {
53 base.Base
54 }
55
56
57
58 type Driver struct {
59 baseEmbed
60 }
61
62
63
64
65
66 func FromParameters(parameters map[string]interface{}) (*Driver, error) {
67 params, err := fromParametersImpl(parameters)
68 if err != nil || params == nil {
69 return nil, err
70 }
71 return New(*params), nil
72 }
73
74 func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
75 var (
76 err error
77 maxThreads = defaultMaxThreads
78 rootDirectory = defaultRootDirectory
79 )
80
81 if parameters != nil {
82 if rootDir, ok := parameters["rootdirectory"]; ok {
83 rootDirectory = fmt.Sprint(rootDir)
84 }
85
86 maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
87 if err != nil {
88 return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
89 }
90 }
91
92 params := &DriverParameters{
93 RootDirectory: rootDirectory,
94 MaxThreads: maxThreads,
95 }
96 return params, nil
97 }
98
99
100 func New(params DriverParameters) *Driver {
101 fsDriver := &driver{rootDirectory: params.RootDirectory}
102
103 return &Driver{
104 baseEmbed: baseEmbed{
105 Base: base.Base{
106 StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
107 },
108 },
109 }
110 }
111
112
113
114 func (d *driver) Name() string {
115 return driverName
116 }
117
118
119 func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
120 rc, err := d.Reader(ctx, path, 0)
121 if err != nil {
122 return nil, err
123 }
124 defer rc.Close()
125
126 p, err := ioutil.ReadAll(rc)
127 if err != nil {
128 return nil, err
129 }
130
131 return p, nil
132 }
133
134
135 func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
136 writer, err := d.Writer(ctx, subPath, false)
137 if err != nil {
138 return err
139 }
140 defer writer.Close()
141 _, err = io.Copy(writer, bytes.NewReader(contents))
142 if err != nil {
143 writer.Cancel()
144 return err
145 }
146 return writer.Commit()
147 }
148
149
150
151 func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
152 file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
153 if err != nil {
154 if os.IsNotExist(err) {
155 return nil, storagedriver.PathNotFoundError{Path: path}
156 }
157
158 return nil, err
159 }
160
161 seekPos, err := file.Seek(offset, io.SeekStart)
162 if err != nil {
163 file.Close()
164 return nil, err
165 } else if seekPos < offset {
166 file.Close()
167 return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
168 }
169
170 return file, nil
171 }
172
173 func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) {
174 fullPath := d.fullPath(subPath)
175 parentDir := path.Dir(fullPath)
176 if err := os.MkdirAll(parentDir, 0777); err != nil {
177 return nil, err
178 }
179
180 fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666)
181 if err != nil {
182 return nil, err
183 }
184
185 var offset int64
186
187 if !append {
188 err := fp.Truncate(0)
189 if err != nil {
190 fp.Close()
191 return nil, err
192 }
193 } else {
194 n, err := fp.Seek(0, io.SeekEnd)
195 if err != nil {
196 fp.Close()
197 return nil, err
198 }
199 offset = n
200 }
201
202 return newFileWriter(fp, offset), nil
203 }
204
205
206
207 func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
208 fullPath := d.fullPath(subPath)
209
210 fi, err := os.Stat(fullPath)
211 if err != nil {
212 if os.IsNotExist(err) {
213 return nil, storagedriver.PathNotFoundError{Path: subPath}
214 }
215
216 return nil, err
217 }
218
219 return fileInfo{
220 path: subPath,
221 FileInfo: fi,
222 }, nil
223 }
224
225
226
227 func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
228 fullPath := d.fullPath(subPath)
229
230 dir, err := os.Open(fullPath)
231 if err != nil {
232 if os.IsNotExist(err) {
233 return nil, storagedriver.PathNotFoundError{Path: subPath}
234 }
235 return nil, err
236 }
237
238 defer dir.Close()
239
240 fileNames, err := dir.Readdirnames(0)
241 if err != nil {
242 return nil, err
243 }
244
245 keys := make([]string, 0, len(fileNames))
246 for _, fileName := range fileNames {
247 keys = append(keys, path.Join(subPath, fileName))
248 }
249
250 return keys, nil
251 }
252
253
254
255 func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
256 source := d.fullPath(sourcePath)
257 dest := d.fullPath(destPath)
258
259 if _, err := os.Stat(source); os.IsNotExist(err) {
260 return storagedriver.PathNotFoundError{Path: sourcePath}
261 }
262
263 if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
264 return err
265 }
266
267 err := os.Rename(source, dest)
268 return err
269 }
270
271
272 func (d *driver) Delete(ctx context.Context, subPath string) error {
273 fullPath := d.fullPath(subPath)
274
275 _, err := os.Stat(fullPath)
276 if err != nil && !os.IsNotExist(err) {
277 return err
278 } else if err != nil {
279 return storagedriver.PathNotFoundError{Path: subPath}
280 }
281
282 err = os.RemoveAll(fullPath)
283 return err
284 }
285
286
287
288 func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
289 return "", storagedriver.ErrUnsupportedMethod{}
290 }
291
292
293
294 func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
295 return storagedriver.WalkFallback(ctx, d, path, f)
296 }
297
298
299 func (d *driver) fullPath(subPath string) string {
300 return path.Join(d.rootDirectory, subPath)
301 }
302
303 type fileInfo struct {
304 os.FileInfo
305 path string
306 }
307
308 var _ storagedriver.FileInfo = fileInfo{}
309
310
311 func (fi fileInfo) Path() string {
312 return fi.path
313 }
314
315
316
317
318 func (fi fileInfo) Size() int64 {
319 if fi.IsDir() {
320 return 0
321 }
322
323 return fi.FileInfo.Size()
324 }
325
326
327
328 func (fi fileInfo) ModTime() time.Time {
329 return fi.FileInfo.ModTime()
330 }
331
332
333 func (fi fileInfo) IsDir() bool {
334 return fi.FileInfo.IsDir()
335 }
336
// fileWriter is the buffered writer handed out by (*driver).Writer. It wraps
// an *os.File with a bufio.Writer and tracks how many bytes have been
// written plus whether the writer has been closed, committed or cancelled.
type fileWriter struct {
	file      *os.File      // destination file; owned by the writer
	size      int64         // initial size plus bytes accepted by Write
	bw        *bufio.Writer // buffers writes to file
	closed    bool          // set once Close has run
	committed bool          // set once Commit has succeeded
	cancelled bool          // set once Cancel has run
}

// newFileWriter wraps file in a fileWriter whose reported size starts at
// size (the offset at which writing begins).
func newFileWriter(file *os.File, size int64) *fileWriter {
	return &fileWriter{
		file: file,
		size: size,
		bw:   bufio.NewWriter(file),
	}
}

// Write buffers p for writing to the file and adds the number of accepted
// bytes to Size. It fails once the writer has been closed, committed or
// cancelled.
func (fw *fileWriter) Write(p []byte) (int, error) {
	switch {
	case fw.closed:
		return 0, fmt.Errorf("already closed")
	case fw.committed:
		return 0, fmt.Errorf("already committed")
	case fw.cancelled:
		return 0, fmt.Errorf("already cancelled")
	}

	n, err := fw.bw.Write(p)
	fw.size += int64(n)
	return n, err
}

// Size returns the initial size plus the number of bytes accepted by Write.
func (fw *fileWriter) Size() int64 {
	return fw.size
}

// Close flushes buffered data, syncs the file and closes it, returning the
// first error encountered. A second Close returns an error.
//
// The file is closed even when flushing or syncing fails, so a failed Close
// does not leak the file descriptor. After Cancel, which has already closed
// and removed the file, Close only marks the writer closed and returns nil.
func (fw *fileWriter) Close() error {
	if fw.closed {
		return fmt.Errorf("already closed")
	}
	// Mark closed up front: whatever happens below, the descriptor is
	// released and the writer must not be used again.
	fw.closed = true

	if fw.cancelled {
		return nil
	}

	err := fw.bw.Flush()
	if err == nil {
		err = fw.file.Sync()
	}
	if closeErr := fw.file.Close(); err == nil {
		err = closeErr
	}
	return err
}

// Cancel abandons the write: buffered data is discarded, the file is closed
// and then removed from disk. It fails if the writer has already been closed
// or cancelled, so the path is never removed a second time.
func (fw *fileWriter) Cancel() error {
	if fw.closed {
		return fmt.Errorf("already closed")
	} else if fw.cancelled {
		return fmt.Errorf("already cancelled")
	}

	fw.cancelled = true
	// The file is about to be deleted, so a close failure is not actionable;
	// the result of the removal is what gets reported.
	_ = fw.file.Close()
	return os.Remove(fw.file.Name())
}

// Commit flushes buffered data and syncs the file to stable storage, then
// marks the writer committed. It fails once the writer has been closed,
// committed or cancelled. The file stays open until Close is called.
func (fw *fileWriter) Commit() error {
	switch {
	case fw.closed:
		return fmt.Errorf("already closed")
	case fw.committed:
		return fmt.Errorf("already committed")
	case fw.cancelled:
		return fmt.Errorf("already cancelled")
	}

	if err := fw.bw.Flush(); err != nil {
		return err
	}
	if err := fw.file.Sync(); err != nil {
		return err
	}

	fw.committed = true
	return nil
}
421
View as plain text