1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package longtest_test
16
17 import (
18 "context"
19 "fmt"
20 "log"
21 "math/rand"
22 "strconv"
23 "strings"
24 "sync"
25 "testing"
26 "time"
27
28 "cloud.google.com/go/internal/testutil"
29 "cloud.google.com/go/pubsub"
30 "google.golang.org/api/iterator"
31 "google.golang.org/api/option"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
34 )
35
// Tunables shared by the end-to-end tests in this file.
const (
	// resourcePrefix is the leading component of every topic and subscription
	// name these tests create; the cleanup helpers match on it.
	resourcePrefix = "endtoend"

	// timeout bounds the contexts the tests run under.
	timeout = 10 * time.Minute
	// ackDeadline is the AckDeadline configured on every test subscription.
	ackDeadline = 10 * time.Second

	// nMessages is the number of messages TestEndToEnd_Dupes publishes.
	nMessages = 1e4
	// acceptableDupPercentage is the share of nMessages, in percent, that may
	// be redelivered before a test reports a failure.
	acceptableDupPercentage = 1
	// numAcceptableDups is acceptableDupPercentage expressed as a message count.
	numAcceptableDups = int(nMessages * acceptableDupPercentage / 100)
)
44
45
46
47
48 func TestEndToEnd_Dupes(t *testing.T) {
49 t.Skip("https://github.com/googleapis/google-cloud-go/issues/1752")
50
51 ctx, cancel := context.WithTimeout(context.Background(), timeout)
52 defer cancel()
53 client, topic, cleanup := prepareEndToEndTest(ctx, t)
54 defer cleanup()
55 subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
56
57
58 var err error
59 var subs [2]*pubsub.Subscription
60 for i := 0; i < len(subs); i++ {
61 subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
62 Topic: topic,
63 AckDeadline: ackDeadline,
64 })
65 if err != nil {
66 t.Fatalf("CreateSub error: %v", err)
67 }
68 defer subs[i].Delete(ctx)
69 }
70
71 err = publish(ctx, topic, nMessages)
72 topic.Stop()
73 if err != nil {
74 t.Fatalf("publish: %v", err)
75 }
76
77
78 recv := make(chan struct{})
79
80
81 var wg sync.WaitGroup
82 cctx, cancel := context.WithTimeout(ctx, timeout)
83 defer cancel()
84
85 consumers := []*consumer{
86 {
87 counts: make(map[string]int),
88 recv: recv,
89 durations: []time.Duration{time.Hour},
90 done: make(chan struct{}),
91 },
92 {
93 counts: make(map[string]int),
94 recv: recv,
95 durations: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2, time.Hour},
96 done: make(chan struct{}),
97 },
98 }
99 for i, con := range consumers {
100 con := con
101 sub := subs[i]
102 wg.Add(1)
103 go func() {
104 defer wg.Done()
105 con.consume(ctx, t, sub)
106 }()
107 }
108
109
110
111
112
113
114 quiescenceDur := ackDeadline * 6
115 quiescenceTimer := time.NewTimer(quiescenceDur)
116
117 loop:
118 for {
119 select {
120 case <-recv:
121
122
123
124 if !quiescenceTimer.Stop() {
125 <-quiescenceTimer.C
126 }
127 quiescenceTimer.Reset(quiescenceDur)
128
129 case <-quiescenceTimer.C:
130 cancel()
131 log.Println("quiesced")
132 break loop
133
134 case <-cctx.Done():
135 t.Fatal("timed out")
136 }
137 }
138 wg.Wait()
139 close(recv)
140 for i, con := range consumers {
141 var numDups int
142 var zeroes int
143 for _, v := range con.counts {
144 if v == 0 {
145 zeroes++
146 }
147 numDups += v - 1
148 }
149
150 if zeroes > 0 {
151 t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
152 } else if numDups > numAcceptableDups {
153 t.Errorf("Consumer %d: Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
154 }
155 }
156
157 for i, con := range consumers {
158 select {
159 case <-con.done:
160 case <-time.After(15 * time.Second):
161 t.Fatalf("timed out waiting for consumer %d to finish", i)
162 }
163 }
164 }
165
166 func TestEndToEnd_LongProcessingTime(t *testing.T) {
167 ctx, cancel := context.WithTimeout(context.Background(), timeout)
168 defer cancel()
169 client, topic, cleanup := prepareEndToEndTest(ctx, t)
170 defer cleanup()
171 subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
172
173
174 sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
175 Topic: topic,
176 AckDeadline: ackDeadline,
177 })
178 if err != nil {
179 t.Fatalf("CreateSub error: %v", err)
180 }
181 defer sub.Delete(ctx)
182
183
184 sub.ReceiveSettings.Synchronous = true
185 sub.ReceiveSettings.MaxOutstandingMessages = 500
186
187 err = publish(ctx, topic, 500)
188 topic.Stop()
189 if err != nil {
190 t.Fatalf("publish: %v", err)
191 }
192
193
194 recv := make(chan struct{})
195 consumer := consumer{
196 counts: make(map[string]int),
197 recv: recv,
198 durations: []time.Duration{time.Hour},
199 processingDelay: func() time.Duration {
200 return time.Duration(1+rand.Int63n(120)) * time.Second
201 },
202 done: make(chan struct{}),
203 }
204 go consumer.consume(ctx, t, sub)
205
206
207
208
209
210
211 quiescenceDur := 12 * ackDeadline
212 quiescenceTimer := time.NewTimer(quiescenceDur)
213 loop:
214 for {
215 select {
216 case <-recv:
217
218
219
220 if !quiescenceTimer.Stop() {
221 <-quiescenceTimer.C
222 }
223 quiescenceTimer.Reset(quiescenceDur)
224
225 case <-quiescenceTimer.C:
226 cancel()
227 log.Println("quiesced")
228 break loop
229
230 case <-ctx.Done():
231 t.Fatal("timed out")
232 }
233 }
234 close(recv)
235 var numDups int
236 var zeroes int
237 for _, v := range consumer.counts {
238 if v == 0 {
239 zeroes++
240 }
241 numDups += v - 1
242 }
243
244 if zeroes > 0 {
245 t.Errorf("%d messages never arrived", zeroes)
246 } else if numDups > numAcceptableDups {
247 t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
248 }
249
250 select {
251 case <-consumer.done:
252 case <-time.After(15 * time.Second):
253 t.Fatal("timed out waiting for consumer to finish")
254 }
255 }
256
257
258 func publish(ctx context.Context, topic *pubsub.Topic, n int) error {
259 var rs []*pubsub.PublishResult
260 for i := 0; i < n; i++ {
261 m := &pubsub.Message{Data: []byte(fmt.Sprintf("msg %d", i))}
262 rs = append(rs, topic.Publish(ctx, m))
263 }
264 for _, r := range rs {
265 _, err := r.Get(ctx)
266 if err != nil {
267 return err
268 }
269 }
270 return nil
271 }
272
273
// consumer receives messages from one subscription and records how often
// each message ID was delivered.
type consumer struct {
	// recv receives one empty value from process for every delivered message.
	recv chan struct{}

	// durations lists how long each successive Receive call made by consume
	// may run; consume performs one Receive per entry.
	durations []time.Duration

	// processingDelay, when non-nil, supplies the delay between receiving a
	// message and acking it. When nil, process picks a random delay below
	// three times ackDeadline.
	processingDelay func() time.Duration

	// done is closed when consume returns.
	done chan struct{}

	// mu is held by process while it updates the fields below.
	mu sync.Mutex
	// counts maps a message ID to the number of times it was delivered.
	counts map[string]int
	// totalRecvd is the total number of deliveries, duplicates included.
	totalRecvd int
}
293
294
295
296 func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
297 defer close(c.done)
298 for _, dur := range c.durations {
299 ctx2, cancel := context.WithTimeout(ctx, dur)
300 defer cancel()
301 id := sub.String()[len(sub.String())-1:]
302 t.Logf("%s: start receive", id)
303 prev := c.totalRecvd
304 err := sub.Receive(ctx2, c.process)
305 t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
306 if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
307 panic(err)
308 }
309 select {
310 case <-ctx.Done():
311 return
312 default:
313 }
314 }
315 }
316
317
318 func (c *consumer) process(_ context.Context, m *pubsub.Message) {
319 c.mu.Lock()
320 c.counts[m.ID]++
321 c.totalRecvd++
322 c.mu.Unlock()
323 c.recv <- struct{}{}
324
325 var delay time.Duration
326 if c.processingDelay == nil {
327 delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
328 } else {
329 delay = c.processingDelay()
330 }
331
332
333
334 time.AfterFunc(delay, func() {
335 m.Ack()
336 })
337 }
338
339
340 func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
341 if testing.Short() {
342 t.Skip("Integration tests skipped in short mode")
343 }
344 ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
345 if ts == nil {
346 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
347 }
348
349 now := time.Now()
350 topicName := fmt.Sprintf("%s-%d", resourcePrefix, now.UnixNano())
351
352 client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
353 if err != nil {
354 t.Fatalf("Creating client error: %v", err)
355 }
356
357
358 if err := cleanupSubscription(ctx, client); err != nil {
359 t.Logf("Pre-test subscription cleanup failed: %v", err)
360 }
361 if err := cleanupTopic(ctx, client); err != nil {
362 t.Logf("Pre-test topic cleanup failed: %v", err)
363 }
364
365 var topic *pubsub.Topic
366 if topic, err = client.CreateTopic(ctx, topicName); err != nil {
367 t.Fatalf("CreateTopic error: %v", err)
368 }
369
370 return client, topic, func() {
371 topic.Delete(ctx)
372 client.Close()
373 }
374 }
375
376
377 func cleanupTopic(ctx context.Context, client *pubsub.Client) error {
378 if testing.Short() {
379 return nil
380 }
381
382 const expireAge = 24 * time.Hour
383
384 it := client.Topics(ctx)
385 for {
386 t, err := it.Next()
387 if err == iterator.Done {
388 break
389 }
390 if err != nil {
391 return err
392 }
393
394 tID := t.ID()
395 p := strings.Split(tID, "-")
396
397
398
399 if p[0] == resourcePrefix {
400 tCreated := p[len(p)-1]
401 timestamp, err := strconv.ParseInt(tCreated, 10, 64)
402 if err != nil {
403 continue
404 }
405 timeTCreated := time.Unix(0, timestamp)
406 if time.Since(timeTCreated) > expireAge {
407 log.Printf("deleting topic %q", tID)
408 if err := t.Delete(ctx); err != nil {
409 return fmt.Errorf("Delete topic: %v: %v", t.String(), err)
410 }
411 }
412 }
413 }
414 return nil
415 }
416
417
418 func cleanupSubscription(ctx context.Context, client *pubsub.Client) error {
419 if testing.Short() {
420 return nil
421 }
422
423 const expireAge = 24 * time.Hour
424
425 it := client.Subscriptions(ctx)
426 for {
427 s, err := it.Next()
428 if err == iterator.Done {
429 break
430 }
431 if err != nil {
432 return err
433 }
434 sID := s.ID()
435 p := strings.Split(sID, "-")
436
437
438
439 if p[0] == resourcePrefix {
440 sCreated := p[len(p)-2]
441 timestamp, err := strconv.ParseInt(sCreated, 10, 64)
442 if err != nil {
443 continue
444 }
445 timeSCreated := time.Unix(0, timestamp)
446 if time.Since(timeSCreated) > expireAge {
447 log.Printf("deleting subscription %q", sID)
448 if err := s.Delete(ctx); err != nil {
449 return fmt.Errorf("Delete subscription: %v: %v", s.String(), err)
450 }
451 }
452 }
453 }
454 return nil
455 }
456
View as plain text