...

Source file src/github.com/go-kit/kit/transport/nats/subscriber_test.go

Documentation: github.com/go-kit/kit/transport/nats

     1  package nats_test
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"strings"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/nats-io/nats-server/v2/server"
    13  	"github.com/nats-io/nats.go"
    14  
    15  	"github.com/go-kit/kit/endpoint"
    16  	natstransport "github.com/go-kit/kit/transport/nats"
    17  )
    18  
    19  type TestResponse struct {
    20  	String string `json:"str"`
    21  	Error  string `json:"err"`
    22  }
    23  
    24  func newNATSConn(t *testing.T) (*server.Server, *nats.Conn) {
    25  	s, err := server.NewServer(&server.Options{
    26  		Host: "localhost",
    27  		Port: 0,
    28  	})
    29  	if err != nil {
    30  		t.Fatal(err)
    31  	}
    32  
    33  	go s.Start()
    34  
    35  	for i := 0; i < 5 && !s.Running(); i++ {
    36  		t.Logf("Running %v", s.Running())
    37  		time.Sleep(time.Second)
    38  	}
    39  	if !s.Running() {
    40  		s.Shutdown()
    41  		s.WaitForShutdown()
    42  		t.Fatal("not yet running")
    43  	}
    44  
    45  	if ok := s.ReadyForConnections(5 * time.Second); !ok {
    46  		t.Fatal("not ready for connections")
    47  	}
    48  
    49  	c, err := nats.Connect("nats://"+s.Addr().String(), nats.Name(t.Name()))
    50  	if err != nil {
    51  		t.Fatalf("failed to connect to NATS server: %s", err)
    52  	}
    53  
    54  	return s, c
    55  }
    56  
    57  func TestSubscriberBadDecode(t *testing.T) {
    58  	s, c := newNATSConn(t)
    59  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
    60  	defer c.Close()
    61  
    62  	handler := natstransport.NewSubscriber(
    63  		func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
    64  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, errors.New("dang") },
    65  		func(context.Context, string, *nats.Conn, interface{}) error { return nil },
    66  	)
    67  
    68  	resp := testRequest(t, c, handler)
    69  
    70  	if want, have := "dang", resp.Error; want != have {
    71  		t.Errorf("want %s, have %s", want, have)
    72  	}
    73  
    74  }
    75  
    76  func TestSubscriberBadEndpoint(t *testing.T) {
    77  	s, c := newNATSConn(t)
    78  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
    79  	defer c.Close()
    80  
    81  	handler := natstransport.NewSubscriber(
    82  		func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
    83  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
    84  		func(context.Context, string, *nats.Conn, interface{}) error { return nil },
    85  	)
    86  
    87  	resp := testRequest(t, c, handler)
    88  
    89  	if want, have := "dang", resp.Error; want != have {
    90  		t.Errorf("want %s, have %s", want, have)
    91  	}
    92  }
    93  
    94  func TestSubscriberBadEncode(t *testing.T) {
    95  	s, c := newNATSConn(t)
    96  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
    97  	defer c.Close()
    98  
    99  	handler := natstransport.NewSubscriber(
   100  		func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
   101  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   102  		func(context.Context, string, *nats.Conn, interface{}) error { return errors.New("dang") },
   103  	)
   104  
   105  	resp := testRequest(t, c, handler)
   106  
   107  	if want, have := "dang", resp.Error; want != have {
   108  		t.Errorf("want %s, have %s", want, have)
   109  	}
   110  }
   111  
   112  func TestSubscriberErrorEncoder(t *testing.T) {
   113  	s, c := newNATSConn(t)
   114  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   115  	defer c.Close()
   116  
   117  	errTeapot := errors.New("teapot")
   118  	code := func(err error) error {
   119  		if errors.Is(err, errTeapot) {
   120  			return err
   121  		}
   122  		return errors.New("dang")
   123  	}
   124  	handler := natstransport.NewSubscriber(
   125  		func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot },
   126  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   127  		func(context.Context, string, *nats.Conn, interface{}) error { return nil },
   128  		natstransport.SubscriberErrorEncoder(func(_ context.Context, err error, reply string, nc *nats.Conn) {
   129  			var r TestResponse
   130  			r.Error = code(err).Error()
   131  
   132  			b, err := json.Marshal(r)
   133  			if err != nil {
   134  				t.Fatal(err)
   135  			}
   136  
   137  			if err := c.Publish(reply, b); err != nil {
   138  				t.Fatal(err)
   139  			}
   140  		}),
   141  	)
   142  
   143  	resp := testRequest(t, c, handler)
   144  
   145  	if want, have := errTeapot.Error(), resp.Error; want != have {
   146  		t.Errorf("want %s, have %s", want, have)
   147  	}
   148  }
   149  
   150  func TestSubscriberHappySubject(t *testing.T) {
   151  	step, response := testSubscriber(t)
   152  	step()
   153  	r := <-response
   154  
   155  	var resp TestResponse
   156  	err := json.Unmarshal(r.Data, &resp)
   157  	if err != nil {
   158  		t.Fatal(err)
   159  	}
   160  
   161  	if want, have := "", resp.Error; want != have {
   162  		t.Errorf("want %s, have %s (%s)", want, have, r.Data)
   163  	}
   164  }
   165  
   166  func TestMultipleSubscriberBefore(t *testing.T) {
   167  	s, c := newNATSConn(t)
   168  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   169  	defer c.Close()
   170  
   171  	var (
   172  		response = struct{ Body string }{"go eat a fly ugly\n"}
   173  		wg       sync.WaitGroup
   174  		done     = make(chan struct{})
   175  	)
   176  	handler := natstransport.NewSubscriber(
   177  		endpoint.Nop,
   178  		func(context.Context, *nats.Msg) (interface{}, error) {
   179  			return struct{}{}, nil
   180  		},
   181  		func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
   182  			b, err := json.Marshal(response)
   183  			if err != nil {
   184  				return err
   185  			}
   186  
   187  			return c.Publish(reply, b)
   188  		},
   189  		natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
   190  			ctx = context.WithValue(ctx, "one", 1)
   191  
   192  			return ctx
   193  		}),
   194  		natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
   195  			if _, ok := ctx.Value("one").(int); !ok {
   196  				t.Error("Value was not set properly when multiple ServerBefores are used")
   197  			}
   198  
   199  			close(done)
   200  			return ctx
   201  		}),
   202  	)
   203  
   204  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   205  	if err != nil {
   206  		t.Fatal(err)
   207  	}
   208  	defer sub.Unsubscribe()
   209  
   210  	wg.Add(1)
   211  	go func() {
   212  		defer wg.Done()
   213  		_, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   214  		if err != nil {
   215  			t.Fatal(err)
   216  		}
   217  	}()
   218  
   219  	select {
   220  	case <-done:
   221  	case <-time.After(time.Second):
   222  		t.Fatal("timeout waiting for finalizer")
   223  	}
   224  
   225  	wg.Wait()
   226  }
   227  
   228  func TestMultipleSubscriberAfter(t *testing.T) {
   229  	s, c := newNATSConn(t)
   230  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   231  	defer c.Close()
   232  
   233  	var (
   234  		response = struct{ Body string }{"go eat a fly ugly\n"}
   235  		wg       sync.WaitGroup
   236  		done     = make(chan struct{})
   237  	)
   238  	handler := natstransport.NewSubscriber(
   239  		endpoint.Nop,
   240  		func(context.Context, *nats.Msg) (interface{}, error) {
   241  			return struct{}{}, nil
   242  		},
   243  		func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
   244  			b, err := json.Marshal(response)
   245  			if err != nil {
   246  				return err
   247  			}
   248  			return c.Publish(reply, b)
   249  		},
   250  		natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
   251  			return context.WithValue(ctx, "one", 1)
   252  		}),
   253  		natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
   254  			if _, ok := ctx.Value("one").(int); !ok {
   255  				t.Error("Value was not set properly when multiple ServerAfters are used")
   256  			}
   257  			close(done)
   258  			return ctx
   259  		}),
   260  	)
   261  
   262  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   263  	if err != nil {
   264  		t.Fatal(err)
   265  	}
   266  	defer sub.Unsubscribe()
   267  
   268  	wg.Add(1)
   269  	go func() {
   270  		defer wg.Done()
   271  		_, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   272  		if err != nil {
   273  			t.Fatal(err)
   274  		}
   275  	}()
   276  
   277  	select {
   278  	case <-done:
   279  	case <-time.After(time.Second):
   280  		t.Fatal("timeout waiting for finalizer")
   281  	}
   282  
   283  	wg.Wait()
   284  }
   285  
   286  func TestSubscriberFinalizerFunc(t *testing.T) {
   287  	s, c := newNATSConn(t)
   288  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   289  	defer c.Close()
   290  
   291  	var (
   292  		response = struct{ Body string }{"go eat a fly ugly\n"}
   293  		wg       sync.WaitGroup
   294  		done     = make(chan struct{})
   295  	)
   296  	handler := natstransport.NewSubscriber(
   297  		endpoint.Nop,
   298  		func(context.Context, *nats.Msg) (interface{}, error) {
   299  			return struct{}{}, nil
   300  		},
   301  		func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
   302  			b, err := json.Marshal(response)
   303  			if err != nil {
   304  				return err
   305  			}
   306  
   307  			return c.Publish(reply, b)
   308  		},
   309  		natstransport.SubscriberFinalizer(func(ctx context.Context, _ *nats.Msg) {
   310  			close(done)
   311  		}),
   312  	)
   313  
   314  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   315  	if err != nil {
   316  		t.Fatal(err)
   317  	}
   318  	defer sub.Unsubscribe()
   319  
   320  	wg.Add(1)
   321  	go func() {
   322  		defer wg.Done()
   323  		_, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   324  		if err != nil {
   325  			t.Fatal(err)
   326  		}
   327  	}()
   328  
   329  	select {
   330  	case <-done:
   331  	case <-time.After(time.Second):
   332  		t.Fatal("timeout waiting for finalizer")
   333  	}
   334  
   335  	wg.Wait()
   336  }
   337  
   338  func TestEncodeJSONResponse(t *testing.T) {
   339  	s, c := newNATSConn(t)
   340  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   341  	defer c.Close()
   342  
   343  	handler := natstransport.NewSubscriber(
   344  		func(context.Context, interface{}) (interface{}, error) {
   345  			return struct {
   346  				Foo string `json:"foo"`
   347  			}{"bar"}, nil
   348  		},
   349  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   350  		natstransport.EncodeJSONResponse,
   351  	)
   352  
   353  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   354  	if err != nil {
   355  		t.Fatal(err)
   356  	}
   357  	defer sub.Unsubscribe()
   358  
   359  	r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   360  	if err != nil {
   361  		t.Fatal(err)
   362  	}
   363  
   364  	if want, have := `{"foo":"bar"}`, strings.TrimSpace(string(r.Data)); want != have {
   365  		t.Errorf("Body: want %s, have %s", want, have)
   366  	}
   367  }
   368  
   369  type responseError struct {
   370  	msg string
   371  }
   372  
   373  func (m responseError) Error() string {
   374  	return m.msg
   375  }
   376  
   377  func TestErrorEncoder(t *testing.T) {
   378  	s, c := newNATSConn(t)
   379  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   380  	defer c.Close()
   381  
   382  	errResp := struct {
   383  		Error string `json:"err"`
   384  	}{"oh no"}
   385  	handler := natstransport.NewSubscriber(
   386  		func(context.Context, interface{}) (interface{}, error) {
   387  			return nil, responseError{msg: errResp.Error}
   388  		},
   389  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   390  		natstransport.EncodeJSONResponse,
   391  	)
   392  
   393  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   394  	if err != nil {
   395  		t.Fatal(err)
   396  	}
   397  	defer sub.Unsubscribe()
   398  
   399  	r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   400  	if err != nil {
   401  		t.Fatal(err)
   402  	}
   403  
   404  	b, err := json.Marshal(errResp)
   405  	if err != nil {
   406  		t.Fatal(err)
   407  	}
   408  	if string(b) != string(r.Data) {
   409  		t.Errorf("ErrorEncoder: got: %q, expected: %q", r.Data, b)
   410  	}
   411  }
   412  
   413  type noContentResponse struct{}
   414  
   415  func TestEncodeNoContent(t *testing.T) {
   416  	s, c := newNATSConn(t)
   417  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   418  	defer c.Close()
   419  
   420  	handler := natstransport.NewSubscriber(
   421  		func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil },
   422  		func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   423  		natstransport.EncodeJSONResponse,
   424  	)
   425  
   426  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   427  	if err != nil {
   428  		t.Fatal(err)
   429  	}
   430  	defer sub.Unsubscribe()
   431  
   432  	r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   433  	if err != nil {
   434  		t.Fatal(err)
   435  	}
   436  
   437  	if want, have := `{}`, strings.TrimSpace(string(r.Data)); want != have {
   438  		t.Errorf("Body: want %s, have %s", want, have)
   439  	}
   440  }
   441  
   442  func TestNoOpRequestDecoder(t *testing.T) {
   443  	s, c := newNATSConn(t)
   444  	defer func() { s.Shutdown(); s.WaitForShutdown() }()
   445  	defer c.Close()
   446  
   447  	handler := natstransport.NewSubscriber(
   448  		func(ctx context.Context, request interface{}) (interface{}, error) {
   449  			if request != nil {
   450  				t.Error("Expected nil request in endpoint when using NopRequestDecoder")
   451  			}
   452  			return nil, nil
   453  		},
   454  		natstransport.NopRequestDecoder,
   455  		natstransport.EncodeJSONResponse,
   456  	)
   457  
   458  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   459  	if err != nil {
   460  		t.Fatal(err)
   461  	}
   462  	defer sub.Unsubscribe()
   463  
   464  	r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   465  	if err != nil {
   466  		t.Fatal(err)
   467  	}
   468  
   469  	if want, have := `null`, strings.TrimSpace(string(r.Data)); want != have {
   470  		t.Errorf("Body: want %s, have %s", want, have)
   471  	}
   472  }
   473  
   474  func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) {
   475  	var (
   476  		stepch   = make(chan bool)
   477  		endpoint = func(context.Context, interface{}) (interface{}, error) {
   478  			<-stepch
   479  			return struct{}{}, nil
   480  		}
   481  		response = make(chan *nats.Msg)
   482  		handler  = natstransport.NewSubscriber(
   483  			endpoint,
   484  			func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
   485  			natstransport.EncodeJSONResponse,
   486  			natstransport.SubscriberBefore(func(ctx context.Context, msg *nats.Msg) context.Context { return ctx }),
   487  			natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context { return ctx }),
   488  		)
   489  	)
   490  
   491  	go func() {
   492  		s, c := newNATSConn(t)
   493  		defer func() { s.Shutdown(); s.WaitForShutdown() }()
   494  		defer c.Close()
   495  
   496  		sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   497  		if err != nil {
   498  			t.Fatal(err)
   499  		}
   500  		defer sub.Unsubscribe()
   501  
   502  		r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   503  		if err != nil {
   504  			t.Fatal(err)
   505  		}
   506  
   507  		response <- r
   508  	}()
   509  
   510  	return func() { stepch <- true }, response
   511  }
   512  
   513  func testRequest(t *testing.T, c *nats.Conn, handler *natstransport.Subscriber) TestResponse {
   514  	sub, err := c.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(c))
   515  	if err != nil {
   516  		t.Fatal(err)
   517  	}
   518  	defer sub.Unsubscribe()
   519  
   520  	r, err := c.Request("natstransport.test", []byte("test data"), 2*time.Second)
   521  	if err != nil {
   522  		t.Fatal(err)
   523  	}
   524  
   525  	var resp TestResponse
   526  	err = json.Unmarshal(r.Data, &resp)
   527  	if err != nil {
   528  		t.Fatal(err)
   529  	}
   530  
   531  	return resp
   532  }
   533  

View as plain text