1 package nats
2
3 import (
4 "context"
5 "encoding/json"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/transport"
9 "github.com/go-kit/log"
10
11 "github.com/nats-io/nats.go"
12 )
13
14
15 type Subscriber struct {
16 e endpoint.Endpoint
17 dec DecodeRequestFunc
18 enc EncodeResponseFunc
19 before []RequestFunc
20 after []SubscriberResponseFunc
21 errorEncoder ErrorEncoder
22 finalizer []SubscriberFinalizerFunc
23 errorHandler transport.ErrorHandler
24 }
25
26
27
28 func NewSubscriber(
29 e endpoint.Endpoint,
30 dec DecodeRequestFunc,
31 enc EncodeResponseFunc,
32 options ...SubscriberOption,
33 ) *Subscriber {
34 s := &Subscriber{
35 e: e,
36 dec: dec,
37 enc: enc,
38 errorEncoder: DefaultErrorEncoder,
39 errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
40 }
41 for _, option := range options {
42 option(s)
43 }
44 return s
45 }
46
47
48 type SubscriberOption func(*Subscriber)
49
50
51
52 func SubscriberBefore(before ...RequestFunc) SubscriberOption {
53 return func(s *Subscriber) { s.before = append(s.before, before...) }
54 }
55
56
57
58 func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
59 return func(s *Subscriber) { s.after = append(s.after, after...) }
60 }
61
62
63
64
65
66 func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
67 return func(s *Subscriber) { s.errorEncoder = ee }
68 }
69
70
71
72
73
74
75 func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
76 return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
77 }
78
79
80
81
82
83 func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
84 return func(s *Subscriber) { s.errorHandler = errorHandler }
85 }
86
87
88
89 func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
90 return func(s *Subscriber) { s.finalizer = f }
91 }
92
93
94 func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
95 return func(msg *nats.Msg) {
96 ctx, cancel := context.WithCancel(context.Background())
97 defer cancel()
98
99 if len(s.finalizer) > 0 {
100 defer func() {
101 for _, f := range s.finalizer {
102 f(ctx, msg)
103 }
104 }()
105 }
106
107 for _, f := range s.before {
108 ctx = f(ctx, msg)
109 }
110
111 request, err := s.dec(ctx, msg)
112 if err != nil {
113 s.errorHandler.Handle(ctx, err)
114 if msg.Reply == "" {
115 return
116 }
117 s.errorEncoder(ctx, err, msg.Reply, nc)
118 return
119 }
120
121 response, err := s.e(ctx, request)
122 if err != nil {
123 s.errorHandler.Handle(ctx, err)
124 if msg.Reply == "" {
125 return
126 }
127 s.errorEncoder(ctx, err, msg.Reply, nc)
128 return
129 }
130
131 for _, f := range s.after {
132 ctx = f(ctx, nc)
133 }
134
135 if msg.Reply == "" {
136 return
137 }
138
139 if err := s.enc(ctx, msg.Reply, nc, response); err != nil {
140 s.errorHandler.Handle(ctx, err)
141 s.errorEncoder(ctx, err, msg.Reply, nc)
142 return
143 }
144 }
145 }
146
147
148
149
150
151 type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
152
153
154
155
156 type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
157
158
159
160 func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
161 return nil, nil
162 }
163
164
165
166
167 func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
168 b, err := json.Marshal(response)
169 if err != nil {
170 return err
171 }
172
173 return nc.Publish(reply, b)
174 }
175
176
177 func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
178 logger := log.NewNopLogger()
179
180 type Response struct {
181 Error string `json:"err"`
182 }
183
184 var response Response
185
186 response.Error = err.Error()
187
188 b, err := json.Marshal(response)
189 if err != nil {
190 logger.Log("err", err)
191 return
192 }
193
194 if err := nc.Publish(reply, b); err != nil {
195 logger.Log("err", err)
196 }
197 }
198
View as plain text