1 package amqp
2
3 import (
4 "context"
5 "encoding/json"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/transport"
10 "github.com/go-kit/log"
11 amqp "github.com/rabbitmq/amqp091-go"
12 )
13
14
15 type Subscriber struct {
16 e endpoint.Endpoint
17 dec DecodeRequestFunc
18 enc EncodeResponseFunc
19 before []RequestFunc
20 after []SubscriberResponseFunc
21 responsePublisher ResponsePublisher
22 errorEncoder ErrorEncoder
23 errorHandler transport.ErrorHandler
24 }
25
26
27
28 func NewSubscriber(
29 e endpoint.Endpoint,
30 dec DecodeRequestFunc,
31 enc EncodeResponseFunc,
32 options ...SubscriberOption,
33 ) *Subscriber {
34 s := &Subscriber{
35 e: e,
36 dec: dec,
37 enc: enc,
38 responsePublisher: DefaultResponsePublisher,
39 errorEncoder: DefaultErrorEncoder,
40 errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
41 }
42 for _, option := range options {
43 option(s)
44 }
45 return s
46 }
47
48
49 type SubscriberOption func(*Subscriber)
50
51
52
53 func SubscriberBefore(before ...RequestFunc) SubscriberOption {
54 return func(s *Subscriber) { s.before = append(s.before, before...) }
55 }
56
57
58
59 func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
60 return func(s *Subscriber) { s.after = append(s.after, after...) }
61 }
62
63
64
65
66 func SubscriberResponsePublisher(rp ResponsePublisher) SubscriberOption {
67 return func(s *Subscriber) { s.responsePublisher = rp }
68 }
69
70
71
72
73
74 func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
75 return func(s *Subscriber) { s.errorEncoder = ee }
76 }
77
78
79
80
81
82
83 func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
84 return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
85 }
86
87
88
89
90
91 func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
92 return func(s *Subscriber) { s.errorHandler = errorHandler }
93 }
94
95
96
97
98 func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
99 return func(deliv *amqp.Delivery) {
100 ctx, cancel := context.WithCancel(context.Background())
101 defer cancel()
102
103 pub := amqp.Publishing{}
104
105 for _, f := range s.before {
106 ctx = f(ctx, &pub, deliv)
107 }
108
109 request, err := s.dec(ctx, deliv)
110 if err != nil {
111 s.errorHandler.Handle(ctx, err)
112 s.errorEncoder(ctx, err, deliv, ch, &pub)
113 return
114 }
115
116 response, err := s.e(ctx, request)
117 if err != nil {
118 s.errorHandler.Handle(ctx, err)
119 s.errorEncoder(ctx, err, deliv, ch, &pub)
120 return
121 }
122
123 for _, f := range s.after {
124 ctx = f(ctx, deliv, ch, &pub)
125 }
126
127 if err := s.enc(ctx, &pub, response); err != nil {
128 s.errorHandler.Handle(ctx, err)
129 s.errorEncoder(ctx, err, deliv, ch, &pub)
130 return
131 }
132
133 if err := s.responsePublisher(ctx, deliv, ch, &pub); err != nil {
134 s.errorHandler.Handle(ctx, err)
135 s.errorEncoder(ctx, err, deliv, ch, &pub)
136 return
137 }
138 }
139
140 }
141
142
143
144 func EncodeJSONResponse(
145 ctx context.Context,
146 pub *amqp.Publishing,
147 response interface{},
148 ) error {
149 b, err := json.Marshal(response)
150 if err != nil {
151 return err
152 }
153 pub.Body = b
154 return nil
155 }
156
157
158 func EncodeNopResponse(
159 ctx context.Context,
160 pub *amqp.Publishing,
161 response interface{},
162 ) error {
163 return nil
164 }
165
166
167
168
169
170
171 type ResponsePublisher func(
172 context.Context,
173 *amqp.Delivery,
174 Channel,
175 *amqp.Publishing,
176 ) error
177
178
179
180 func DefaultResponsePublisher(
181 ctx context.Context,
182 deliv *amqp.Delivery,
183 ch Channel,
184 pub *amqp.Publishing,
185 ) error {
186 if pub.CorrelationId == "" {
187 pub.CorrelationId = deliv.CorrelationId
188 }
189
190 replyExchange := getPublishExchange(ctx)
191 replyTo := getPublishKey(ctx)
192 if replyTo == "" {
193 replyTo = deliv.ReplyTo
194 }
195
196 return ch.Publish(
197 replyExchange,
198 replyTo,
199 false,
200 false,
201 *pub,
202 )
203 }
204
205
206
207
208 func NopResponsePublisher(
209 ctx context.Context,
210 deliv *amqp.Delivery,
211 ch Channel,
212 pub *amqp.Publishing,
213 ) error {
214 return nil
215 }
216
217
218
219
220
221 type ErrorEncoder func(ctx context.Context,
222 err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
223
224
225
226 func DefaultErrorEncoder(ctx context.Context,
227 err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
228 }
229
230
231
232 func SingleNackRequeueErrorEncoder(ctx context.Context,
233 err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
234 deliv.Nack(
235 false,
236 true,
237 )
238 duration := getNackSleepDuration(ctx)
239 time.Sleep(duration)
240 }
241
242
243
244 func ReplyErrorEncoder(
245 ctx context.Context,
246 err error,
247 deliv *amqp.Delivery,
248 ch Channel,
249 pub *amqp.Publishing,
250 ) {
251
252 if pub.CorrelationId == "" {
253 pub.CorrelationId = deliv.CorrelationId
254 }
255
256 replyExchange := getPublishExchange(ctx)
257 replyTo := getPublishKey(ctx)
258 if replyTo == "" {
259 replyTo = deliv.ReplyTo
260 }
261
262 response := DefaultErrorResponse{err.Error()}
263
264 b, err := json.Marshal(response)
265 if err != nil {
266 return
267 }
268 pub.Body = b
269
270 ch.Publish(
271 replyExchange,
272 replyTo,
273 false,
274 false,
275 *pub,
276 )
277 }
278
279
280
281
282 func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
283 ReplyErrorEncoder(ctx, err, deliv, ch, pub)
284 deliv.Ack(false)
285 }
286
287
288
289 type DefaultErrorResponse struct {
290 Error string `json:"err"`
291 }
292
293
294
295 type Channel interface {
296 Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
297 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error)
298 }
299
View as plain text