...

Source file src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go

Documentation: github.com/docker/distribution/registry/storage/driver/filesystem

     1  package filesystem
     2  
     3  import (
     4  	"bufio"
     5  	"bytes"
     6  	"context"
     7  	"fmt"
     8  	"io"
     9  	"io/ioutil"
    10  	"os"
    11  	"path"
    12  	"time"
    13  
    14  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    15  	"github.com/docker/distribution/registry/storage/driver/base"
    16  	"github.com/docker/distribution/registry/storage/driver/factory"
    17  )
    18  
    19  const (
    20  	driverName           = "filesystem"
    21  	defaultRootDirectory = "/var/lib/registry"
    22  	defaultMaxThreads    = uint64(100)
    23  
    24  	// minThreads is the minimum value for the maxthreads configuration
    25  	// parameter. If the driver's parameters are less than this we set
    26  	// the parameters to minThreads
    27  	minThreads = uint64(25)
    28  )
    29  
    30  // DriverParameters represents all configuration options available for the
    31  // filesystem driver
    32  type DriverParameters struct {
    33  	RootDirectory string
    34  	MaxThreads    uint64
    35  }
    36  
    37  func init() {
    38  	factory.Register(driverName, &filesystemDriverFactory{})
    39  }
    40  
    41  // filesystemDriverFactory implements the factory.StorageDriverFactory interface
    42  type filesystemDriverFactory struct{}
    43  
    44  func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
    45  	return FromParameters(parameters)
    46  }
    47  
    48  type driver struct {
    49  	rootDirectory string
    50  }
    51  
    52  type baseEmbed struct {
    53  	base.Base
    54  }
    55  
    56  // Driver is a storagedriver.StorageDriver implementation backed by a local
    57  // filesystem. All provided paths will be subpaths of the RootDirectory.
    58  type Driver struct {
    59  	baseEmbed
    60  }
    61  
    62  // FromParameters constructs a new Driver with a given parameters map
    63  // Optional Parameters:
    64  // - rootdirectory
    65  // - maxthreads
    66  func FromParameters(parameters map[string]interface{}) (*Driver, error) {
    67  	params, err := fromParametersImpl(parameters)
    68  	if err != nil || params == nil {
    69  		return nil, err
    70  	}
    71  	return New(*params), nil
    72  }
    73  
    74  func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
    75  	var (
    76  		err           error
    77  		maxThreads    = defaultMaxThreads
    78  		rootDirectory = defaultRootDirectory
    79  	)
    80  
    81  	if parameters != nil {
    82  		if rootDir, ok := parameters["rootdirectory"]; ok {
    83  			rootDirectory = fmt.Sprint(rootDir)
    84  		}
    85  
    86  		maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
    87  		if err != nil {
    88  			return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
    89  		}
    90  	}
    91  
    92  	params := &DriverParameters{
    93  		RootDirectory: rootDirectory,
    94  		MaxThreads:    maxThreads,
    95  	}
    96  	return params, nil
    97  }
    98  
    99  // New constructs a new Driver with a given rootDirectory
   100  func New(params DriverParameters) *Driver {
   101  	fsDriver := &driver{rootDirectory: params.RootDirectory}
   102  
   103  	return &Driver{
   104  		baseEmbed: baseEmbed{
   105  			Base: base.Base{
   106  				StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
   107  			},
   108  		},
   109  	}
   110  }
   111  
   112  // Implement the storagedriver.StorageDriver interface
   113  
   114  func (d *driver) Name() string {
   115  	return driverName
   116  }
   117  
   118  // GetContent retrieves the content stored at "path" as a []byte.
   119  func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
   120  	rc, err := d.Reader(ctx, path, 0)
   121  	if err != nil {
   122  		return nil, err
   123  	}
   124  	defer rc.Close()
   125  
   126  	p, err := ioutil.ReadAll(rc)
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  
   131  	return p, nil
   132  }
   133  
   134  // PutContent stores the []byte content at a location designated by "path".
   135  func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
   136  	writer, err := d.Writer(ctx, subPath, false)
   137  	if err != nil {
   138  		return err
   139  	}
   140  	defer writer.Close()
   141  	_, err = io.Copy(writer, bytes.NewReader(contents))
   142  	if err != nil {
   143  		writer.Cancel()
   144  		return err
   145  	}
   146  	return writer.Commit()
   147  }
   148  
   149  // Reader retrieves an io.ReadCloser for the content stored at "path" with a
   150  // given byte offset.
   151  func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
   152  	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
   153  	if err != nil {
   154  		if os.IsNotExist(err) {
   155  			return nil, storagedriver.PathNotFoundError{Path: path}
   156  		}
   157  
   158  		return nil, err
   159  	}
   160  
   161  	seekPos, err := file.Seek(offset, io.SeekStart)
   162  	if err != nil {
   163  		file.Close()
   164  		return nil, err
   165  	} else if seekPos < offset {
   166  		file.Close()
   167  		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
   168  	}
   169  
   170  	return file, nil
   171  }
   172  
   173  func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) {
   174  	fullPath := d.fullPath(subPath)
   175  	parentDir := path.Dir(fullPath)
   176  	if err := os.MkdirAll(parentDir, 0777); err != nil {
   177  		return nil, err
   178  	}
   179  
   180  	fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666)
   181  	if err != nil {
   182  		return nil, err
   183  	}
   184  
   185  	var offset int64
   186  
   187  	if !append {
   188  		err := fp.Truncate(0)
   189  		if err != nil {
   190  			fp.Close()
   191  			return nil, err
   192  		}
   193  	} else {
   194  		n, err := fp.Seek(0, io.SeekEnd)
   195  		if err != nil {
   196  			fp.Close()
   197  			return nil, err
   198  		}
   199  		offset = n
   200  	}
   201  
   202  	return newFileWriter(fp, offset), nil
   203  }
   204  
   205  // Stat retrieves the FileInfo for the given path, including the current size
   206  // in bytes and the creation time.
   207  func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
   208  	fullPath := d.fullPath(subPath)
   209  
   210  	fi, err := os.Stat(fullPath)
   211  	if err != nil {
   212  		if os.IsNotExist(err) {
   213  			return nil, storagedriver.PathNotFoundError{Path: subPath}
   214  		}
   215  
   216  		return nil, err
   217  	}
   218  
   219  	return fileInfo{
   220  		path:     subPath,
   221  		FileInfo: fi,
   222  	}, nil
   223  }
   224  
   225  // List returns a list of the objects that are direct descendants of the given
   226  // path.
   227  func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
   228  	fullPath := d.fullPath(subPath)
   229  
   230  	dir, err := os.Open(fullPath)
   231  	if err != nil {
   232  		if os.IsNotExist(err) {
   233  			return nil, storagedriver.PathNotFoundError{Path: subPath}
   234  		}
   235  		return nil, err
   236  	}
   237  
   238  	defer dir.Close()
   239  
   240  	fileNames, err := dir.Readdirnames(0)
   241  	if err != nil {
   242  		return nil, err
   243  	}
   244  
   245  	keys := make([]string, 0, len(fileNames))
   246  	for _, fileName := range fileNames {
   247  		keys = append(keys, path.Join(subPath, fileName))
   248  	}
   249  
   250  	return keys, nil
   251  }
   252  
   253  // Move moves an object stored at sourcePath to destPath, removing the original
   254  // object.
   255  func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
   256  	source := d.fullPath(sourcePath)
   257  	dest := d.fullPath(destPath)
   258  
   259  	if _, err := os.Stat(source); os.IsNotExist(err) {
   260  		return storagedriver.PathNotFoundError{Path: sourcePath}
   261  	}
   262  
   263  	if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
   264  		return err
   265  	}
   266  
   267  	err := os.Rename(source, dest)
   268  	return err
   269  }
   270  
   271  // Delete recursively deletes all objects stored at "path" and its subpaths.
   272  func (d *driver) Delete(ctx context.Context, subPath string) error {
   273  	fullPath := d.fullPath(subPath)
   274  
   275  	_, err := os.Stat(fullPath)
   276  	if err != nil && !os.IsNotExist(err) {
   277  		return err
   278  	} else if err != nil {
   279  		return storagedriver.PathNotFoundError{Path: subPath}
   280  	}
   281  
   282  	err = os.RemoveAll(fullPath)
   283  	return err
   284  }
   285  
   286  // URLFor returns a URL which may be used to retrieve the content stored at the given path.
   287  // May return an UnsupportedMethodErr in certain StorageDriver implementations.
   288  func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
   289  	return "", storagedriver.ErrUnsupportedMethod{}
   290  }
   291  
   292  // Walk traverses a filesystem defined within driver, starting
   293  // from the given path, calling f on each file
   294  func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
   295  	return storagedriver.WalkFallback(ctx, d, path, f)
   296  }
   297  
   298  // fullPath returns the absolute path of a key within the Driver's storage.
   299  func (d *driver) fullPath(subPath string) string {
   300  	return path.Join(d.rootDirectory, subPath)
   301  }
   302  
   303  type fileInfo struct {
   304  	os.FileInfo
   305  	path string
   306  }
   307  
   308  var _ storagedriver.FileInfo = fileInfo{}
   309  
   310  // Path provides the full path of the target of this file info.
   311  func (fi fileInfo) Path() string {
   312  	return fi.path
   313  }
   314  
   315  // Size returns current length in bytes of the file. The return value can
   316  // be used to write to the end of the file at path. The value is
   317  // meaningless if IsDir returns true.
   318  func (fi fileInfo) Size() int64 {
   319  	if fi.IsDir() {
   320  		return 0
   321  	}
   322  
   323  	return fi.FileInfo.Size()
   324  }
   325  
   326  // ModTime returns the modification time for the file. For backends that
   327  // don't have a modification time, the creation time should be returned.
   328  func (fi fileInfo) ModTime() time.Time {
   329  	return fi.FileInfo.ModTime()
   330  }
   331  
   332  // IsDir returns true if the path is a directory.
   333  func (fi fileInfo) IsDir() bool {
   334  	return fi.FileInfo.IsDir()
   335  }
   336  
   337  type fileWriter struct {
   338  	file      *os.File
   339  	size      int64
   340  	bw        *bufio.Writer
   341  	closed    bool
   342  	committed bool
   343  	cancelled bool
   344  }
   345  
   346  func newFileWriter(file *os.File, size int64) *fileWriter {
   347  	return &fileWriter{
   348  		file: file,
   349  		size: size,
   350  		bw:   bufio.NewWriter(file),
   351  	}
   352  }
   353  
   354  func (fw *fileWriter) Write(p []byte) (int, error) {
   355  	if fw.closed {
   356  		return 0, fmt.Errorf("already closed")
   357  	} else if fw.committed {
   358  		return 0, fmt.Errorf("already committed")
   359  	} else if fw.cancelled {
   360  		return 0, fmt.Errorf("already cancelled")
   361  	}
   362  	n, err := fw.bw.Write(p)
   363  	fw.size += int64(n)
   364  	return n, err
   365  }
   366  
   367  func (fw *fileWriter) Size() int64 {
   368  	return fw.size
   369  }
   370  
   371  func (fw *fileWriter) Close() error {
   372  	if fw.closed {
   373  		return fmt.Errorf("already closed")
   374  	}
   375  
   376  	if err := fw.bw.Flush(); err != nil {
   377  		return err
   378  	}
   379  
   380  	if err := fw.file.Sync(); err != nil {
   381  		return err
   382  	}
   383  
   384  	if err := fw.file.Close(); err != nil {
   385  		return err
   386  	}
   387  	fw.closed = true
   388  	return nil
   389  }
   390  
   391  func (fw *fileWriter) Cancel() error {
   392  	if fw.closed {
   393  		return fmt.Errorf("already closed")
   394  	}
   395  
   396  	fw.cancelled = true
   397  	fw.file.Close()
   398  	return os.Remove(fw.file.Name())
   399  }
   400  
   401  func (fw *fileWriter) Commit() error {
   402  	if fw.closed {
   403  		return fmt.Errorf("already closed")
   404  	} else if fw.committed {
   405  		return fmt.Errorf("already committed")
   406  	} else if fw.cancelled {
   407  		return fmt.Errorf("already cancelled")
   408  	}
   409  
   410  	if err := fw.bw.Flush(); err != nil {
   411  		return err
   412  	}
   413  
   414  	if err := fw.file.Sync(); err != nil {
   415  		return err
   416  	}
   417  
   418  	fw.committed = true
   419  	return nil
   420  }
   421  

View as plain text