1 package notifications
2
3 import (
4 "context"
5 "net/http"
6
7 "github.com/docker/distribution"
8
9 "github.com/distribution/reference"
10 dcontext "github.com/docker/distribution/context"
11 "github.com/opencontainers/go-digest"
12 )
13
14
15 type ManifestListener interface {
16 ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
17 ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
18 ManifestDeleted(repo reference.Named, dgst digest.Digest) error
19 }
20
21
22 type BlobListener interface {
23 BlobPushed(repo reference.Named, desc distribution.Descriptor) error
24 BlobPulled(repo reference.Named, desc distribution.Descriptor) error
25 BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
26 BlobDeleted(repo reference.Named, desc digest.Digest) error
27 }
28
29
30 type RepoListener interface {
31 TagDeleted(repo reference.Named, tag string) error
32 RepoDeleted(repo reference.Named) error
33 }
34
35
36 type Listener interface {
37 ManifestListener
38 BlobListener
39 RepoListener
40 }
41
42 type repositoryListener struct {
43 distribution.Repository
44 listener Listener
45 }
46
47 type removerListener struct {
48 distribution.RepositoryRemover
49 listener Listener
50 }
51
52
53 func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) {
54 return &repositoryListener{
55 Repository: repo,
56 listener: listener,
57 }, &removerListener{
58 RepositoryRemover: remover,
59 listener: listener,
60 }
61 }
62
63 func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error {
64 err := nl.RepositoryRemover.Remove(ctx, name)
65 if err != nil {
66 return err
67 }
68 return nl.listener.RepoDeleted(name)
69 }
70
71 func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
72 manifests, err := rl.Repository.Manifests(ctx, options...)
73 if err != nil {
74 return nil, err
75 }
76 return &manifestServiceListener{
77 ManifestService: manifests,
78 parent: rl,
79 }, nil
80 }
81
82 func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
83 return &blobServiceListener{
84 BlobStore: rl.Repository.Blobs(ctx),
85 parent: rl,
86 }
87 }
88
89 type manifestServiceListener struct {
90 distribution.ManifestService
91 parent *repositoryListener
92 }
93
94 func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
95 err := msl.ManifestService.Delete(ctx, dgst)
96 if err == nil {
97 if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
98 dcontext.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err)
99 }
100 }
101
102 return err
103 }
104
105 func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
106 sm, err := msl.ManifestService.Get(ctx, dgst, options...)
107 if err == nil {
108 if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
109 dcontext.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err)
110 }
111 }
112
113 return sm, err
114 }
115
116 func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
117 dgst, err := msl.ManifestService.Put(ctx, sm, options...)
118
119 if err == nil {
120 if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
121 dcontext.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err)
122 }
123 }
124
125 return dgst, err
126 }
127
128 type blobServiceListener struct {
129 distribution.BlobStore
130 parent *repositoryListener
131 }
132
133 var _ distribution.BlobStore = &blobServiceListener{}
134
135 func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
136 p, err := bsl.BlobStore.Get(ctx, dgst)
137 if err == nil {
138 if desc, err := bsl.Stat(ctx, dgst); err != nil {
139 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
140 } else {
141 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
142 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
143 }
144 }
145 }
146
147 return p, err
148 }
149
150 func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
151 rc, err := bsl.BlobStore.Open(ctx, dgst)
152 if err == nil {
153 if desc, err := bsl.Stat(ctx, dgst); err != nil {
154 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
155 } else {
156 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
157 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
158 }
159 }
160 }
161
162 return rc, err
163 }
164
165 func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
166 err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
167 if err == nil {
168 if desc, err := bsl.Stat(ctx, dgst); err != nil {
169 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
170 } else {
171 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
172 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
173 }
174 }
175 }
176
177 return err
178 }
179
180 func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
181 desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
182 if err == nil {
183 if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil {
184 dcontext.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err)
185 }
186 }
187
188 return desc, err
189 }
190
191 func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
192 wr, err := bsl.BlobStore.Create(ctx, options...)
193 switch err := err.(type) {
194 case distribution.ErrBlobMounted:
195 if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
196 dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
197 }
198 return nil, err
199 }
200 return bsl.decorateWriter(wr), err
201 }
202
203 func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
204 err := bsl.BlobStore.Delete(ctx, dgst)
205 if err == nil {
206 if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
207 dcontext.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
208 }
209 }
210
211 return err
212 }
213
214 func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
215 wr, err := bsl.BlobStore.Resume(ctx, id)
216 return bsl.decorateWriter(wr), err
217 }
218
219 func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
220 return &blobWriterListener{
221 BlobWriter: wr,
222 parent: bsl,
223 }
224 }
225
226 type blobWriterListener struct {
227 distribution.BlobWriter
228 parent *blobServiceListener
229 }
230
231 func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
232 committed, err := bwl.BlobWriter.Commit(ctx, desc)
233 if err == nil {
234 if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil {
235 dcontext.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
236 }
237 }
238
239 return committed, err
240 }
241
242 type tagServiceListener struct {
243 distribution.TagService
244 parent *repositoryListener
245 }
246
247 func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService {
248 return &tagServiceListener{
249 TagService: rl.Repository.Tags(ctx),
250 parent: rl,
251 }
252 }
253
254 func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error {
255 if err := tagSL.TagService.Untag(ctx, tag); err != nil {
256 return err
257 }
258 if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil {
259 dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err)
260 return err
261 }
262 return nil
263 }
264
View as plain text