1 package amqp
2
3 import (
4 "context"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 amqp "github.com/rabbitmq/amqp091-go"
9 )
10
11
12
// maxCorrelationIdLength is the upper bound handed to randInt when Endpoint
// generates a random CorrelationId for an outgoing Publishing.
// NOTE(review): 255 presumably mirrors the AMQP 0-9-1 short-string limit that
// applies to the correlation-id property — confirm against the protocol spec.
const maxCorrelationIdLength = 255
14
15
16
// Publisher bundles everything needed to send a request over AMQP and obtain
// its response; Endpoint exposes it as an endpoint.Endpoint.
type Publisher struct {
	// ch is the channel the deliverers publish on and consume from.
	ch Channel
	// q is the reply queue: its Name is set as ReplyTo on each outgoing
	// Publishing and is the queue DefaultDeliverer consumes from.
	q *amqp.Queue
	// enc encodes the request object into the outgoing Publishing.
	enc EncodeRequestFunc
	// dec decodes the Delivery returned by the deliverer into a response.
	dec DecodeResponseFunc
	// before holds functions run on the Publishing after encoding and
	// before delivery.
	before []RequestFunc
	// after holds functions run on the Delivery before it is decoded.
	after []PublisherResponseFunc
	// deliverer performs the actual publish (and, optionally, the wait for
	// a reply). NewPublisher sets it to DefaultDeliverer.
	deliverer Deliverer
	// timeout bounds each Endpoint invocation via context.WithTimeout.
	// NewPublisher sets it to 10 seconds.
	timeout time.Duration
}
27
28
29 func NewPublisher(
30 ch Channel,
31 q *amqp.Queue,
32 enc EncodeRequestFunc,
33 dec DecodeResponseFunc,
34 options ...PublisherOption,
35 ) *Publisher {
36 p := &Publisher{
37 ch: ch,
38 q: q,
39 enc: enc,
40 dec: dec,
41 deliverer: DefaultDeliverer,
42 timeout: 10 * time.Second,
43 }
44 for _, option := range options {
45 option(p)
46 }
47 return p
48 }
49
50
// PublisherOption configures an optional aspect of a Publisher. NewPublisher
// applies each option, in order, to the Publisher it has just constructed.
type PublisherOption func(*Publisher)
52
53
54
55 func PublisherBefore(before ...RequestFunc) PublisherOption {
56 return func(p *Publisher) { p.before = append(p.before, before...) }
57 }
58
59
60
61
62 func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
63 return func(p *Publisher) { p.after = append(p.after, after...) }
64 }
65
66
67 func PublisherDeliverer(deliverer Deliverer) PublisherOption {
68 return func(p *Publisher) { p.deliverer = deliverer }
69 }
70
71
72 func PublisherTimeout(timeout time.Duration) PublisherOption {
73 return func(p *Publisher) { p.timeout = timeout }
74 }
75
76
77 func (p Publisher) Endpoint() endpoint.Endpoint {
78 return func(ctx context.Context, request interface{}) (interface{}, error) {
79 ctx, cancel := context.WithTimeout(ctx, p.timeout)
80 defer cancel()
81
82 pub := amqp.Publishing{
83 ReplyTo: p.q.Name,
84 CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
85 }
86
87 if err := p.enc(ctx, &pub, request); err != nil {
88 return nil, err
89 }
90
91 for _, f := range p.before {
92
93 ctx = f(ctx, &pub, nil)
94 }
95
96 deliv, err := p.deliverer(ctx, p, &pub)
97 if err != nil {
98 return nil, err
99 }
100
101 for _, f := range p.after {
102 ctx = f(ctx, deliv)
103 }
104 response, err := p.dec(ctx, deliv)
105 if err != nil {
106 return nil, err
107 }
108
109 return response, nil
110 }
111 }
112
113
114
// Deliverer is the function a Publisher's Endpoint calls to send a Publishing
// and obtain the corresponding response Delivery. It receives the request
// context, the Publisher itself and the Publishing to send, and returns the
// Delivery to be decoded or an error.
type Deliverer func(
	context.Context,
	Publisher,
	*amqp.Publishing,
) (*amqp.Delivery, error)
120
121
122
123
124 func DefaultDeliverer(
125 ctx context.Context,
126 p Publisher,
127 pub *amqp.Publishing,
128 ) (*amqp.Delivery, error) {
129 err := p.ch.Publish(
130 getPublishExchange(ctx),
131 getPublishKey(ctx),
132 false,
133 false,
134 *pub,
135 )
136 if err != nil {
137 return nil, err
138 }
139 autoAck := getConsumeAutoAck(ctx)
140
141 msg, err := p.ch.Consume(
142 p.q.Name,
143 "",
144 autoAck,
145 false,
146 false,
147 false,
148 getConsumeArgs(ctx),
149 )
150 if err != nil {
151 return nil, err
152 }
153
154 for {
155 select {
156 case d := <-msg:
157 if d.CorrelationId == pub.CorrelationId {
158 if !autoAck {
159 d.Ack(false)
160 }
161 return &d, nil
162 }
163
164 case <-ctx.Done():
165 return nil, ctx.Err()
166 }
167 }
168
169 }
170
171
172
173
174
175 func SendAndForgetDeliverer(
176 ctx context.Context,
177 p Publisher,
178 pub *amqp.Publishing,
179 ) (*amqp.Delivery, error) {
180 err := p.ch.Publish(
181 getPublishExchange(ctx),
182 getPublishKey(ctx),
183 false,
184 false,
185 *pub,
186 )
187 return nil, err
188 }
189
View as plain text