...

Source file src/github.com/docker/distribution/registry/storage/cache/redis/redis.go

Documentation: github.com/docker/distribution/registry/storage/cache/redis

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/distribution/reference"
     8  	"github.com/docker/distribution"
     9  	"github.com/docker/distribution/registry/storage/cache"
    10  	"github.com/garyburd/redigo/redis"
    11  	"github.com/opencontainers/go-digest"
    12  )
    13  
    14  // redisBlobStatService provides an implementation of
    15  // BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
    16  // two parts. The first provide fast access to repository membership through a
    17  // redis set for each repo. The second is a redis hash keyed by the digest of
    18  // the layer, providing path, length and mediatype information. There is also
    19  // a per-repository redis hash of the blob descriptor, allowing override of
    20  // data. This is currently used to override the mediatype on a per-repository
    21  // basis.
    22  //
    23  // Note that there is no implied relationship between these two caches. The
    24  // layer may exist in one, both or none and the code must be written this way.
    25  type redisBlobDescriptorService struct {
    26  	pool *redis.Pool
    27  
    28  	// TODO(stevvooe): We use a pool because we don't have great control over
    29  	// the cache lifecycle to manage connections. A new connection if fetched
    30  	// for each operation. Once we have better lifecycle management of the
    31  	// request objects, we can change this to a connection.
    32  }
    33  
    34  // NewRedisBlobDescriptorCacheProvider returns a new redis-based
    35  // BlobDescriptorCacheProvider using the provided redis connection pool.
    36  func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider {
    37  	return &redisBlobDescriptorService{
    38  		pool: pool,
    39  	}
    40  }
    41  
    42  // RepositoryScoped returns the scoped cache.
    43  func (rbds *redisBlobDescriptorService) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) {
    44  	if _, err := reference.ParseNormalizedNamed(repo); err != nil {
    45  		return nil, err
    46  	}
    47  
    48  	return &repositoryScopedRedisBlobDescriptorService{
    49  		repo:     repo,
    50  		upstream: rbds,
    51  	}, nil
    52  }
    53  
    54  // Stat retrieves the descriptor data from the redis hash entry.
    55  func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    56  	if err := dgst.Validate(); err != nil {
    57  		return distribution.Descriptor{}, err
    58  	}
    59  
    60  	conn := rbds.pool.Get()
    61  	defer conn.Close()
    62  
    63  	return rbds.stat(ctx, conn, dgst)
    64  }
    65  
    66  func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
    67  	if err := dgst.Validate(); err != nil {
    68  		return err
    69  	}
    70  
    71  	conn := rbds.pool.Get()
    72  	defer conn.Close()
    73  
    74  	// Not atomic in redis <= 2.3
    75  	reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
    76  	if err != nil {
    77  		return err
    78  	}
    79  
    80  	if reply == 0 {
    81  		return distribution.ErrBlobUnknown
    82  	}
    83  
    84  	return nil
    85  }
    86  
    87  // stat provides an internal stat call that takes a connection parameter. This
    88  // allows some internal management of the connection scope.
    89  func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
    90  	reply, err := redis.Values(conn.Do("HMGET", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype"))
    91  	if err != nil {
    92  		return distribution.Descriptor{}, err
    93  	}
    94  
    95  	// NOTE(stevvooe): The "size" field used to be "length". We treat a
    96  	// missing "size" field here as an unknown blob, which causes a cache
    97  	// miss, effectively migrating the field.
    98  	if len(reply) < 3 || reply[0] == nil || reply[1] == nil { // don't care if mediatype is nil
    99  		return distribution.Descriptor{}, distribution.ErrBlobUnknown
   100  	}
   101  
   102  	var desc distribution.Descriptor
   103  	if _, err := redis.Scan(reply, &desc.Digest, &desc.Size, &desc.MediaType); err != nil {
   104  		return distribution.Descriptor{}, err
   105  	}
   106  
   107  	return desc, nil
   108  }
   109  
   110  // SetDescriptor sets the descriptor data for the given digest using a redis
   111  // hash. A hash is used here since we may store unrelated fields about a layer
   112  // in the future.
   113  func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
   114  	if err := dgst.Validate(); err != nil {
   115  		return err
   116  	}
   117  
   118  	if err := cache.ValidateDescriptor(desc); err != nil {
   119  		return err
   120  	}
   121  
   122  	conn := rbds.pool.Get()
   123  	defer conn.Close()
   124  
   125  	return rbds.setDescriptor(ctx, conn, dgst, desc)
   126  }
   127  
   128  func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
   129  	if _, err := conn.Do("HMSET", rbds.blobDescriptorHashKey(dgst),
   130  		"digest", desc.Digest,
   131  		"size", desc.Size); err != nil {
   132  		return err
   133  	}
   134  
   135  	// Only set mediatype if not already set.
   136  	if _, err := conn.Do("HSETNX", rbds.blobDescriptorHashKey(dgst),
   137  		"mediatype", desc.MediaType); err != nil {
   138  		return err
   139  	}
   140  
   141  	return nil
   142  }
   143  
   144  func (rbds *redisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
   145  	return "blobs::" + dgst.String()
   146  }
   147  
   148  type repositoryScopedRedisBlobDescriptorService struct {
   149  	repo     string
   150  	upstream *redisBlobDescriptorService
   151  }
   152  
   153  var _ distribution.BlobDescriptorService = &repositoryScopedRedisBlobDescriptorService{}
   154  
   155  // Stat ensures that the digest is a member of the specified repository and
   156  // forwards the descriptor request to the global blob store. If the media type
   157  // differs for the repository, we override it.
   158  func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
   159  	if err := dgst.Validate(); err != nil {
   160  		return distribution.Descriptor{}, err
   161  	}
   162  
   163  	conn := rsrbds.upstream.pool.Get()
   164  	defer conn.Close()
   165  
   166  	// Check membership to repository first
   167  	member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
   168  	if err != nil {
   169  		return distribution.Descriptor{}, err
   170  	}
   171  
   172  	if !member {
   173  		return distribution.Descriptor{}, distribution.ErrBlobUnknown
   174  	}
   175  
   176  	upstream, err := rsrbds.upstream.stat(ctx, conn, dgst)
   177  	if err != nil {
   178  		return distribution.Descriptor{}, err
   179  	}
   180  
   181  	// We allow a per repository mediatype, let's look it up here.
   182  	mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype"))
   183  	if err != nil {
   184  		return distribution.Descriptor{}, err
   185  	}
   186  
   187  	if mediatype != "" {
   188  		upstream.MediaType = mediatype
   189  	}
   190  
   191  	return upstream, nil
   192  }
   193  
   194  // Clear removes the descriptor from the cache and forwards to the upstream descriptor store
   195  func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
   196  	if err := dgst.Validate(); err != nil {
   197  		return err
   198  	}
   199  
   200  	conn := rsrbds.upstream.pool.Get()
   201  	defer conn.Close()
   202  
   203  	// Check membership to repository first
   204  	member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
   205  	if err != nil {
   206  		return err
   207  	}
   208  
   209  	if !member {
   210  		return distribution.ErrBlobUnknown
   211  	}
   212  
   213  	return rsrbds.upstream.Clear(ctx, dgst)
   214  }
   215  
   216  func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
   217  	if err := dgst.Validate(); err != nil {
   218  		return err
   219  	}
   220  
   221  	if err := cache.ValidateDescriptor(desc); err != nil {
   222  		return err
   223  	}
   224  
   225  	if dgst != desc.Digest {
   226  		if dgst.Algorithm() == desc.Digest.Algorithm() {
   227  			return fmt.Errorf("redis cache: digest for descriptors differ but algorithm does not: %q != %q", dgst, desc.Digest)
   228  		}
   229  	}
   230  
   231  	conn := rsrbds.upstream.pool.Get()
   232  	defer conn.Close()
   233  
   234  	return rsrbds.setDescriptor(ctx, conn, dgst, desc)
   235  }
   236  
   237  func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
   238  	if _, err := conn.Do("SADD", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst); err != nil {
   239  		return err
   240  	}
   241  
   242  	if err := rsrbds.upstream.setDescriptor(ctx, conn, dgst, desc); err != nil {
   243  		return err
   244  	}
   245  
   246  	// Override repository mediatype.
   247  	if _, err := conn.Do("HSET", rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType); err != nil {
   248  		return err
   249  	}
   250  
   251  	// Also set the values for the primary descriptor, if they differ by
   252  	// algorithm (ie sha256 vs sha512).
   253  	if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() {
   254  		if err := rsrbds.setDescriptor(ctx, conn, desc.Digest, desc); err != nil {
   255  			return err
   256  		}
   257  	}
   258  
   259  	return nil
   260  }
   261  
   262  func (rsrbds *repositoryScopedRedisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
   263  	return "repository::" + rsrbds.repo + "::blobs::" + dgst.String()
   264  }
   265  
   266  func (rsrbds *repositoryScopedRedisBlobDescriptorService) repositoryBlobSetKey(repo string) string {
   267  	return "repository::" + rsrbds.repo + "::blobs"
   268  }
   269  

View as plain text