...

Source file src/github.com/go-kit/kit/transport/nats/subscriber.go

Documentation: github.com/go-kit/kit/transport/nats

     1  package nats
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  
     7  	"github.com/go-kit/kit/endpoint"
     8  	"github.com/go-kit/kit/transport"
     9  	"github.com/go-kit/log"
    10  
    11  	"github.com/nats-io/nats.go"
    12  )
    13  
    14  // Subscriber wraps an endpoint and provides nats.MsgHandler.
    15  type Subscriber struct {
    16  	e            endpoint.Endpoint
    17  	dec          DecodeRequestFunc
    18  	enc          EncodeResponseFunc
    19  	before       []RequestFunc
    20  	after        []SubscriberResponseFunc
    21  	errorEncoder ErrorEncoder
    22  	finalizer    []SubscriberFinalizerFunc
    23  	errorHandler transport.ErrorHandler
    24  }
    25  
    26  // NewSubscriber constructs a new subscriber, which provides nats.MsgHandler and wraps
    27  // the provided endpoint.
    28  func NewSubscriber(
    29  	e endpoint.Endpoint,
    30  	dec DecodeRequestFunc,
    31  	enc EncodeResponseFunc,
    32  	options ...SubscriberOption,
    33  ) *Subscriber {
    34  	s := &Subscriber{
    35  		e:            e,
    36  		dec:          dec,
    37  		enc:          enc,
    38  		errorEncoder: DefaultErrorEncoder,
    39  		errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
    40  	}
    41  	for _, option := range options {
    42  		option(s)
    43  	}
    44  	return s
    45  }
    46  
    47  // SubscriberOption sets an optional parameter for subscribers.
    48  type SubscriberOption func(*Subscriber)
    49  
    50  // SubscriberBefore functions are executed on the publisher request object before the
    51  // request is decoded.
    52  func SubscriberBefore(before ...RequestFunc) SubscriberOption {
    53  	return func(s *Subscriber) { s.before = append(s.before, before...) }
    54  }
    55  
    56  // SubscriberAfter functions are executed on the subscriber reply after the
    57  // endpoint is invoked, but before anything is published to the reply.
    58  func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
    59  	return func(s *Subscriber) { s.after = append(s.after, after...) }
    60  }
    61  
    62  // SubscriberErrorEncoder is used to encode errors to the subscriber reply
    63  // whenever they're encountered in the processing of a request. Clients can
    64  // use this to provide custom error formatting. By default,
    65  // errors will be published with the DefaultErrorEncoder.
    66  func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
    67  	return func(s *Subscriber) { s.errorEncoder = ee }
    68  }
    69  
    70  // SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
    71  // are logged. This is intended as a diagnostic measure. Finer-grained control
    72  // of error handling, including logging in more detail, should be performed in a
    73  // custom SubscriberErrorEncoder which has access to the context.
    74  // Deprecated: Use SubscriberErrorHandler instead.
    75  func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
    76  	return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
    77  }
    78  
    79  // SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
    80  // are ignored. This is intended as a diagnostic measure. Finer-grained control
    81  // of error handling, including logging in more detail, should be performed in a
    82  // custom SubscriberErrorEncoder which has access to the context.
    83  func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
    84  	return func(s *Subscriber) { s.errorHandler = errorHandler }
    85  }
    86  
    87  // SubscriberFinalizer is executed at the end of every request from a publisher through NATS.
    88  // By default, no finalizer is registered.
    89  func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
    90  	return func(s *Subscriber) { s.finalizer = f }
    91  }
    92  
    93  // ServeMsg provides nats.MsgHandler.
    94  func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
    95  	return func(msg *nats.Msg) {
    96  		ctx, cancel := context.WithCancel(context.Background())
    97  		defer cancel()
    98  
    99  		if len(s.finalizer) > 0 {
   100  			defer func() {
   101  				for _, f := range s.finalizer {
   102  					f(ctx, msg)
   103  				}
   104  			}()
   105  		}
   106  
   107  		for _, f := range s.before {
   108  			ctx = f(ctx, msg)
   109  		}
   110  
   111  		request, err := s.dec(ctx, msg)
   112  		if err != nil {
   113  			s.errorHandler.Handle(ctx, err)
   114  			if msg.Reply == "" {
   115  				return
   116  			}
   117  			s.errorEncoder(ctx, err, msg.Reply, nc)
   118  			return
   119  		}
   120  
   121  		response, err := s.e(ctx, request)
   122  		if err != nil {
   123  			s.errorHandler.Handle(ctx, err)
   124  			if msg.Reply == "" {
   125  				return
   126  			}
   127  			s.errorEncoder(ctx, err, msg.Reply, nc)
   128  			return
   129  		}
   130  
   131  		for _, f := range s.after {
   132  			ctx = f(ctx, nc)
   133  		}
   134  
   135  		if msg.Reply == "" {
   136  			return
   137  		}
   138  
   139  		if err := s.enc(ctx, msg.Reply, nc, response); err != nil {
   140  			s.errorHandler.Handle(ctx, err)
   141  			s.errorEncoder(ctx, err, msg.Reply, nc)
   142  			return
   143  		}
   144  	}
   145  }
   146  
   147  // ErrorEncoder is responsible for encoding an error to the subscriber reply.
   148  // Users are encouraged to use custom ErrorEncoders to encode errors to
   149  // their replies, and will likely want to pass and check for their own error
   150  // types.
   151  type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
   152  
   153  // SubscriberFinalizerFunc can be used to perform work at the end of an request
   154  // from a publisher, after the response has been written to the publisher. The principal
   155  // intended use is for request logging.
   156  type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
   157  
   158  // NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not
   159  // need to be decoded, and simply returns nil, nil.
   160  func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
   161  	return nil, nil
   162  }
   163  
   164  // EncodeJSONResponse is a EncodeResponseFunc that serializes the response as a
   165  // JSON object to the subscriber reply. Many JSON-over services can use it as
   166  // a sensible default.
   167  func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
   168  	b, err := json.Marshal(response)
   169  	if err != nil {
   170  		return err
   171  	}
   172  
   173  	return nc.Publish(reply, b)
   174  }
   175  
   176  // DefaultErrorEncoder writes the error to the subscriber reply.
   177  func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
   178  	logger := log.NewNopLogger()
   179  
   180  	type Response struct {
   181  		Error string `json:"err"`
   182  	}
   183  
   184  	var response Response
   185  
   186  	response.Error = err.Error()
   187  
   188  	b, err := json.Marshal(response)
   189  	if err != nil {
   190  		logger.Log("err", err)
   191  		return
   192  	}
   193  
   194  	if err := nc.Publish(reply, b); err != nil {
   195  		logger.Log("err", err)
   196  	}
   197  }
   198  

View as plain text