...

Source file src/github.com/go-kit/kit/transport/amqp/publisher_test.go

Documentation: github.com/go-kit/kit/transport/amqp

     1  package amqp_test
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"testing"
     8  	"time"
     9  
    10  	amqptransport "github.com/go-kit/kit/transport/amqp"
    11  	amqp "github.com/rabbitmq/amqp091-go"
    12  )
    13  
    14  var (
    15  	defaultContentType     = ""
    16  	defaultContentEncoding = ""
    17  )
    18  
    19  // TestBadEncode tests if encode errors are handled properly.
    20  func TestBadEncode(t *testing.T) {
    21  	ch := &mockChannel{f: nullFunc}
    22  	q := &amqp.Queue{Name: "some queue"}
    23  	pub := amqptransport.NewPublisher(
    24  		ch,
    25  		q,
    26  		func(context.Context, *amqp.Publishing, interface{}) error { return errors.New("err!") },
    27  		func(context.Context, *amqp.Delivery) (response interface{}, err error) { return struct{}{}, nil },
    28  	)
    29  	errChan := make(chan error, 1)
    30  	var err error
    31  	go func() {
    32  		_, err := pub.Endpoint()(context.Background(), struct{}{})
    33  		errChan <- err
    34  
    35  	}()
    36  	select {
    37  	case err = <-errChan:
    38  		break
    39  
    40  	case <-time.After(100 * time.Millisecond):
    41  		t.Fatal("Timed out waiting for result")
    42  	}
    43  	if err == nil {
    44  		t.Error("expected error")
    45  	}
    46  	if want, have := "err!", err.Error(); want != have {
    47  		t.Errorf("want %s, have %s", want, have)
    48  	}
    49  }
    50  
    51  // TestBadDecode tests if decode errors are handled properly.
    52  func TestBadDecode(t *testing.T) {
    53  	cid := "correlation"
    54  	ch := &mockChannel{
    55  		f: nullFunc,
    56  		c: make(chan amqp.Publishing, 1),
    57  		deliveries: []amqp.Delivery{
    58  			amqp.Delivery{
    59  				CorrelationId: cid,
    60  			},
    61  		},
    62  	}
    63  	q := &amqp.Queue{Name: "some queue"}
    64  
    65  	pub := amqptransport.NewPublisher(
    66  		ch,
    67  		q,
    68  		func(context.Context, *amqp.Publishing, interface{}) error { return nil },
    69  		func(context.Context, *amqp.Delivery) (response interface{}, err error) {
    70  			return struct{}{}, errors.New("err!")
    71  		},
    72  		amqptransport.PublisherBefore(
    73  			amqptransport.SetCorrelationID(cid),
    74  		),
    75  	)
    76  
    77  	var err error
    78  	errChan := make(chan error, 1)
    79  	go func() {
    80  		_, err := pub.Endpoint()(context.Background(), struct{}{})
    81  		errChan <- err
    82  
    83  	}()
    84  
    85  	select {
    86  	case err = <-errChan:
    87  		break
    88  
    89  	case <-time.After(100 * time.Millisecond):
    90  		t.Fatal("Timed out waiting for result")
    91  	}
    92  
    93  	if err == nil {
    94  		t.Error("expected error")
    95  	}
    96  	if want, have := "err!", err.Error(); want != have {
    97  		t.Errorf("want %s, have %s", want, have)
    98  	}
    99  }
   100  
   101  // TestPublisherTimeout ensures that the publisher timeout mechanism works.
   102  func TestPublisherTimeout(t *testing.T) {
   103  	ch := &mockChannel{
   104  		f:          nullFunc,
   105  		c:          make(chan amqp.Publishing, 1),
   106  		deliveries: []amqp.Delivery{}, // no reply from mock subscriber
   107  	}
   108  	q := &amqp.Queue{Name: "some queue"}
   109  
   110  	pub := amqptransport.NewPublisher(
   111  		ch,
   112  		q,
   113  		func(context.Context, *amqp.Publishing, interface{}) error { return nil },
   114  		func(context.Context, *amqp.Delivery) (response interface{}, err error) {
   115  			return struct{}{}, nil
   116  		},
   117  		amqptransport.PublisherTimeout(50*time.Millisecond),
   118  	)
   119  
   120  	var err error
   121  	errChan := make(chan error, 1)
   122  	go func() {
   123  		_, err := pub.Endpoint()(context.Background(), struct{}{})
   124  		errChan <- err
   125  
   126  	}()
   127  
   128  	select {
   129  	case err = <-errChan:
   130  		break
   131  
   132  	case <-time.After(100 * time.Millisecond):
   133  		t.Fatal("timed out waiting for result")
   134  	}
   135  
   136  	if err == nil {
   137  		t.Error("expected error")
   138  	}
   139  	if want, have := context.DeadlineExceeded.Error(), err.Error(); want != have {
   140  		t.Errorf("want %s, have %s", want, have)
   141  	}
   142  }
   143  
   144  func TestSuccessfulPublisher(t *testing.T) {
   145  	cid := "correlation"
   146  	mockReq := testReq{437}
   147  	mockRes := testRes{
   148  		Squadron: mockReq.Squadron,
   149  		Name:     names[mockReq.Squadron],
   150  	}
   151  	b, err := json.Marshal(mockRes)
   152  	if err != nil {
   153  		t.Fatal(err)
   154  	}
   155  	reqChan := make(chan amqp.Publishing, 1)
   156  	ch := &mockChannel{
   157  		f: nullFunc,
   158  		c: reqChan,
   159  		deliveries: []amqp.Delivery{
   160  			amqp.Delivery{
   161  				CorrelationId: cid,
   162  				Body:          b,
   163  			},
   164  		},
   165  	}
   166  	q := &amqp.Queue{Name: "some queue"}
   167  
   168  	pub := amqptransport.NewPublisher(
   169  		ch,
   170  		q,
   171  		testReqEncoder,
   172  		testResDeliveryDecoder,
   173  		amqptransport.PublisherBefore(
   174  			amqptransport.SetCorrelationID(cid),
   175  		),
   176  	)
   177  	var publishing amqp.Publishing
   178  	var res testRes
   179  	var ok bool
   180  	resChan := make(chan interface{}, 1)
   181  	errChan := make(chan error, 1)
   182  	go func() {
   183  		res, err := pub.Endpoint()(context.Background(), mockReq)
   184  		if err != nil {
   185  			errChan <- err
   186  		} else {
   187  			resChan <- res
   188  		}
   189  	}()
   190  
   191  	select {
   192  	case publishing = <-reqChan:
   193  		break
   194  
   195  	case <-time.After(100 * time.Millisecond):
   196  		t.Fatal("timed out waiting for request")
   197  	}
   198  	if want, have := defaultContentType, publishing.ContentType; want != have {
   199  		t.Errorf("want %s, have %s", want, have)
   200  	}
   201  	if want, have := defaultContentEncoding, publishing.ContentEncoding; want != have {
   202  		t.Errorf("want %s, have %s", want, have)
   203  	}
   204  
   205  	select {
   206  	case response := <-resChan:
   207  		res, ok = response.(testRes)
   208  		if !ok {
   209  			t.Error("failed to assert endpoint response type")
   210  		}
   211  		break
   212  
   213  	case err = <-errChan:
   214  		break
   215  
   216  	case <-time.After(100 * time.Millisecond):
   217  		t.Fatal("timed out waiting for result")
   218  	}
   219  
   220  	if err != nil {
   221  		t.Fatal(err)
   222  	}
   223  	if want, have := mockRes.Name, res.Name; want != have {
   224  		t.Errorf("want %s, have %s", want, have)
   225  	}
   226  }
   227  
   228  // TestSendAndForgetPublisher tests that the SendAndForgetDeliverer is working
   229  func TestSendAndForgetPublisher(t *testing.T) {
   230  	ch := &mockChannel{
   231  		f:          nullFunc,
   232  		c:          make(chan amqp.Publishing, 1),
   233  		deliveries: []amqp.Delivery{}, // no reply from mock subscriber
   234  	}
   235  	q := &amqp.Queue{Name: "some queue"}
   236  
   237  	pub := amqptransport.NewPublisher(
   238  		ch,
   239  		q,
   240  		func(context.Context, *amqp.Publishing, interface{}) error { return nil },
   241  		func(context.Context, *amqp.Delivery) (response interface{}, err error) {
   242  			return struct{}{}, nil
   243  		},
   244  		amqptransport.PublisherDeliverer(amqptransport.SendAndForgetDeliverer),
   245  		amqptransport.PublisherTimeout(50*time.Millisecond),
   246  	)
   247  
   248  	var err error
   249  	errChan := make(chan error, 1)
   250  	finishChan := make(chan bool, 1)
   251  	go func() {
   252  		_, err := pub.Endpoint()(context.Background(), struct{}{})
   253  		if err != nil {
   254  			errChan <- err
   255  		} else {
   256  			finishChan <- true
   257  		}
   258  
   259  	}()
   260  
   261  	select {
   262  	case <-finishChan:
   263  		break
   264  	case err = <-errChan:
   265  		t.Errorf("unexpected error %s", err)
   266  	case <-time.After(100 * time.Millisecond):
   267  		t.Fatal("timed out waiting for result")
   268  	}
   269  
   270  }
   271  

View as plain text