1 package notifications
2
3 import (
4 "fmt"
5 "math/rand"
6 "reflect"
7 "sync"
8 "time"
9
10 "github.com/sirupsen/logrus"
11
12 "testing"
13 )
14
15 func TestBroadcaster(t *testing.T) {
16 const nEvents = 1000
17 var sinks []Sink
18
19 for i := 0; i < 10; i++ {
20 sinks = append(sinks, &testSink{})
21 }
22
23 b := NewBroadcaster(sinks...)
24
25 var block []Event
26 var wg sync.WaitGroup
27 for i := 1; i <= nEvents; i++ {
28 block = append(block, createTestEvent("push", "library/test", "blob"))
29
30 if i%10 == 0 && i > 0 {
31 wg.Add(1)
32 go func(block ...Event) {
33 if err := b.Write(block...); err != nil {
34 t.Errorf("error writing block of length %d: %v", len(block), err)
35 }
36 wg.Done()
37 }(block...)
38
39 block = nil
40 }
41 }
42
43 wg.Wait()
44 if t.Failed() {
45 t.FailNow()
46 }
47 checkClose(t, b)
48
49
50 for _, sink := range sinks {
51 ts := sink.(*testSink)
52 ts.mu.Lock()
53 defer ts.mu.Unlock()
54
55 if len(ts.events) != nEvents {
56 t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
57 }
58
59 if !ts.closed {
60 t.Fatalf("sink should have been closed")
61 }
62 }
63
64 }
65
66 func TestEventQueue(t *testing.T) {
67 const nevents = 1000
68 var ts testSink
69 metrics := newSafeMetrics()
70 eq := newEventQueue(
71
72 &delayedSink{
73 Sink: &ts,
74 delay: time.Millisecond * 1,
75 }, metrics.eventQueueListener())
76
77 var wg sync.WaitGroup
78 var block []Event
79 for i := 1; i <= nevents; i++ {
80 block = append(block, createTestEvent("push", "library/test", "blob"))
81 if i%10 == 0 && i > 0 {
82 wg.Add(1)
83 go func(block ...Event) {
84 if err := eq.Write(block...); err != nil {
85 t.Errorf("error writing event block: %v", err)
86 }
87 wg.Done()
88 }(block...)
89
90 block = nil
91 }
92 }
93
94 wg.Wait()
95 if t.Failed() {
96 t.FailNow()
97 }
98 checkClose(t, eq)
99
100 ts.mu.Lock()
101 defer ts.mu.Unlock()
102 metrics.Lock()
103 defer metrics.Unlock()
104
105 if len(ts.events) != nevents {
106 t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
107 }
108
109 if !ts.closed {
110 t.Fatalf("sink should have been closed")
111 }
112
113 if metrics.Events != nevents {
114 t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
115 }
116
117 if metrics.Pending != 0 {
118 t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
119 }
120 }
121
122 func TestIgnoredSink(t *testing.T) {
123 blob := createTestEvent("push", "library/test", "blob")
124 manifest := createTestEvent("pull", "library/test", "manifest")
125
126 type testcase struct {
127 ignoreMediaTypes []string
128 ignoreActions []string
129 expected []Event
130 }
131
132 cases := []testcase{
133 {nil, nil, []Event{blob, manifest}},
134 {[]string{"other"}, []string{"other"}, []Event{blob, manifest}},
135 {[]string{"blob"}, []string{"other"}, []Event{manifest}},
136 {[]string{"blob", "manifest"}, []string{"other"}, nil},
137 {[]string{"other"}, []string{"push"}, []Event{manifest}},
138 {[]string{"other"}, []string{"pull"}, []Event{blob}},
139 {[]string{"other"}, []string{"pull", "push"}, nil},
140 }
141
142 for _, c := range cases {
143 ts := &testSink{}
144 s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)
145
146 if err := s.Write(blob, manifest); err != nil {
147 t.Fatalf("error writing event: %v", err)
148 }
149
150 ts.mu.Lock()
151 if !reflect.DeepEqual(ts.events, c.expected) {
152 t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
153 }
154 ts.mu.Unlock()
155 }
156 }
157
158 func TestRetryingSink(t *testing.T) {
159
160
161
162 var ts testSink
163 flaky := &flakySink{
164 rate: 1.0,
165 Sink: &ts,
166 }
167 s := newRetryingSink(flaky, 3, 10*time.Millisecond)
168
169 var wg sync.WaitGroup
170 var block []Event
171 for i := 1; i <= 100; i++ {
172 block = append(block, createTestEvent("push", "library/test", "blob"))
173
174
175 if i > 50 {
176 s.mu.Lock()
177 flaky.rate = 0.90
178 s.mu.Unlock()
179 }
180
181 if i%10 == 0 && i > 0 {
182 wg.Add(1)
183 go func(block ...Event) {
184 defer wg.Done()
185 if err := s.Write(block...); err != nil {
186 t.Errorf("error writing event block: %v", err)
187 }
188 }(block...)
189
190 block = nil
191 }
192 }
193
194 wg.Wait()
195 if t.Failed() {
196 t.FailNow()
197 }
198 checkClose(t, s)
199
200 ts.mu.Lock()
201 defer ts.mu.Unlock()
202
203 if len(ts.events) != 100 {
204 t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
205 }
206 }
207
208 type testSink struct {
209 events []Event
210 mu sync.Mutex
211 closed bool
212 }
213
214 func (ts *testSink) Write(events ...Event) error {
215 ts.mu.Lock()
216 defer ts.mu.Unlock()
217 ts.events = append(ts.events, events...)
218 return nil
219 }
220
221 func (ts *testSink) Close() error {
222 ts.mu.Lock()
223 defer ts.mu.Unlock()
224 ts.closed = true
225
226 logrus.Infof("closing testSink")
227 return nil
228 }
229
230 type delayedSink struct {
231 Sink
232 delay time.Duration
233 }
234
235 func (ds *delayedSink) Write(events ...Event) error {
236 time.Sleep(ds.delay)
237 return ds.Sink.Write(events...)
238 }
239
240 type flakySink struct {
241 Sink
242 rate float64
243 }
244
245 func (fs *flakySink) Write(events ...Event) error {
246 if rand.Float64() < fs.rate {
247 return fmt.Errorf("error writing %d events", len(events))
248 }
249
250 return fs.Sink.Write(events...)
251 }
252
253 func checkClose(t *testing.T, sink Sink) {
254 if err := sink.Close(); err != nil {
255 t.Fatalf("unexpected error closing: %v", err)
256 }
257
258
259 if err := sink.Close(); err == nil {
260 t.Fatalf("no error on double close")
261 }
262
263
264 if err := sink.Write([]Event{}...); err == nil {
265 t.Fatalf("write after closed did not have an error")
266 } else if err != ErrSinkClosed {
267 t.Fatalf("error should be ErrSinkClosed")
268 }
269 }
270
View as plain text