...

Source file src/github.com/docker/distribution/notifications/bridge.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"net/http"
     5  	"time"
     6  
     7  	"github.com/distribution/reference"
     8  	"github.com/docker/distribution"
     9  	"github.com/docker/distribution/context"
    10  	"github.com/docker/distribution/uuid"
    11  	"github.com/opencontainers/go-digest"
    12  )
    13  
// bridge is the Listener implementation in this file. It converts registry
// activity into Events, stamps each one with the source, actor and request
// records it was constructed with, and writes the result to its sink.
type bridge struct {
	// ub builds the URLs stored in the target of manifest and blob events.
	ub                URLBuilder
	// includeReferences controls whether manifest events carry the
	// manifest's references.
	includeReferences bool
	// actor is copied into the Actor field of every event.
	actor             ActorRecord
	// source is copied into the Source field of every event.
	source            SourceRecord
	// request is copied into the Request field of every event.
	request           RequestRecord
	// sink receives every event the bridge writes.
	sink              Sink
}
    22  
    23  var _ Listener = &bridge{}
    24  
// URLBuilder defines a subset of url builder to be used by the event listener.
// The bridge stores the returned strings in the URL field of event targets.
type URLBuilder interface {
	// BuildManifestURL returns the URL string used for manifest events.
	// The bridge calls it with a digest reference built from the
	// repository name and the manifest digest.
	BuildManifestURL(name reference.Named) (string, error)
	// BuildBlobURL returns the URL string used for blob events. The bridge
	// calls it with a digest reference built from the repository name and
	// the blob digest.
	BuildBlobURL(ref reference.Canonical) (string, error)
}
    30  
    31  // NewBridge returns a notification listener that writes records to sink,
    32  // using the actor and source. Any urls populated in the events created by
    33  // this bridge will be created using the URLBuilder.
    34  // TODO(stevvooe): Update this to simply take a context.Context object.
    35  func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink, includeReferences bool) Listener {
    36  	return &bridge{
    37  		ub:                ub,
    38  		includeReferences: includeReferences,
    39  		actor:             actor,
    40  		source:            source,
    41  		request:           request,
    42  		sink:              sink,
    43  	}
    44  }
    45  
    46  // NewRequestRecord builds a RequestRecord for use in NewBridge from an
    47  // http.Request, associating it with a request id.
    48  func NewRequestRecord(id string, r *http.Request) RequestRecord {
    49  	return RequestRecord{
    50  		ID:        id,
    51  		Addr:      context.RemoteAddr(r),
    52  		Host:      r.Host,
    53  		Method:    r.Method,
    54  		UserAgent: r.UserAgent(),
    55  	}
    56  }
    57  
    58  func (b *bridge) ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
    59  	manifestEvent, err := b.createManifestEvent(EventActionPush, repo, sm)
    60  	if err != nil {
    61  		return err
    62  	}
    63  
    64  	for _, option := range options {
    65  		if opt, ok := option.(distribution.WithTagOption); ok {
    66  			manifestEvent.Target.Tag = opt.Tag
    67  			break
    68  		}
    69  	}
    70  	return b.sink.Write(*manifestEvent)
    71  }
    72  
    73  func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
    74  	manifestEvent, err := b.createManifestEvent(EventActionPull, repo, sm)
    75  	if err != nil {
    76  		return err
    77  	}
    78  
    79  	for _, option := range options {
    80  		if opt, ok := option.(distribution.WithTagOption); ok {
    81  			manifestEvent.Target.Tag = opt.Tag
    82  			break
    83  		}
    84  	}
    85  	return b.sink.Write(*manifestEvent)
    86  }
    87  
// ManifestDeleted writes a delete event for the manifest identified by dgst
// in repo to the sink.
func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {
	return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst)
}
    91  
// BlobPushed writes a push event for the blob described by desc in repo to
// the sink.
func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
	return b.createBlobEventAndWrite(EventActionPush, repo, desc)
}
    95  
