1 package pstest
2
3 import (
4 "context"
5 "errors"
6 "os"
7 "testing"
8 "time"
9
10 "cloud.google.com/go/pubsub"
11 "google.golang.org/api/option"
12 "gotest.tools/v3/assert"
13 "gotest.tools/v3/poll"
14
15 "edge-infra.dev/test/f2"
16 "edge-infra.dev/test/f2/x/pstest"
17 )
18
// delay is the pause between consecutive polling checks; it is handed to
// poll.WithDelay wherever this file polls.
const delay = 10 * time.Millisecond

// timeout is the upper bound for a single polling loop; it is handed to
// poll.WithTimeout wherever this file polls.
const timeout = 500 * time.Millisecond
23
// f is the package-wide test framework instance. It is assigned once in
// TestMain and then used by the tests in this file to run their features.
var f f2.Framework
25
26 func TestMain(m *testing.M) {
27 f = f2.New(context.Background(), f2.WithExtensions(pstest.New(pstest.WithProjectID("test_id")))).
28 WithLabel("foo", "bar").
29 Flaky().
30 Setup().
31 Teardown()
32 os.Exit(f.Run(m))
33 }
34
35 func TestPstest(t *testing.T) {
36 psEmu := f2.NewFeature("PubSub Emulator").
37 Setup("Create Topic", pstest.WithNewTopic("topic")).
38 Setup("Create Subscription to Topic",
39 pstest.WithNewSubscription("sub", "topic",
40
41
42 pstest.WithAckDeadline(time.Minute),
43 pstest.WithExpirationPolicy(time.Duration(0)))).
44 Test("Successfully receive a published message", func(ctx f2.Context, t *testing.T) f2.Context {
45 topicID, subID := "topic", "sub"
46
47
48 ps := pstest.FromContextT(ctx, t)
49
50
51 messageReceived := false
52 var receivedMessage *pubsub.Message
53 subCtx, cancelfunc := context.WithCancel(ctx)
54 go func() {
55 err := ps.Subscribe(subCtx, subID, func(ctx context.Context, msg *pubsub.Message) {
56 messageReceived = true
57 receivedMessage = msg
58 msg.Ack()
59 })
60 if err != nil {
61 assert.ErrorIs(t, err, context.Canceled)
62 }
63 }()
64
65
66 data, attr := "hello world", map[string]string{"attr": "value"}
67 msgID, err := ps.Publish(ctx, topicID, data, attr)
68 assert.NilError(t, err)
69
70
71 poll.WaitOn(
72 t,
73 func(t poll.LogT) poll.Result {
74 if messageReceived {
75 return poll.Success()
76 }
77 return poll.Continue("waiting...")
78 },
79 poll.WithDelay(delay),
80 poll.WithTimeout(timeout),
81 )
82
83 assert.Equal(t, data, string(receivedMessage.Data))
84 assert.DeepEqual(t, attr, receivedMessage.Attributes)
85 assert.Equal(t, msgID, receivedMessage.ID)
86
87
88 cancelfunc()
89 poll.WaitOn(
90 t,
91 func(t poll.LogT) poll.Result {
92 if errors.Is(subCtx.Err(), context.Canceled) {
93 return poll.Success()
94 }
95 return poll.Continue("waiting...")
96 },
97 poll.WithDelay(delay),
98 poll.WithTimeout(timeout),
99 )
100
101 return ctx
102 }).
103 Test("Use external client", func(ctx f2.Context, t *testing.T) f2.Context {
104 ps := pstest.FromContextT(ctx, t)
105
106
107 client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(ps.Conn))
108 assert.NilError(t, err)
109
110
111 top, err := client.CreateTopic(ctx, "different-topic")
112 assert.NilError(t, err)
113 sub, err := client.CreateSubscription(ctx, "different-sub", pubsub.SubscriptionConfig{Topic: top})
114 assert.NilError(t, err)
115
116
117 messageReceived := false
118 subCtx, cancelfunc := context.WithCancel(ctx)
119 go func() {
120 err = sub.Receive(subCtx, func(ctx context.Context, msg *pubsub.Message) {
121 messageReceived = true
122 msg.Ack()
123 })
124 if err != nil {
125 assert.ErrorIs(t, err, context.Canceled)
126 }
127 }()
128 top.Publish(ctx, &pubsub.Message{Data: []byte("hello world")})
129
130 poll.WaitOn(
131 t,
132 func(t poll.LogT) poll.Result {
133 if messageReceived {
134 return poll.Success()
135 }
136 return poll.Continue("waiting...")
137 },
138 poll.WithDelay(delay),
139 poll.WithTimeout(timeout),
140 )
141
142 cancelfunc()
143 poll.WaitOn(
144 t,
145 func(t poll.LogT) poll.Result {
146 if errors.Is(subCtx.Err(), context.Canceled) {
147 return poll.Success()
148 }
149 return poll.Continue("waiting...")
150 },
151 poll.WithDelay(delay),
152 poll.WithTimeout(timeout),
153 )
154
155 return ctx
156 }).
157 Feature()
158 f.Test(t, psEmu)
159 }
160
View as plain text