1
2
3 package server_test
4
5 import (
6 "context"
7 "fmt"
8 "sync"
9 "testing"
10 "time"
11
12 server "edge-infra.dev/pkg/edge/psqlinjector"
13 "edge-infra.dev/test/f2"
14 "edge-infra.dev/test/f2/x/pstest"
15
16 "cloud.google.com/go/pubsub"
17 "github.com/google/uuid"
18 "github.com/stretchr/testify/require"
19 "google.golang.org/api/option"
20 )
21
22 func TestReceiverMuxHealthCheck(t *testing.T) {
23 var cfg = new(server.Config)
24
25 var bannerProjectIDs struct {
26 sync.Mutex
27 ids []string
28 }
29
30 for range 20 {
31 bannerProjectIDs.ids = append(bannerProjectIDs.ids, fmt.Sprintf("sub%s", uuid.New()))
32 }
33
34 var pollFunc = func(_ context.Context) ([]string, error) {
35 bannerProjectIDs.Lock()
36 defer bannerProjectIDs.Unlock()
37 return bannerProjectIDs.ids, nil
38 }
39
40 var got struct {
41 sync.Mutex
42 sync.WaitGroup
43 }
44 const markNacked = "nacked"
45 var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
46 defer got.Done()
47 for k := range msg.Attributes {
48 if k == markNacked {
49 return fmt.Errorf("nacked")
50 }
51 }
52 return nil
53 }
54
55
56 var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
57 var runErrCh = make(chan error, 1)
58
59 var client *pubsub.Client
60
61 var getHealthCheck func(*testing.T) server.ReceiverMuxHealthCheck
62
63 var feat = f2.NewFeature(t.Name()).
64 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
65 return setupConfig(ctx, t, cfg)
66 }).
67 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
68 return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
69 }).
70 Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
71 ps := pstest.FromContextT(ctx, t)
72 var err error
73 client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
74 require.NoError(t, err)
75 return ctx
76 }).
77 Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
78 var bmcfg = server.ReceiverMuxConfig{
79 ForemanProjectID: cfg.ForemanProjectID,
80 SubscriptionID: cfg.SubscriptionID,
81 TopicID: cfg.TopicID,
82 Conn: cfg.TestPubSubConn,
83 PollPeriod: cfg.PollBannersPeriod,
84 PollMaxRetries: cfg.PollBannersMaxRetries,
85 PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
86 PollFunc: pollFunc,
87 Handler: handlerFunc,
88 }
89 rm, err := server.NewReceiverMux(&bmcfg)
90 require.NoError(t, err)
91
92 getHealthCheck = func(t *testing.T) server.ReceiverMuxHealthCheck {
93 var bmhc = rm.HealthCheck()
94
101 require.Equal(t, bmhc.Count, len(bannerProjectIDs.ids))
102 return bmhc
103 }
104
105 go func() {
106 err := rm.Run(doneCtx)
107 runErrCh <- err
108 }()
109 return ctx
110 }).
111 Test("health check", func(ctx f2.Context, t *testing.T) f2.Context {
112 for _, banner := range bannerProjectIDs.ids {
113 var topic = client.TopicInProject(cfg.TopicID, banner)
114 exists, err := topic.Exists(ctx)
115 require.NoError(t, err)
116 require.True(t, exists)
117
118 got.Add(1)
119 topic.Publish(ctx, &pubsub.Message{
120 Data: []byte(banner),
121
122 })
123 }
124
125 got.Wait()
126 <-time.After(time.Second)
127
128 var bmhc = getHealthCheck(t)
129 require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
130 require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
131 require.Equal(t, 0, bmhc.UnhealthyCount)
132 for _, projectID := range bannerProjectIDs.ids {
133 _, found := bmhc.Receivers[projectID]
134 require.True(t, found)
135 }
136
137 for _, rhc := range bmhc.Receivers {
138 require.True(t, rhc.Healthy)
139 require.Equal(t, rhc.RestartCount, 0)
140
141 require.Equal(t, rhc.TotalMessagesProcessed, 1)
142 require.Equal(t, rhc.CurrentMessagesProcessed, 1)
143
144 require.Equal(t, rhc.TotalMessagesAcked, 1)
145 require.Equal(t, rhc.CurrentMessagesAcked, 1)
146
147 require.Equal(t, rhc.TotalMessagesNacked, 0)
148 require.Equal(t, rhc.CurrentMessagesNacked, 0)
149
150 require.False(t, rhc.TotalReceiveDuration == 0)
151 require.False(t, rhc.CurrentReceiveDuration == 0)
152 require.Equal(t, rhc.TotalReceiveDuration, rhc.CurrentReceiveDuration)
153 }
154
155
158 var broken = make(map[string]bool)
159 for i, projectID := range bannerProjectIDs.ids {
160 if i%2 == 0 {
161 continue
162 }
163 broken[projectID] = true
164 var topic = client.TopicInProject(cfg.TopicID, projectID)
165 exists, err := topic.Exists(ctx)
166 require.NoError(t, err)
167 require.True(t, exists)
168
169 err = client.SubscriptionInProject(cfg.SubscriptionID, projectID).Delete(ctx)
170 require.NoError(t, err)
171 }
172
173
174 bmhc = getHealthCheck(t)
175 for bmhc.UnhealthyCount != len(broken) {
176 <-time.After(500 * time.Millisecond)
177 bmhc = getHealthCheck(t)
178 }
179
180 require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
181 require.Equal(t, len(bannerProjectIDs.ids)-len(broken), bmhc.HealthyCount)
182 require.Equal(t, len(broken), bmhc.UnhealthyCount)
183 for _, projectID := range bannerProjectIDs.ids {
184 _, found := bmhc.Receivers[projectID]
185 require.True(t, found)
186 }
187
188 for projectID, rhc := range bmhc.Receivers {
189 require.Equal(t, rhc.TotalMessagesProcessed, 1)
190 require.Equal(t, rhc.TotalMessagesAcked, 1)
191 require.Equal(t, rhc.TotalMessagesNacked, 0)
192 require.Equal(t, rhc.CurrentMessagesNacked, 0)
193
194 if !broken[projectID] {
195 require.True(t, rhc.Healthy)
196 require.Equal(t, rhc.RestartCount, 0)
197 require.Equal(t, rhc.CurrentMessagesProcessed, 1)
198 require.Equal(t, rhc.CurrentMessagesAcked, 1)
199 require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration)
200 require.True(t, rhc.CurrentReceiveDuration != 0)
201 continue
202 }
203 require.False(t, rhc.Healthy)
204 require.Equal(t, rhc.RestartCount, 1)
205 require.Equal(t, rhc.CurrentMessagesProcessed, 0)
206 require.Equal(t, rhc.CurrentMessagesAcked, 0)
207 require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration)
208 require.True(t, rhc.CurrentReceiveDuration == 0)
209 }
210
211
214
215 for projectID := range broken {
216 ps := pstest.FromContextT(ctx, t)
217 client2, err := pubsub.NewClient(ctx, projectID, option.WithGRPCConn(ps.Conn))
218 require.NoError(t, err)
219
220 var topic = client2.Topic(cfg.TopicID)
221 exists, err := topic.Exists(ctx)
222 require.NoError(t, err)
223 require.True(t, exists)
224
225 sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
226 Topic: topic,
227 })
228 require.NoError(t, err)
229 exists, err = sub.Exists(ctx)
230 require.NoError(t, err)
231 require.True(t, exists)
232 }
233
234
235 bmhc = getHealthCheck(t)
236 for bmhc.UnhealthyCount != 0 {
237 <-time.After(500 * time.Millisecond)
238 bmhc = getHealthCheck(t)
239 }
240
241 require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
242 require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
243 require.Equal(t, 0, bmhc.UnhealthyCount)
244 for _, projectID := range bannerProjectIDs.ids {
245 _, found := bmhc.Receivers[projectID]
246 require.True(t, found)
247 }
248
249
252 for _, banner := range bannerProjectIDs.ids {
253 var topic = client.TopicInProject(cfg.TopicID, banner)
254 exists, err := topic.Exists(ctx)
255 require.NoError(t, err)
256 require.True(t, exists)
257
258 got.Add(1)
259 topic.Publish(ctx, &pubsub.Message{
260 Data: []byte(banner),
261 })
262
263 got.Add(1)
264 topic.Publish(ctx, &pubsub.Message{
265 Data: []byte(banner),
266 })
267 }
268
269 got.Wait()
270 <-time.After(time.Second)
271
272 bmhc = getHealthCheck(t)
273 require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
274 require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
275 require.Equal(t, 0, bmhc.UnhealthyCount)
276 for _, projectID := range bannerProjectIDs.ids {
277 _, found := bmhc.Receivers[projectID]
278 require.True(t, found)
279 }
280
281 for projectID, rhc := range bmhc.Receivers {
282 require.True(t, rhc.Healthy)
283 require.Equal(t, rhc.TotalMessagesProcessed, 3)
284 require.Equal(t, rhc.TotalMessagesAcked, 3)
285 require.Equal(t, rhc.TotalMessagesNacked, 0)
286 require.Equal(t, rhc.CurrentMessagesNacked, 0)
287 require.True(t, rhc.CurrentReceiveDuration != 0)
288
289 if broken[projectID] {
290
291 require.Equal(t, rhc.RestartCount, 1)
292 require.Equal(t, rhc.CurrentMessagesProcessed, 2)
293 require.Equal(t, rhc.CurrentMessagesAcked, 2)
294 require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration)
295 } else {
296 require.Equal(t, rhc.RestartCount, 0)
297 require.Equal(t, rhc.CurrentMessagesProcessed, 3)
298 require.Equal(t, rhc.CurrentMessagesAcked, 3)
299 require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration)
300 }
301 }
302
303 t.Logf("receiver mux is receiving from all subscriptions again and HealthCheck is correct")
304 return ctx
305 }).
306 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
307 stopReceiverMux()
308 require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
309 return ctx
310 }).
311 Feature()
312
313 f2f.Test(t, feat)
314 }
315
View as plain text