// BlobPulled writes a pull event for the blob described by desc in repo to
// the sink.
func (b *bridge) BlobPulled(repo reference.Named, desc distribution.Descriptor) error {
	return b.createBlobEventAndWrite(EventActionPull, repo, desc)
}
    99  
   100  func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error {
   101  	event, err := b.createBlobEvent(EventActionMount, repo, desc)
   102  	if err != nil {
   103  		return err
   104  	}
   105  	event.Target.FromRepository = fromRepo.Name()
   106  	return b.sink.Write(*event)
   107  }
   108  
// BlobDeleted writes a delete event for the blob identified by dgst in repo
// to the sink.
func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
	return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
}
   112  
   113  func (b *bridge) TagDeleted(repo reference.Named, tag string) error {
   114  	event := b.createEvent(EventActionDelete)
   115  	event.Target.Repository = repo.Name()
   116  	event.Target.Tag = tag
   117  
   118  	return b.sink.Write(*event)
   119  }
   120  
   121  func (b *bridge) RepoDeleted(repo reference.Named) error {
   122  	event := b.createEvent(EventActionDelete)
   123  	event.Target.Repository = repo.Name()
   124  
   125  	return b.sink.Write(*event)
   126  }
   127  
   128  func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
   129  	event := b.createEvent(action)
   130  	event.Target.Repository = repo.Name()
   131  	event.Target.Digest = dgst
   132  
   133  	return b.sink.Write(*event)
   134  }
   135  
   136  func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
   137  	event := b.createEvent(action)
   138  	event.Target.Repository = repo.Name()
   139  
   140  	mt, p, err := sm.Payload()
   141  	if err != nil {
   142  		return nil, err
   143  	}
   144  
   145  	// Ensure we have the canonical manifest descriptor here
   146  	manifest, desc, err := distribution.UnmarshalManifest(mt, p)
   147  	if err != nil {
   148  		return nil, err
   149  	}
   150  
   151  	event.Target.MediaType = mt
   152  	event.Target.Length = desc.Size
   153  	event.Target.Size = desc.Size
   154  	event.Target.Digest = desc.Digest
   155  	if b.includeReferences {
   156  		event.Target.References = append(event.Target.References, manifest.References()...)
   157  	}
   158  
   159  	ref, err := reference.WithDigest(repo, event.Target.Digest)
   160  	if err != nil {
   161  		return nil, err
   162  	}
   163  
   164  	event.Target.URL, err = b.ub.BuildManifestURL(ref)
   165  	if err != nil {
   166  		return nil, err
   167  	}
   168  
   169  	return event, nil
   170  }
   171  
   172  func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
   173  	event := b.createEvent(action)
   174  	event.Target.Digest = dgst
   175  	event.Target.Repository = repo.Name()
   176  
   177  	return b.sink.Write(*event)
   178  }
   179  
   180  func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
   181  	event, err := b.createBlobEvent(action, repo, desc)
   182  	if err != nil {
   183  		return err
   184  	}
   185  
   186  	return b.sink.Write(*event)
   187  }
   188  
   189  func (b *bridge) createBlobEvent(action string, repo reference.Named, desc distribution.Descriptor) (*Event, error) {
   190  	event := b.createEvent(action)
   191  	event.Target.Descriptor = desc
   192  	event.Target.Length = desc.Size
   193  	event.Target.Repository = repo.Name()
   194  
   195  	ref, err := reference.WithDigest(repo, desc.Digest)
   196  	if err != nil {
   197  		return nil, err
   198  	}
   199  
   200  	event.Target.URL, err = b.ub.BuildBlobURL(ref)
   201  	if err != nil {
   202  		return nil, err
   203  	}
   204  
   205  	return event, nil
   206  }
   207  
   208  // createEvent creates an event with actor and source populated.
   209  func (b *bridge) createEvent(action string) *Event {
   210  	event := createEvent(action)
   211  	event.Source = b.source
   212  	event.Actor = b.actor
   213  	event.Request = b.request
   214  
   215  	return event
   216  }
   217  
   218  // createEvent returns a new event, timestamped, with the specified action.
   219  func createEvent(action string) *Event {
   220  	return &Event{
   221  		ID:        uuid.Generate().String(),
   222  		Timestamp: time.Now(),
   223  		Action:    action,
   224  	}
   225  }
   226  

View as plain text