...

Source file src/github.com/docker/distribution/registry/storage/driver/base/regulator.go

Documentation: github.com/docker/distribution/registry/storage/driver/base

     1  package base
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"io"
     7  	"reflect"
     8  	"strconv"
     9  	"sync"
    10  
    11  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    12  )
    13  
    14  type regulator struct {
    15  	storagedriver.StorageDriver
    16  	*sync.Cond
    17  
    18  	available uint64
    19  }
    20  
    21  // GetLimitFromParameter takes an interface type as decoded from the YAML
    22  // configuration and returns a uint64 representing the maximum number of
    23  // concurrent calls given a minimum limit and default.
    24  //
    25  // If the parameter supplied is of an invalid type this returns an error.
    26  func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) {
    27  	limit := def
    28  
    29  	switch v := param.(type) {
    30  	case string:
    31  		var err error
    32  		if limit, err = strconv.ParseUint(v, 0, 64); err != nil {
    33  			return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param)
    34  		}
    35  	case uint64:
    36  		limit = v
    37  	case int, int32, int64:
    38  		val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int()
    39  		// if param is negative casting to uint64 will wrap around and
    40  		// give you the hugest thread limit ever. Let's be sensible, here
    41  		if val > 0 {
    42  			limit = uint64(val)
    43  		} else {
    44  			limit = min
    45  		}
    46  	case uint, uint32:
    47  		limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint()
    48  	case nil:
    49  		// use the default
    50  	default:
    51  		return 0, fmt.Errorf("invalid value '%#v'", param)
    52  	}
    53  
    54  	if limit < min {
    55  		return min, nil
    56  	}
    57  
    58  	return limit, nil
    59  }
    60  
    61  // NewRegulator wraps the given driver and is used to regulate concurrent calls
    62  // to the given storage driver to a maximum of the given limit. This is useful
    63  // for storage drivers that would otherwise create an unbounded number of OS
    64  // threads if allowed to be called unregulated.
    65  func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver {
    66  	return &regulator{
    67  		StorageDriver: driver,
    68  		Cond:          sync.NewCond(&sync.Mutex{}),
    69  		available:     limit,
    70  	}
    71  }
    72  
    73  func (r *regulator) enter() {
    74  	r.L.Lock()
    75  	for r.available == 0 {
    76  		r.Wait()
    77  	}
    78  	r.available--
    79  	r.L.Unlock()
    80  }
    81  
    82  func (r *regulator) exit() {
    83  	r.L.Lock()
    84  	r.Signal()
    85  	r.available++
    86  	r.L.Unlock()
    87  }
    88  
    89  // Name returns the human-readable "name" of the driver, useful in error
    90  // messages and logging. By convention, this will just be the registration
    91  // name, but drivers may provide other information here.
    92  func (r *regulator) Name() string {
    93  	r.enter()
    94  	defer r.exit()
    95  
    96  	return r.StorageDriver.Name()
    97  }
    98  
    99  // GetContent retrieves the content stored at "path" as a []byte.
   100  // This should primarily be used for small objects.
   101  func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) {
   102  	r.enter()
   103  	defer r.exit()
   104  
   105  	return r.StorageDriver.GetContent(ctx, path)
   106  }
   107  
   108  // PutContent stores the []byte content at a location designated by "path".
   109  // This should primarily be used for small objects.
   110  func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error {
   111  	r.enter()
   112  	defer r.exit()
   113  
   114  	return r.StorageDriver.PutContent(ctx, path, content)
   115  }
   116  
   117  // Reader retrieves an io.ReadCloser for the content stored at "path"
   118  // with a given byte offset.
   119  // May be used to resume reading a stream by providing a nonzero offset.
   120  func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
   121  	r.enter()
   122  	defer r.exit()
   123  
   124  	return r.StorageDriver.Reader(ctx, path, offset)
   125  }
   126  
   127  // Writer stores the contents of the provided io.ReadCloser at a
   128  // location designated by the given path.
   129  // May be used to resume writing a stream by providing a nonzero offset.
   130  // The offset must be no larger than the CurrentSize for this path.
   131  func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
   132  	r.enter()
   133  	defer r.exit()
   134  
   135  	return r.StorageDriver.Writer(ctx, path, append)
   136  }
   137  
   138  // Stat retrieves the FileInfo for the given path, including the current
   139  // size in bytes and the creation time.
   140  func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
   141  	r.enter()
   142  	defer r.exit()
   143  
   144  	return r.StorageDriver.Stat(ctx, path)
   145  }
   146  
   147  // List returns a list of the objects that are direct descendants of the
   148  // given path.
   149  func (r *regulator) List(ctx context.Context, path string) ([]string, error) {
   150  	r.enter()
   151  	defer r.exit()
   152  
   153  	return r.StorageDriver.List(ctx, path)
   154  }
   155  
   156  // Move moves an object stored at sourcePath to destPath, removing the
   157  // original object.
   158  // Note: This may be no more efficient than a copy followed by a delete for
   159  // many implementations.
   160  func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error {
   161  	r.enter()
   162  	defer r.exit()
   163  
   164  	return r.StorageDriver.Move(ctx, sourcePath, destPath)
   165  }
   166  
   167  // Delete recursively deletes all objects stored at "path" and its subpaths.
   168  func (r *regulator) Delete(ctx context.Context, path string) error {
   169  	r.enter()
   170  	defer r.exit()
   171  
   172  	return r.StorageDriver.Delete(ctx, path)
   173  }
   174  
   175  // URLFor returns a URL which may be used to retrieve the content stored at
   176  // the given path, possibly using the given options.
   177  // May return an ErrUnsupportedMethod in certain StorageDriver
   178  // implementations.
   179  func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
   180  	r.enter()
   181  	defer r.exit()
   182  
   183  	return r.StorageDriver.URLFor(ctx, path, options)
   184  }
   185  

View as plain text