1 package nats_test
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "strings"
8 "sync"
9 "testing"
10 "time"
11
12 "github.com/nats-io/nats-server/v2/server"
13 "github.com/nats-io/nats.go"
14
15 "github.com/go-kit/kit/endpoint"
16 natstransport "github.com/go-kit/kit/transport/nats"
17 )
18
19 type TestResponse struct {
20 String string `json:"str"`
21 Error string `json:"err"`
22 }
23
24 func newNATSConn(t *testing.T) (*server.Server, *nats.Conn) {
25 s, err := server.NewServer(&server.Options{
26 Host: "localhost",
27 Port: 0,
28 })
29 if err != nil {
30 t.Fatal(err)
31 }
32
33 go s.Start()
34
35 for i := 0; i < 5 && !s.Running(); i++ {
36 t.Logf("Running %v", s.Running())
37 time.Sleep(time.Second)
38 }
39 if !s.Running() {
40 s.Shutdown()
41 s.WaitForShutdown()
42 t.Fatal("not yet running")
43 }
44
45 if ok := s.ReadyForConnections(5 * time.Second); !ok {
46 t.Fatal("not ready for connections")
47 }
48
49 c, err := nats.Connect("nats://"+s.Addr().String(), nats.Name(t.Name()))
50 if err != nil {
51 t.Fatalf("failed to connect to NATS server: %s", err)
52 }
53
54 return s, c
55 }
56
57 func TestSubscriberBadDecode(t *testing.T) {
58 s, c := newNATSConn(t)
59 defer func() { s.Shutdown(); s.WaitForShutdown() }()
60 defer c.Close()
61
62 handler := natstransport.NewSubscriber(
63 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
64 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, errors.New("dang") },
65 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
66 )
67
68 resp := testRequest(t, c, handler)
69
70 if want, have := "dang", resp.Error; want != have {
71 t.Errorf("want %s, have %s", want, have)
72 }
73
74 }
75
76 func TestSubscriberBadEndpoint(t *testing.T) {
77 s, c := newNATSConn(t)
78 defer func() { s.Shutdown(); s.WaitForShutdown() }()
79 defer c.Close()
80
81 handler := natstransport.NewSubscriber(
82 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
83 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
84 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
85 )
86
87 resp := testRequest(t, c, handler)
88
89 if want, have := "dang", resp.Error; want != have {
90 t.Errorf("want %s, have %s", want, have)
91 }
92 }
93
94 func TestSubscriberBadEncode(t *testing.T) {
95 s, c := newNATSConn(t)
96 defer func() { s.Shutdown(); s.WaitForShutdown() }()
97 defer c.Close()
98
99 handler := natstransport.NewSubscriber(
100 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
101 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
102 func(context.Context, string, *nats.Conn, interface{}) error { return errors.New("dang") },
103 )
104
105 resp := testRequest(t, c, handler)
106
107 if want, have := "dang", resp.Error; want != have {
108 t.Errorf("want %s, have %s", want, have)
109 }
110 }
111
112 func TestSubscriberErrorEncoder(t *testing.T) {
113 s, c := newNATSConn(t)
114 defer func() { s.Shutdown(); s.WaitForShutdown() }()
115 defer c.Close()
116
117 errTeapot := errors.New("teapot")
118 code := func(err error) error {
119 if errors.Is(err, errTeapot) {
120 return err
121 }
122 return errors.New("dang")
123 }
124 handler := natstransport.NewSubscriber(
125 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot },
126 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
127 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
128 natstransport.SubscriberErrorEncoder(func(_ context.Context, err error, reply string, nc *nats.Conn) {
129 var r TestResponse
130 r.Error = code(err).Error()
131
132 b, err := json.Marshal(r)
133 if err != nil {
134 t.Fatal(err)
135 }
136
137 if err := c.Publish(reply, b); err != nil {
138 t.Fatal(err)
139 }
140 }),
141 )
142
143 resp := testRequest(t, c, handler)
144
145 if want, have := errTeapot.Error(), resp.Error; want != have {
146 t.Errorf("want %s, have %s", want, have)
147 }
148 }
149
150 func TestSubscriberHappySubject(t *testing.T) {
151 step, response := testSubscriber(t)
152 step()
153 r := <-response
154
155 var resp TestResponse
156 err := json.Unmarshal(r.Data, &resp)
157 if err != nil {
158 t.Fatal(err)
159 }
160
161 if want, have := "", resp.Error; want != have {
162 t.Errorf("want %s, have %s (%s)", want, have, r.Data)
163 }
164 }
165
166 func TestMultipleSubscriberBefore(t *testing.T) {
167 s, c := newNATSConn(t)
168 defer func() { s.Shutdown(); s.WaitForShutdown() }()
169 defer c.Close()
170
171 var (
172 response = struct{ Body string }{"go eat a fly ugly\n"}
173 wg sync.WaitGroup
174 done = make(chan struct{})
175 )
176 handler := natstransport.NewSubscriber(
177 endpoint.Nop,
178 func(context.Context, *nats.Msg) (interface{}, error) {
179 return struct{}{}, nil
180 },
181 func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
182 b, err := json.Marshal(response)
183 if err != nil {
184 return err
185 }
186
187 return c.Publish(reply, b)
188 },
189 natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
190 ctx = context.WithValue(ctx, "one", 1)
191
192 return ctx
193 }),
194 natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
195 if _, ok := ctx.Value("one").(int); !ok {
196 t.Error("Value was not set properly when multiple ServerBefores are used")
197 }
198
199 close(done)
200 return ctx
201 }),
202 )
203
204 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
205 if err != nil {
206 t.Fatal(err)
207 }
208 defer sub.Unsubscribe()
209
210 wg.Add(1)
211 go func() {
212 defer wg.Done()
213 _, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
214 if err != nil {
215 t.Fatal(err)
216 }
217 }()
218
219 select {
220 case <-done:
221 case <-time.After(time.Second):
222 t.Fatal("timeout waiting for finalizer")
223 }
224
225 wg.Wait()
226 }
227
228 func TestMultipleSubscriberAfter(t *testing.T) {
229 s, c := newNATSConn(t)
230 defer func() { s.Shutdown(); s.WaitForShutdown() }()
231 defer c.Close()
232
233 var (
234 response = struct{ Body string }{"go eat a fly ugly\n"}
235 wg sync.WaitGroup
236 done = make(chan struct{})
237 )
238 handler := natstransport.NewSubscriber(
239 endpoint.Nop,
240 func(context.Context, *nats.Msg) (interface{}, error) {
241 return struct{}{}, nil
242 },
243 func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
244 b, err := json.Marshal(response)
245 if err != nil {
246 return err
247 }
248 return c.Publish(reply, b)
249 },
250 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
251 return context.WithValue(ctx, "one", 1)
252 }),
253 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
254 if _, ok := ctx.Value("one").(int); !ok {
255 t.Error("Value was not set properly when multiple ServerAfters are used")
256 }
257 close(done)
258 return ctx
259 }),
260 )
261
262 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
263 if err != nil {
264 t.Fatal(err)
265 }
266 defer sub.Unsubscribe()
267
268 wg.Add(1)
269 go func() {
270 defer wg.Done()
271 _, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
272 if err != nil {
273 t.Fatal(err)
274 }
275 }()
276
277 select {
278 case <-done:
279 case <-time.After(time.Second):
280 t.Fatal("timeout waiting for finalizer")
281 }
282
283 wg.Wait()
284 }
285
286 func TestSubscriberFinalizerFunc(t *testing.T) {
287 s, c := newNATSConn(t)
288 defer func() { s.Shutdown(); s.WaitForShutdown() }()
289 defer c.Close()
290
291 var (
292 response = struct{ Body string }{"go eat a fly ugly\n"}
293 wg sync.WaitGroup
294 done = make(chan struct{})
295 )
296 handler := natstransport.NewSubscriber(
297 endpoint.Nop,
298 func(context.Context, *nats.Msg) (interface{}, error) {
299 return struct{}{}, nil
300 },
301 func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
302 b, err := json.Marshal(response)
303 if err != nil {
304 return err
305 }
306
307 return c.Publish(reply, b)
308 },
309 natstransport.SubscriberFinalizer(func(ctx context.Context, _ *nats.Msg) {
310 close(done)
311 }),
312 )
313
314 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
315 if err != nil {
316 t.Fatal(err)
317 }
318 defer sub.Unsubscribe()
319
320 wg.Add(1)
321 go func() {
322 defer wg.Done()
323 _, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
324 if err != nil {
325 t.Fatal(err)
326 }
327 }()
328
329 select {
330 case <-done:
331 case <-time.After(time.Second):
332 t.Fatal("timeout waiting for finalizer")
333 }
334
335 wg.Wait()
336 }
337
338 func TestEncodeJSONResponse(t *testing.T) {
339 s, c := newNATSConn(t)
340 defer func() { s.Shutdown(); s.WaitForShutdown() }()
341 defer c.Close()
342
343 handler := natstransport.NewSubscriber(
344 func(context.Context, interface{}) (interface{}, error) {
345 return struct {
346 Foo string `json:"foo"`
347 }{"bar"}, nil
348 },
349 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
350 natstransport.EncodeJSONResponse,
351 )
352
353 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
354 if err != nil {
355 t.Fatal(err)
356 }
357 defer sub.Unsubscribe()
358
359 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
360 if err != nil {
361 t.Fatal(err)
362 }
363
364 if want, have := `{"foo":"bar"}`, strings.TrimSpace(string(r.Data)); want != have {
365 t.Errorf("Body: want %s, have %s", want, have)
366 }
367 }
368
369 type responseError struct {
370 msg string
371 }
372
373 func (m responseError) Error() string {
374 return m.msg
375 }
376
377 func TestErrorEncoder(t *testing.T) {
378 s, c := newNATSConn(t)
379 defer func() { s.Shutdown(); s.WaitForShutdown() }()
380 defer c.Close()
381
382 errResp := struct {
383 Error string `json:"err"`
384 }{"oh no"}
385 handler := natstransport.NewSubscriber(
386 func(context.Context, interface{}) (interface{}, error) {
387 return nil, responseError{msg: errResp.Error}
388 },
389 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
390 natstransport.EncodeJSONResponse,
391 )
392
393 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
394 if err != nil {
395 t.Fatal(err)
396 }
397 defer sub.Unsubscribe()
398
399 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
400 if err != nil {
401 t.Fatal(err)
402 }
403
404 b, err := json.Marshal(errResp)
405 if err != nil {
406 t.Fatal(err)
407 }
408 if string(b) != string(r.Data) {
409 t.Errorf("ErrorEncoder: got: %q, expected: %q", r.Data, b)
410 }
411 }
412
413 type noContentResponse struct{}
414
415 func TestEncodeNoContent(t *testing.T) {
416 s, c := newNATSConn(t)
417 defer func() { s.Shutdown(); s.WaitForShutdown() }()
418 defer c.Close()
419
420 handler := natstransport.NewSubscriber(
421 func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil },
422 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
423 natstransport.EncodeJSONResponse,
424 )
425
426 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
427 if err != nil {
428 t.Fatal(err)
429 }
430 defer sub.Unsubscribe()
431
432 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
433 if err != nil {
434 t.Fatal(err)
435 }
436
437 if want, have := `{}`, strings.TrimSpace(string(r.Data)); want != have {
438 t.Errorf("Body: want %s, have %s", want, have)
439 }
440 }
441
442 func TestNoOpRequestDecoder(t *testing.T) {
443 s, c := newNATSConn(t)
444 defer func() { s.Shutdown(); s.WaitForShutdown() }()
445 defer c.Close()
446
447 handler := natstransport.NewSubscriber(
448 func(ctx context.Context, request interface{}) (interface{}, error) {
449 if request != nil {
450 t.Error("Expected nil request in endpoint when using NopRequestDecoder")
451 }
452 return nil, nil
453 },
454 natstransport.NopRequestDecoder,
455 natstransport.EncodeJSONResponse,
456 )
457
458 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
459 if err != nil {
460 t.Fatal(err)
461 }
462 defer sub.Unsubscribe()
463
464 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
465 if err != nil {
466 t.Fatal(err)
467 }
468
469 if want, have := `null`, strings.TrimSpace(string(r.Data)); want != have {
470 t.Errorf("Body: want %s, have %s", want, have)
471 }
472 }
473
474 func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) {
475 var (
476 stepch = make(chan bool)
477 endpoint = func(context.Context, interface{}) (interface{}, error) {
478 <-stepch
479 return struct{}{}, nil
480 }
481 response = make(chan *nats.Msg)
482 handler = natstransport.NewSubscriber(
483 endpoint,
484 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
485 natstransport.EncodeJSONResponse,
486 natstransport.SubscriberBefore(func(ctx context.Context, msg *nats.Msg) context.Context { return ctx }),
487 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context { return ctx }),
488 )
489 )
490
491 go func() {
492 s, c := newNATSConn(t)
493 defer func() { s.Shutdown(); s.WaitForShutdown() }()
494 defer c.Close()
495
496 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
497 if err != nil {
498 t.Fatal(err)
499 }
500 defer sub.Unsubscribe()
501
502 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
503 if err != nil {
504 t.Fatal(err)
505 }
506
507 response <- r
508 }()
509
510 return func() { stepch <- true }, response
511 }
512
513 func testRequest(t *testing.T, c *nats.Conn, handler *natstransport.Subscriber) TestResponse {
514 sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
515 if err != nil {
516 t.Fatal(err)
517 }
518 defer sub.Unsubscribe()
519
520 r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
521 if err != nil {
522 t.Fatal(err)
523 }
524
525 var resp TestResponse
526 err = json.Unmarshal(r.Data, &resp)
527 if err != nil {
528 t.Fatal(err)
529 }
530
531 return resp
532 }
533
View as plain text