1 package amqp_test
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "testing"
8 "time"
9
10 amqptransport "github.com/go-kit/kit/transport/amqp"
11 amqp "github.com/rabbitmq/amqp091-go"
12 )
13
// defaultContentType is the ContentType the tests expect to find on a
// publishing; TestSuccessfulPublisher compares the captured publishing
// against it.
var defaultContentType = ""

// defaultContentEncoding is the ContentEncoding the tests expect to find on a
// publishing; TestSuccessfulPublisher compares the captured publishing
// against it.
var defaultContentEncoding = ""
18
19
20 func TestBadEncode(t *testing.T) {
21 ch := &mockChannel{f: nullFunc}
22 q := &amqp.Queue{Name: "some queue"}
23 pub := amqptransport.NewPublisher(
24 ch,
25 q,
26 func(context.Context, *amqp.Publishing, interface{}) error { return errors.New("err!") },
27 func(context.Context, *amqp.Delivery) (response interface{}, err error) { return struct{}{}, nil },
28 )
29 errChan := make(chan error, 1)
30 var err error
31 go func() {
32 _, err := pub.Endpoint()(context.Background(), struct{}{})
33 errChan <- err
34
35 }()
36 select {
37 case err = <-errChan:
38 break
39
40 case <-time.After(100 * time.Millisecond):
41 t.Fatal("Timed out waiting for result")
42 }
43 if err == nil {
44 t.Error("expected error")
45 }
46 if want, have := "err!", err.Error(); want != have {
47 t.Errorf("want %s, have %s", want, have)
48 }
49 }
50
51
52 func TestBadDecode(t *testing.T) {
53 cid := "correlation"
54 ch := &mockChannel{
55 f: nullFunc,
56 c: make(chan amqp.Publishing, 1),
57 deliveries: []amqp.Delivery{
58 amqp.Delivery{
59 CorrelationId: cid,
60 },
61 },
62 }
63 q := &amqp.Queue{Name: "some queue"}
64
65 pub := amqptransport.NewPublisher(
66 ch,
67 q,
68 func(context.Context, *amqp.Publishing, interface{}) error { return nil },
69 func(context.Context, *amqp.Delivery) (response interface{}, err error) {
70 return struct{}{}, errors.New("err!")
71 },
72 amqptransport.PublisherBefore(
73 amqptransport.SetCorrelationID(cid),
74 ),
75 )
76
77 var err error
78 errChan := make(chan error, 1)
79 go func() {
80 _, err := pub.Endpoint()(context.Background(), struct{}{})
81 errChan <- err
82
83 }()
84
85 select {
86 case err = <-errChan:
87 break
88
89 case <-time.After(100 * time.Millisecond):
90 t.Fatal("Timed out waiting for result")
91 }
92
93 if err == nil {
94 t.Error("expected error")
95 }
96 if want, have := "err!", err.Error(); want != have {
97 t.Errorf("want %s, have %s", want, have)
98 }
99 }
100
101
102 func TestPublisherTimeout(t *testing.T) {
103 ch := &mockChannel{
104 f: nullFunc,
105 c: make(chan amqp.Publishing, 1),
106 deliveries: []amqp.Delivery{},
107 }
108 q := &amqp.Queue{Name: "some queue"}
109
110 pub := amqptransport.NewPublisher(
111 ch,
112 q,
113 func(context.Context, *amqp.Publishing, interface{}) error { return nil },
114 func(context.Context, *amqp.Delivery) (response interface{}, err error) {
115 return struct{}{}, nil
116 },
117 amqptransport.PublisherTimeout(50*time.Millisecond),
118 )
119
120 var err error
121 errChan := make(chan error, 1)
122 go func() {
123 _, err := pub.Endpoint()(context.Background(), struct{}{})
124 errChan <- err
125
126 }()
127
128 select {
129 case err = <-errChan:
130 break
131
132 case <-time.After(100 * time.Millisecond):
133 t.Fatal("timed out waiting for result")
134 }
135
136 if err == nil {
137 t.Error("expected error")
138 }
139 if want, have := context.DeadlineExceeded.Error(), err.Error(); want != have {
140 t.Errorf("want %s, have %s", want, have)
141 }
142 }
143
144 func TestSuccessfulPublisher(t *testing.T) {
145 cid := "correlation"
146 mockReq := testReq{437}
147 mockRes := testRes{
148 Squadron: mockReq.Squadron,
149 Name: names[mockReq.Squadron],
150 }
151 b, err := json.Marshal(mockRes)
152 if err != nil {
153 t.Fatal(err)
154 }
155 reqChan := make(chan amqp.Publishing, 1)
156 ch := &mockChannel{
157 f: nullFunc,
158 c: reqChan,
159 deliveries: []amqp.Delivery{
160 amqp.Delivery{
161 CorrelationId: cid,
162 Body: b,
163 },
164 },
165 }
166 q := &amqp.Queue{Name: "some queue"}
167
168 pub := amqptransport.NewPublisher(
169 ch,
170 q,
171 testReqEncoder,
172 testResDeliveryDecoder,
173 amqptransport.PublisherBefore(
174 amqptransport.SetCorrelationID(cid),
175 ),
176 )
177 var publishing amqp.Publishing
178 var res testRes
179 var ok bool
180 resChan := make(chan interface{}, 1)
181 errChan := make(chan error, 1)
182 go func() {
183 res, err := pub.Endpoint()(context.Background(), mockReq)
184 if err != nil {
185 errChan <- err
186 } else {
187 resChan <- res
188 }
189 }()
190
191 select {
192 case publishing = <-reqChan:
193 break
194
195 case <-time.After(100 * time.Millisecond):
196 t.Fatal("timed out waiting for request")
197 }
198 if want, have := defaultContentType, publishing.ContentType; want != have {
199 t.Errorf("want %s, have %s", want, have)
200 }
201 if want, have := defaultContentEncoding, publishing.ContentEncoding; want != have {
202 t.Errorf("want %s, have %s", want, have)
203 }
204
205 select {
206 case response := <-resChan:
207 res, ok = response.(testRes)
208 if !ok {
209 t.Error("failed to assert endpoint response type")
210 }
211 break
212
213 case err = <-errChan:
214 break
215
216 case <-time.After(100 * time.Millisecond):
217 t.Fatal("timed out waiting for result")
218 }
219
220 if err != nil {
221 t.Fatal(err)
222 }
223 if want, have := mockRes.Name, res.Name; want != have {
224 t.Errorf("want %s, have %s", want, have)
225 }
226 }
227
228
229 func TestSendAndForgetPublisher(t *testing.T) {
230 ch := &mockChannel{
231 f: nullFunc,
232 c: make(chan amqp.Publishing, 1),
233 deliveries: []amqp.Delivery{},
234 }
235 q := &amqp.Queue{Name: "some queue"}
236
237 pub := amqptransport.NewPublisher(
238 ch,
239 q,
240 func(context.Context, *amqp.Publishing, interface{}) error { return nil },
241 func(context.Context, *amqp.Delivery) (response interface{}, err error) {
242 return struct{}{}, nil
243 },
244 amqptransport.PublisherDeliverer(amqptransport.SendAndForgetDeliverer),
245 amqptransport.PublisherTimeout(50*time.Millisecond),
246 )
247
248 var err error
249 errChan := make(chan error, 1)
250 finishChan := make(chan bool, 1)
251 go func() {
252 _, err := pub.Endpoint()(context.Background(), struct{}{})
253 if err != nil {
254 errChan <- err
255 } else {
256 finishChan <- true
257 }
258
259 }()
260
261 select {
262 case <-finishChan:
263 break
264 case err = <-errChan:
265 t.Errorf("unexpected error %s", err)
266 case <-time.After(100 * time.Millisecond):
267 t.Fatal("timed out waiting for result")
268 }
269
270 }
271
View as plain text