...

Source file src/github.com/docker/distribution/notifications/listener.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"context"
     5  	"net/http"
     6  
     7  	"github.com/docker/distribution"
     8  
     9  	"github.com/distribution/reference"
    10  	dcontext "github.com/docker/distribution/context"
    11  	"github.com/opencontainers/go-digest"
    12  )
    13  
    14  // ManifestListener describes a set of methods for listening to events related to manifests.
    15  type ManifestListener interface {
    16  	ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
    17  	ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
    18  	ManifestDeleted(repo reference.Named, dgst digest.Digest) error
    19  }
    20  
    21  // BlobListener describes a listener that can respond to layer related events.
    22  type BlobListener interface {
    23  	BlobPushed(repo reference.Named, desc distribution.Descriptor) error
    24  	BlobPulled(repo reference.Named, desc distribution.Descriptor) error
    25  	BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
    26  	BlobDeleted(repo reference.Named, desc digest.Digest) error
    27  }
    28  
    29  // RepoListener provides repository methods that respond to repository lifecycle
    30  type RepoListener interface {
    31  	TagDeleted(repo reference.Named, tag string) error
    32  	RepoDeleted(repo reference.Named) error
    33  }
    34  
    35  // Listener combines all repository events into a single interface.
    36  type Listener interface {
    37  	ManifestListener
    38  	BlobListener
    39  	RepoListener
    40  }
    41  
    42  type repositoryListener struct {
    43  	distribution.Repository
    44  	listener Listener
    45  }
    46  
    47  type removerListener struct {
    48  	distribution.RepositoryRemover
    49  	listener Listener
    50  }
    51  
    52  // Listen dispatches events on the repository to the listener.
    53  func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) {
    54  	return &repositoryListener{
    55  			Repository: repo,
    56  			listener:   listener,
    57  		}, &removerListener{
    58  			RepositoryRemover: remover,
    59  			listener:          listener,
    60  		}
    61  }
    62  
    63  func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error {
    64  	err := nl.RepositoryRemover.Remove(ctx, name)
    65  	if err != nil {
    66  		return err
    67  	}
    68  	return nl.listener.RepoDeleted(name)
    69  }
    70  
    71  func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
    72  	manifests, err := rl.Repository.Manifests(ctx, options...)
    73  	if err != nil {
    74  		return nil, err
    75  	}
    76  	return &manifestServiceListener{
    77  		ManifestService: manifests,
    78  		parent:          rl,
    79  	}, nil
    80  }
    81  
    82  func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
    83  	return &blobServiceListener{
    84  		BlobStore: rl.Repository.Blobs(ctx),
    85  		parent:    rl,
    86  	}
    87  }
    88  
    89  type manifestServiceListener struct {
    90  	distribution.ManifestService
    91  	parent *repositoryListener
    92  }
    93  
    94  func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
    95  	err := msl.ManifestService.Delete(ctx, dgst)
    96  	if err == nil {
    97  		if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
    98  			dcontext.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err)
    99  		}
   100  	}
   101  
   102  	return err
   103  }
   104  
   105  func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
   106  	sm, err := msl.ManifestService.Get(ctx, dgst, options...)
   107  	if err == nil {
   108  		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
   109  			dcontext.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err)
   110  		}
   111  	}
   112  
   113  	return sm, err
   114  }
   115  
   116  func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
   117  	dgst, err := msl.ManifestService.Put(ctx, sm, options...)
   118  
   119  	if err == nil {
   120  		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
   121  			dcontext.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err)
   122  		}
   123  	}
   124  
   125  	return dgst, err
   126  }
   127  
   128  type blobServiceListener struct {
   129  	distribution.BlobStore
   130  	parent *repositoryListener
   131  }
   132  
   133  var _ distribution.BlobStore = &blobServiceListener{}
   134  
   135  func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
   136  	p, err := bsl.BlobStore.Get(ctx, dgst)
   137  	if err == nil {
   138  		if desc, err := bsl.Stat(ctx, dgst); err != nil {
   139  			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
   140  		} else {
   141  			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
   142  				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
   143  			}
   144  		}
   145  	}
   146  
   147  	return p, err
   148  }
   149  
   150  func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
   151  	rc, err := bsl.BlobStore.Open(ctx, dgst)
   152  	if err == nil {
   153  		if desc, err := bsl.Stat(ctx, dgst); err != nil {
   154  			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
   155  		} else {
   156  			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
   157  				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
   158  			}
   159  		}
   160  	}
   161  
   162  	return rc, err
   163  }
   164  
   165  func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
   166  	err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
   167  	if err == nil {
   168  		if desc, err := bsl.Stat(ctx, dgst); err != nil {
   169  			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
   170  		} else {
   171  			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
   172  				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
   173  			}
   174  		}
   175  	}
   176  
   177  	return err
   178  }
   179  
   180  func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
   181  	desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
   182  	if err == nil {
   183  		if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil {
   184  			dcontext.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err)
   185  		}
   186  	}
   187  
   188  	return desc, err
   189  }
   190  
   191  func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
   192  	wr, err := bsl.BlobStore.Create(ctx, options...)
   193  	switch err := err.(type) {
   194  	case distribution.ErrBlobMounted:
   195  		if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
   196  			dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
   197  		}
   198  		return nil, err
   199  	}
   200  	return bsl.decorateWriter(wr), err
   201  }
   202  
   203  func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
   204  	err := bsl.BlobStore.Delete(ctx, dgst)
   205  	if err == nil {
   206  		if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
   207  			dcontext.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
   208  		}
   209  	}
   210  
   211  	return err
   212  }
   213  
   214  func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
   215  	wr, err := bsl.BlobStore.Resume(ctx, id)
   216  	return bsl.decorateWriter(wr), err
   217  }
   218  
   219  func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
   220  	return &blobWriterListener{
   221  		BlobWriter: wr,
   222  		parent:     bsl,
   223  	}
   224  }
   225  
   226  type blobWriterListener struct {
   227  	distribution.BlobWriter
   228  	parent *blobServiceListener
   229  }
   230  
   231  func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
   232  	committed, err := bwl.BlobWriter.Commit(ctx, desc)
   233  	if err == nil {
   234  		if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil {
   235  			dcontext.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
   236  		}
   237  	}
   238  
   239  	return committed, err
   240  }
   241  
   242  type tagServiceListener struct {
   243  	distribution.TagService
   244  	parent *repositoryListener
   245  }
   246  
   247  func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService {
   248  	return &tagServiceListener{
   249  		TagService: rl.Repository.Tags(ctx),
   250  		parent:     rl,
   251  	}
   252  }
   253  
   254  func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error {
   255  	if err := tagSL.TagService.Untag(ctx, tag); err != nil {
   256  		return err
   257  	}
   258  	if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil {
   259  		dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err)
   260  		return err
   261  	}
   262  	return nil
   263  }
   264  

View as plain text