...
1 package amqp
2
3 import (
4 "context"
5 "time"
6
7 amqp "github.com/rabbitmq/amqp091-go"
8 )
9
10
11
12
13 type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context
14
15
16
17
18 type SubscriberResponseFunc func(context.Context,
19 *amqp.Delivery,
20 Channel,
21 *amqp.Publishing,
22 ) context.Context
23
24
25
26
27 type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context
28
29
30
31 func SetPublishExchange(publishExchange string) RequestFunc {
32 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
33 return context.WithValue(ctx, ContextKeyExchange, publishExchange)
34 }
35 }
36
37
38
39 func SetPublishKey(publishKey string) RequestFunc {
40 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
41 return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
42 }
43 }
44
45
46
47 func SetPublishDeliveryMode(dmode uint8) RequestFunc {
48 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
49 pub.DeliveryMode = dmode
50 return ctx
51 }
52 }
53
54
55
56
57
58
59 func SetNackSleepDuration(duration time.Duration) RequestFunc {
60 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
61 return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
62 }
63 }
64
65
66
67
68
69
70 func SetConsumeAutoAck(autoAck bool) RequestFunc {
71 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
72 return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
73 }
74 }
75
76
77
78
79 func SetConsumeArgs(args amqp.Table) RequestFunc {
80 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
81 return context.WithValue(ctx, ContextKeyConsumeArgs, args)
82 }
83 }
84
85
86
87 func SetContentType(contentType string) RequestFunc {
88 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
89 pub.ContentType = contentType
90 return ctx
91 }
92 }
93
94
95
96 func SetContentEncoding(contentEncoding string) RequestFunc {
97 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
98 pub.ContentEncoding = contentEncoding
99 return ctx
100 }
101 }
102
103
104
105 func SetCorrelationID(cid string) RequestFunc {
106 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
107 pub.CorrelationId = cid
108 return ctx
109 }
110 }
111
112
113
114
115
116 func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
117 return func(ctx context.Context,
118 deliv *amqp.Delivery,
119 ch Channel,
120 pub *amqp.Publishing,
121 ) context.Context {
122 deliv.Ack(multiple)
123 return ctx
124 }
125 }
126
127 func getPublishExchange(ctx context.Context) string {
128 if exchange := ctx.Value(ContextKeyExchange); exchange != nil {
129 return exchange.(string)
130 }
131 return ""
132 }
133
134 func getPublishKey(ctx context.Context) string {
135 if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil {
136 return publishKey.(string)
137 }
138 return ""
139 }
140
141 func getNackSleepDuration(ctx context.Context) time.Duration {
142 if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil {
143 return duration.(time.Duration)
144 }
145 return 0
146 }
147
148 func getConsumeAutoAck(ctx context.Context) bool {
149 if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil {
150 return autoAck.(bool)
151 }
152 return false
153 }
154
155 func getConsumeArgs(ctx context.Context) amqp.Table {
156 if args := ctx.Value(ContextKeyConsumeArgs); args != nil {
157 return args.(amqp.Table)
158 }
159 return nil
160 }
161
162 type contextKey int
163
164 const (
165
166
167 ContextKeyExchange contextKey = iota
168
169
170 ContextKeyPublishKey
171
172
173
174
175 ContextKeyNackSleepDuration
176
177
178 ContextKeyAutoAck
179
180
181 ContextKeyConsumeArgs
182 )
183
View as plain text