...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub_test
16
17 import (
18 "context"
19 "strconv"
20 "sync"
21 "testing"
22
23 "cloud.google.com/go/internal/testutil"
24 "cloud.google.com/go/pubsub"
25 "cloud.google.com/go/pubsub/pstest"
26 "google.golang.org/api/option"
27 "google.golang.org/grpc"
28 )
29
30
31
32
33
34
35 func withGRPCHeadersAssertionAlt(t *testing.T, opts ...option.ClientOption) []option.ClientOption {
36 grpcHeadersEnforcer := &testutil.HeadersEnforcer{
37 OnFailure: t.Fatalf,
38 Checkers: []*testutil.HeaderChecker{
39 testutil.XGoogClientHeaderChecker,
40 },
41 }
42 return append(grpcHeadersEnforcer.CallOptions(), opts...)
43 }
44
45 func TestPSTest(t *testing.T) {
46 t.Parallel()
47 ctx := context.Background()
48 srv := pstest.NewServer()
49 defer srv.Close()
50
51 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
52 if err != nil {
53 panic(err)
54 }
55 defer conn.Close()
56
57 opts := withGRPCHeadersAssertionAlt(t, option.WithGRPCConn(conn))
58 client, err := pubsub.NewClient(ctx, "some-project", opts...)
59 if err != nil {
60 panic(err)
61 }
62 defer client.Close()
63
64 topic, err := client.CreateTopic(ctx, "test-topic")
65 if err != nil {
66 panic(err)
67 }
68
69 sub, err := client.CreateSubscription(ctx, "sub-name", pubsub.SubscriptionConfig{Topic: topic})
70 if err != nil {
71 panic(err)
72 }
73
74 go func() {
75 for i := 0; i < 10; i++ {
76 srv.Publish("projects/some-project/topics/test-topic", []byte(strconv.Itoa(i)), nil)
77 }
78 }()
79
80 ctx, cancel := context.WithCancel(ctx)
81 var mu sync.Mutex
82 count := 0
83 err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
84 mu.Lock()
85 count++
86 if count >= 10 {
87 cancel()
88 }
89 mu.Unlock()
90 m.Ack()
91 })
92 if err != nil {
93 panic(err)
94 }
95 }
96
View as plain text