1
2 package server_test
3
4 import (
5 "context"
6 "sync"
7 "testing"
8 "time"
9
10 server "edge-infra.dev/pkg/edge/psqlinjector"
11 "edge-infra.dev/test/f2"
12 "edge-infra.dev/test/f2/x/pstest"
13
14 "cloud.google.com/go/pubsub"
15 "github.com/stretchr/testify/require"
16 "google.golang.org/api/option"
17 )
18
19
20 func TestReceiverMuxHappy(t *testing.T) {
21 var cfg = new(server.Config)
22 var bannerProjectIDs = []string{"foo", "bar", "baz"}
23 var pollFunc = func(_ context.Context) ([]string, error) {
24 return bannerProjectIDs, nil
25 }
26
27 var got = struct {
28 sync.Mutex
29 sync.WaitGroup
30 attrs map[string]string
31 }{
32 attrs: make(map[string]string),
33 }
34 var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
35 t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
36 defer got.Done()
37 got.Lock()
38 defer got.Unlock()
39 for k, v := range msg.Attributes {
40 got.attrs[k] = v
41 }
42 return nil
43 }
44
45
46 var start = time.Now().UTC().Truncate(time.Microsecond)
47 t.Logf("start time %v", start)
48
49
50 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
51 var runErrCh = make(chan error, 1)
52
53 var client *pubsub.Client
54
55 var feat = f2.NewFeature(t.Name()).
56 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
57 return setupConfig(ctx, t, cfg)
58 }).
59 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
60 return createKinformTopics(ctx, t, cfg, bannerProjectIDs...)
61 }).
62 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
63 ps := pstest.FromContextT(ctx, t)
64 var err error
65 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
66 require.NoError(t, err)
67 return ctx
68 }).
69 Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
70 var bmcfg = server.ReceiverMuxConfig{
71 ForemanProjectID: cfg.ForemanProjectID,
72 SubscriptionID: cfg.SubscriptionID,
73 TopicID: cfg.TopicID,
74 Conn: cfg.TestPubSubConn,
75 PollPeriod: cfg.PollBannersPeriod,
76 PollMaxRetries: cfg.PollBannersMaxRetries,
77 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
78
79 PollFunc: pollFunc,
80 Handler: handlerFunc,
81 }
82 rm, err := server.NewReceiverMux(&bmcfg)
83 require.NoError(t, err)
84 go func() {
85 err := rm.Run(doneCtx)
86 runErrCh <- err
87 }()
88 return ctx
89 }).
90 Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
91 for _, banner := range bannerProjectIDs {
92 got.Add(1)
93
94 var topic = client.TopicInProject(cfg.TopicID, banner)
95
96
97 exists, err := topic.Exists(ctx)
98 require.NoError(t, err)
99 require.True(t, exists)
100
101 topic.Publish(ctx, &pubsub.Message{
102 Data: []byte(banner),
103 Attributes: map[string]string{
104 banner: "",
105 },
106 })
107 }
108 return ctx
109 }).
110 Test("verify message from each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
111 got.Wait()
112 got.Lock()
113 defer got.Unlock()
114 for _, banner := range bannerProjectIDs {
115 _, found := got.attrs[banner]
116 require.True(t, found, "did not find attribute with banner")
117 }
118 return ctx
119 }).
120 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
121 stopReceiverMux()
122 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
123 return ctx
124 }).
125 Feature()
126
127 f2f.Test(t, feat)
128 }
129
130
131 func TestReceiverMuxDynamicAdd(t *testing.T) {
132 var cfg = new(server.Config)
133
134 var bannerProjectIDs = struct {
135 sync.Mutex
136 ids []string
137 }{
138 ids: []string{"foo", "bar", "baz"},
139 }
140 var pollFunc = func(_ context.Context) ([]string, error) {
141 bannerProjectIDs.Lock()
142 defer bannerProjectIDs.Unlock()
143 return bannerProjectIDs.ids, nil
144 }
145
146 var got = struct {
147 sync.Mutex
148 sync.WaitGroup
149 attrs map[string]string
150 }{
151 attrs: make(map[string]string),
152 }
153 var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
154 t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
155 defer got.Done()
156 got.Lock()
157 defer got.Unlock()
158 for k, v := range msg.Attributes {
159 got.attrs[k] = v
160 }
161 return nil
162 }
163
164
165 var start = time.Now().UTC().Truncate(time.Microsecond)
166 t.Logf("start time %v", start)
167
168
169 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
170 var runErrCh = make(chan error, 1)
171
172 var client *pubsub.Client
173
174 var feat = f2.NewFeature(t.Name()).
175 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
176 return setupConfig(ctx, t, cfg)
177 }).
178 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
179 return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
180 }).
181 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
182 ps := pstest.FromContextT(ctx, t)
183 var err error
184 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
185 require.NoError(t, err)
186 return ctx
187 }).
188 Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
189 var bmcfg = server.ReceiverMuxConfig{
190 ForemanProjectID: cfg.ForemanProjectID,
191 SubscriptionID: cfg.SubscriptionID,
192 Conn: cfg.TestPubSubConn,
193 TopicID: cfg.TopicID,
194 PollPeriod: cfg.PollBannersPeriod,
195 PollMaxRetries: cfg.PollBannersMaxRetries,
196 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
197
198 Handler: handlerFunc,
199 PollFunc: pollFunc,
200 }
201 rm, err := server.NewReceiverMux(&bmcfg)
202 require.NoError(t, err)
203 go func() {
204 err := rm.Run(doneCtx)
205 runErrCh <- err
206 }()
207 return ctx
208 }).
209 Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
210 var v = "one"
211 for _, banner := range bannerProjectIDs.ids {
212 got.Add(1)
213
214 var topic = client.TopicInProject(cfg.TopicID, banner)
215
216
217 exists, err := topic.Exists(ctx)
218 require.NoError(t, err)
219 require.True(t, exists)
220
221 topic.Publish(ctx, &pubsub.Message{
222 Data: []byte(banner),
223 Attributes: map[string]string{
224 banner: v,
225 },
226 })
227
228 got.Wait()
229 value, found := got.attrs[banner]
230 require.True(t, found, "did not find attribute with banner")
231 require.Equal(t, value, v)
232 }
233 return ctx
234 }).
235 Test("create new banner and verify messages", func(ctx f2.Context, t *testing.T) f2.Context {
236 var newBanner = "dynamic"
237 ctx = createKinformTopics(ctx, t, cfg, newBanner)
238
239 bannerProjectIDs.Lock()
240 bannerProjectIDs.ids = append(bannerProjectIDs.ids, newBanner)
241 bannerProjectIDs.Unlock()
242
243 <-time.After(time.Second)
244 got.Add(len(bannerProjectIDs.ids))
245 for _, banner := range bannerProjectIDs.ids {
246 var topic = client.TopicInProject(cfg.TopicID, banner)
247
248
249 exists, err := topic.Exists(ctx)
250 require.NoError(t, err)
251 require.True(t, exists)
252
253 topic.Publish(ctx, &pubsub.Message{
254 Data: []byte(banner),
255 Attributes: map[string]string{
256 banner: "two",
257 },
258 })
259 }
260
261 got.Wait()
262 for _, banner := range bannerProjectIDs.ids {
263 value, found := got.attrs[banner]
264 require.True(t, found, "did not find attribute with banner")
265 require.Equal(t, value, "two")
266 }
267
268 return ctx
269 }).
270 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
271 stopReceiverMux()
272 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
273 return ctx
274 }).
275 Feature()
276
277 f2f.Test(t, feat)
278 }
279
280
281 func TestReceiverMuxDynamicRemove(t *testing.T) {
282 var cfg = new(server.Config)
283
284 var bannerProjectIDs = struct {
285 sync.Mutex
286 ids []string
287 }{
288 ids: []string{"foo", "bar", "baz"},
289 }
290 var pollFunc = func(_ context.Context) ([]string, error) {
291 bannerProjectIDs.Lock()
292 defer bannerProjectIDs.Unlock()
293 return bannerProjectIDs.ids, nil
294 }
295
296 var got = struct {
297 sync.Mutex
298 sync.WaitGroup
299 attrs map[string]string
300 }{
301 attrs: make(map[string]string),
302 }
303 var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
304 defer got.Done()
305 got.Lock()
306 defer got.Unlock()
307 for k, v := range msg.Attributes {
308 got.attrs[k] = v
309 }
310 return nil
311 }
312
313
314 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
315 var runErrCh = make(chan error, 1)
316
317 var client *pubsub.Client
318
319 var feat = f2.NewFeature(t.Name()).
320 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
321 return setupConfig(ctx, t, cfg)
322 }).
323 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
324 return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
325 }).
326 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
327 ps := pstest.FromContextT(ctx, t)
328 var err error
329 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
330 require.NoError(t, err)
331 return ctx
332 }).
333 Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
334 var bmcfg = server.ReceiverMuxConfig{
335 SubscriptionID: cfg.SubscriptionID,
336 ForemanProjectID: cfg.ForemanProjectID,
337 Conn: cfg.TestPubSubConn,
338 PollPeriod: cfg.PollBannersPeriod,
339 PollMaxRetries: cfg.PollBannersMaxRetries,
340 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
341 TopicID: cfg.TopicID,
342
343 PollFunc: pollFunc,
344 Handler: handlerFunc,
345 }
346 rm, err := server.NewReceiverMux(&bmcfg)
347 require.NoError(t, err)
348 go func() {
349 err := rm.Run(doneCtx)
350 runErrCh <- err
351 }()
352 return ctx
353 }).
354 Test("delete a banner and wait until messages are no longer received", func(ctx f2.Context, t *testing.T) f2.Context {
355 var removed = bannerProjectIDs.ids[0]
356 var topic = client.TopicInProject(cfg.TopicID, removed)
357 exists, err := topic.Exists(ctx)
358 require.NoError(t, err)
359 require.True(t, exists)
360
361 var removeBannerOnce sync.Once
362 var receivedCount int
363 for {
364 got.Add(1)
365 topic.Publish(ctx, &pubsub.Message{
366 Data: []byte(removed),
367 })
368
369 var stillReceiving = make(chan struct{})
370 go func() {
371 got.Wait()
372 close(stillReceiving)
373 removeBannerOnce.Do(func() {
374 bannerProjectIDs.Lock()
375 bannerProjectIDs.ids = bannerProjectIDs.ids[1:]
376 bannerProjectIDs.Unlock()
377 })
378 }()
379
380 select {
381 case <-time.After(time.Second):
382 t.Logf("receiver mux dynamically unsubscribed")
383
384 t.Logf("received count: %d", receivedCount)
385 require.True(t, receivedCount > 0)
386
387 got.Done()
388 <-stillReceiving
389 return ctx
390 case <-stillReceiving:
391
392 receivedCount++
393 }
394 }
395 }).
396 Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
397 var v = "others still working"
398
399 got.Add(len(bannerProjectIDs.ids))
400 for _, banner := range bannerProjectIDs.ids {
401 var topic = client.TopicInProject(cfg.TopicID, banner)
402 exists, err := topic.Exists(ctx)
403 require.NoError(t, err)
404 require.True(t, exists)
405
406 topic.Publish(ctx, &pubsub.Message{
407 Data: []byte(banner),
408 Attributes: map[string]string{
409 banner: v,
410 },
411 })
412 }
413
414 got.Wait()
415 for _, banner := range bannerProjectIDs.ids {
416 value, found := got.attrs[banner]
417 require.True(t, found, "did not find attribute with banner")
418 require.Equal(t, value, v)
419 }
420 return ctx
421 }).
422 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
423 stopReceiverMux()
424 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
425 return ctx
426 }).
427 Feature()
428
429 f2f.Test(t, feat)
430 }
431
432
433 func TestReceiverMuxDynamicRemoveOnReceiveError(t *testing.T) {
434 var cfg = new(server.Config)
435
436 var bannerProjectIDs = struct {
437 sync.Mutex
438 ids []string
439 }{
440 ids: []string{"foo", "bar", "baz"},
441 }
442 var pollFunc = func(_ context.Context) ([]string, error) {
443 bannerProjectIDs.Lock()
444 defer bannerProjectIDs.Unlock()
445 return bannerProjectIDs.ids, nil
446 }
447
448 var got struct {
449 sync.Mutex
450 sync.WaitGroup
451 }
452 const markDone = "markDone"
453 const panicCanary = "shouldBeDropped"
454 var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
455 for k := range msg.Attributes {
456 switch k {
457 case markDone:
458 defer got.Done()
459 case panicCanary:
460 panic("should not have received this message")
461 }
462 }
463 return nil
464 }
465
466
467 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
468 var runErrCh = make(chan error, 1)
469
470 var client *pubsub.Client
471
472 var feat = f2.NewFeature(t.Name()).
473 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
474 return setupConfig(ctx, t, cfg)
475 }).
476 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
477 return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
478 }).
479 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
480 ps := pstest.FromContextT(ctx, t)
481 var err error
482 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
483 require.NoError(t, err)
484 return ctx
485 }).
486 Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
487 var bmcfg = server.ReceiverMuxConfig{
488 ForemanProjectID: cfg.ForemanProjectID,
489 SubscriptionID: cfg.SubscriptionID,
490 TopicID: cfg.TopicID,
491 Conn: cfg.TestPubSubConn,
492 PollMaxRetries: cfg.PollBannersMaxRetries,
493 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
494 PollFunc: pollFunc,
495 PollPeriod: cfg.PollBannersPeriod,
496 Handler: handlerFunc,
497 }
498 rm, err := server.NewReceiverMux(&bmcfg)
499 require.NoError(t, err)
500
501 go func() {
502 err := rm.Run(doneCtx)
503 runErrCh <- err
504 }()
505 return ctx
506 }).
507 Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
508 got.Add(len(bannerProjectIDs.ids))
509 for _, banner := range bannerProjectIDs.ids {
510 var topic = client.TopicInProject(cfg.TopicID, banner)
511 exists, err := topic.Exists(ctx)
512 require.NoError(t, err)
513 require.True(t, exists)
514
515 topic.Publish(ctx, &pubsub.Message{
516 Data: []byte(banner),
517 Attributes: map[string]string{
518 markDone: "yes",
519 },
520 })
521 }
522
523 got.Wait()
524 return ctx
525 }).
526 Test("delete a banner subscription and send canary message", func(ctx f2.Context, t *testing.T) f2.Context {
527 var broken = bannerProjectIDs.ids[0]
528 var topic = client.TopicInProject(cfg.TopicID, broken)
529 exists, err := topic.Exists(ctx)
530 require.NoError(t, err)
531 require.True(t, exists)
532
533 err = client.SubscriptionInProject(cfg.SubscriptionID, broken).Delete(ctx)
534 require.NoError(t, err)
535
536
537 topic.Publish(ctx, &pubsub.Message{
538 Data: []byte(broken),
539 Attributes: map[string]string{
540 panicCanary: "messages sent before the new subscription exists should not be received",
541 },
542 })
543 topic.Flush()
544
545
546 <-time.After(1 * time.Second)
547
548
549 topic.Publish(ctx, &pubsub.Message{
550 Data: []byte(broken),
551 Attributes: map[string]string{
552 panicCanary: "messages sent before the new subscription exists should not be received",
553 },
554 })
555 topic.Flush()
556
557 return ctx
558 }).
559 Test("recreate the banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
560 var broken = bannerProjectIDs.ids[0]
561
562 ps := pstest.FromContextT(ctx, t)
563 client2, err := pubsub.NewClient(ctx, broken, option.WithGRPCConn(ps.Conn))
564 require.NoError(t, err)
565
566 var topic = client2.Topic(cfg.TopicID)
567 exists, err := topic.Exists(ctx)
568 require.NoError(t, err)
569 require.True(t, exists)
570
571 sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
572 Topic: topic,
573 })
574 require.NoError(t, err)
575 exists, err = sub.Exists(ctx)
576 require.NoError(t, err)
577 require.True(t, exists)
578
579 return ctx
580 }).
581 Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
582 got.Add(len(bannerProjectIDs.ids))
583 for _, banner := range bannerProjectIDs.ids {
584 var topic = client.TopicInProject(cfg.TopicID, banner)
585 exists, err := topic.Exists(ctx)
586 require.NoError(t, err)
587 require.True(t, exists)
588
589 topic.Publish(ctx, &pubsub.Message{
590 Data: []byte(banner),
591 Attributes: map[string]string{
592 markDone: "yes",
593 },
594 })
595 }
596
597 got.Wait()
598 t.Logf("receiver mux is receiving from all subscriptions again")
599 return ctx
600 }).
601 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
602 stopReceiverMux()
603 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
604 return ctx
605 }).
606 Feature()
607
608 f2f.Test(t, feat)
609 }
610
611
612 func TestReceiverMuxSubDoesNotExistAtStartup(t *testing.T) {
613 var cfg = new(server.Config)
614
615 var bannerProjectIDs = struct {
616 sync.Mutex
617 ids []string
618 }{
619 ids: []string{"foo", "bar", "baz"},
620 }
621 var pollFunc = func(_ context.Context) ([]string, error) {
622 bannerProjectIDs.Lock()
623 defer bannerProjectIDs.Unlock()
624 return bannerProjectIDs.ids, nil
625 }
626
627 var got struct {
628 sync.Mutex
629 sync.WaitGroup
630 }
631 var handlerFunc = func(_ context.Context, _ *pubsub.Message) error {
632 got.Done()
633 return nil
634 }
635
636
637 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
638 var runErrCh = make(chan error, 1)
639
640 var client *pubsub.Client
641
642 var feat = f2.NewFeature(t.Name()).
643 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
644 return setupConfig(ctx, t, cfg)
645 }).
646 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
647 return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids[1:]...)
648 }).
649 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
650 ps := pstest.FromContextT(ctx, t)
651 var err error
652 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
653 require.NoError(t, err)
654 return ctx
655 }).
656 Setup("topic without subscription", func(ctx f2.Context, t *testing.T) f2.Context {
657 ps := pstest.FromContextT(ctx, t)
658 client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
659 require.NoError(t, err)
660
661 topic, err := client2.CreateTopic(ctx, cfg.TopicID)
662 require.NoError(t, err, "error creating pubsub topic")
663
664 exists, err := topic.Exists(ctx)
665 require.NoError(t, err, "error checking if topic exists")
666 require.True(t, exists, "topic does not exist")
667
668 return ctx
669 }).
670 Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
671 var bmcfg = server.ReceiverMuxConfig{
672 ForemanProjectID: cfg.ForemanProjectID,
673 SubscriptionID: cfg.SubscriptionID,
674 TopicID: cfg.TopicID,
675 Conn: cfg.TestPubSubConn,
676 PollPeriod: cfg.PollBannersPeriod,
677 PollMaxRetries: cfg.PollBannersMaxRetries,
678 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
679 PollFunc: pollFunc,
680 Handler: handlerFunc,
681 }
682 rm, err := server.NewReceiverMux(&bmcfg)
683 require.NoError(t, err)
684 go func() {
685 err := rm.Run(doneCtx)
686 runErrCh <- err
687 }()
688 return ctx
689 }).
690 Test("send message to all banners", func(ctx f2.Context, t *testing.T) f2.Context {
691 got.Add(len(bannerProjectIDs.ids))
692 for _, banner := range bannerProjectIDs.ids {
693 var topic = client.TopicInProject(cfg.TopicID, banner)
694 exists, err := topic.Exists(ctx)
695 require.NoError(t, err)
696 require.True(t, exists)
697
698 topic.Publish(ctx, &pubsub.Message{
699 Data: []byte(banner),
700 })
701 }
702
703
704 go func() {
705 <-time.After(time.Second)
706 got.Done()
707 }()
708
709 got.Wait()
710 return ctx
711 }).
712 Test("create the missing banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
713 ps := pstest.FromContextT(ctx, t)
714 client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
715
716 require.NoError(t, err)
717
718 var topic = client2.Topic(cfg.TopicID)
719 exists, err := topic.Exists(ctx)
720 require.NoError(t, err)
721 require.True(t, exists)
722
723 sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
724 Topic: topic,
725 })
726 require.NoError(t, err)
727 exists, err = sub.Exists(ctx)
728 require.NoError(t, err)
729 require.True(t, exists)
730
731 return ctx
732 }).
733 Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
734 got.Add(len(bannerProjectIDs.ids))
735 for _, banner := range bannerProjectIDs.ids {
736 var topic = client.TopicInProject(cfg.TopicID, banner)
737 exists, err := topic.Exists(ctx)
738 require.NoError(t, err)
739 require.True(t, exists)
740
741 topic.Publish(ctx, &pubsub.Message{
742 Data: []byte(banner),
743 })
744 }
745
746 got.Wait()
747 t.Logf("receiver mux is receiving from all subscriptions again")
748 return ctx
749 }).
750 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
751 stopReceiverMux()
752 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
753 return ctx
754 }).
755 Feature()
756
757 f2f.Test(t, feat)
758 }
759
View as plain text