...

Source file src/github.com/go-kit/kit/transport/amqp/subscriber.go

Documentation: github.com/go-kit/kit/transport/amqp

     1  package amqp
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"time"
     7  
     8  	"github.com/go-kit/kit/endpoint"
     9  	"github.com/go-kit/kit/transport"
    10  	"github.com/go-kit/log"
    11  	amqp "github.com/rabbitmq/amqp091-go"
    12  )
    13  
    14  // Subscriber wraps an endpoint and provides a handler for AMQP Delivery messages.
    15  type Subscriber struct {
    16  	e                 endpoint.Endpoint
    17  	dec               DecodeRequestFunc
    18  	enc               EncodeResponseFunc
    19  	before            []RequestFunc
    20  	after             []SubscriberResponseFunc
    21  	responsePublisher ResponsePublisher
    22  	errorEncoder      ErrorEncoder
    23  	errorHandler      transport.ErrorHandler
    24  }
    25  
    26  // NewSubscriber constructs a new subscriber, which provides a handler
    27  // for AMQP Delivery messages.
    28  func NewSubscriber(
    29  	e endpoint.Endpoint,
    30  	dec DecodeRequestFunc,
    31  	enc EncodeResponseFunc,
    32  	options ...SubscriberOption,
    33  ) *Subscriber {
    34  	s := &Subscriber{
    35  		e:                 e,
    36  		dec:               dec,
    37  		enc:               enc,
    38  		responsePublisher: DefaultResponsePublisher,
    39  		errorEncoder:      DefaultErrorEncoder,
    40  		errorHandler:      transport.NewLogErrorHandler(log.NewNopLogger()),
    41  	}
    42  	for _, option := range options {
    43  		option(s)
    44  	}
    45  	return s
    46  }
    47  
    48  // SubscriberOption sets an optional parameter for subscribers.
    49  type SubscriberOption func(*Subscriber)
    50  
    51  // SubscriberBefore functions are executed on the publisher delivery object
    52  // before the request is decoded.
    53  func SubscriberBefore(before ...RequestFunc) SubscriberOption {
    54  	return func(s *Subscriber) { s.before = append(s.before, before...) }
    55  }
    56  
    57  // SubscriberAfter functions are executed on the subscriber reply after the
    58  // endpoint is invoked, but before anything is published to the reply.
    59  func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
    60  	return func(s *Subscriber) { s.after = append(s.after, after...) }
    61  }
    62  
    63  // SubscriberResponsePublisher is used by the subscriber to deliver response
    64  // objects to the original sender.
    65  // By default, the DefaultResponsePublisher is used.
    66  func SubscriberResponsePublisher(rp ResponsePublisher) SubscriberOption {
    67  	return func(s *Subscriber) { s.responsePublisher = rp }
    68  }
    69  
    70  // SubscriberErrorEncoder is used to encode errors to the subscriber reply
    71  // whenever they're encountered in the processing of a request. Clients can
    72  // use this to provide custom error formatting. By default,
    73  // errors will be published with the DefaultErrorEncoder.
    74  func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
    75  	return func(s *Subscriber) { s.errorEncoder = ee }
    76  }
    77  
    78  // SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
    79  // are logged. This is intended as a diagnostic measure. Finer-grained control
    80  // of error handling, including logging in more detail, should be performed in a
    81  // custom SubscriberErrorEncoder which has access to the context.
    82  // Deprecated: Use SubscriberErrorHandler instead.
    83  func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
    84  	return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
    85  }
    86  
    87  // SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
    88  // are ignored. This is intended as a diagnostic measure. Finer-grained control
    89  // of error handling, including logging in more detail, should be performed in a
    90  // custom SubscriberErrorEncoder which has access to the context.
    91  func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
    92  	return func(s *Subscriber) { s.errorHandler = errorHandler }
    93  }
    94  
    95  // ServeDelivery handles AMQP Delivery messages
    96  // It is strongly recommended to use *amqp.Channel as the
    97  // Channel interface implementation.
    98  func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
    99  	return func(deliv *amqp.Delivery) {
   100  		ctx, cancel := context.WithCancel(context.Background())
   101  		defer cancel()
   102  
   103  		pub := amqp.Publishing{}
   104  
   105  		for _, f := range s.before {
   106  			ctx = f(ctx, &pub, deliv)
   107  		}
   108  
   109  		request, err := s.dec(ctx, deliv)
   110  		if err != nil {
   111  			s.errorHandler.Handle(ctx, err)
   112  			s.errorEncoder(ctx, err, deliv, ch, &pub)
   113  			return
   114  		}
   115  
   116  		response, err := s.e(ctx, request)
   117  		if err != nil {
   118  			s.errorHandler.Handle(ctx, err)
   119  			s.errorEncoder(ctx, err, deliv, ch, &pub)
   120  			return
   121  		}
   122  
   123  		for _, f := range s.after {
   124  			ctx = f(ctx, deliv, ch, &pub)
   125  		}
   126  
   127  		if err := s.enc(ctx, &pub, response); err != nil {
   128  			s.errorHandler.Handle(ctx, err)
   129  			s.errorEncoder(ctx, err, deliv, ch, &pub)
   130  			return
   131  		}
   132  
   133  		if err := s.responsePublisher(ctx, deliv, ch, &pub); err != nil {
   134  			s.errorHandler.Handle(ctx, err)
   135  			s.errorEncoder(ctx, err, deliv, ch, &pub)
   136  			return
   137  		}
   138  	}
   139  
   140  }
   141  
   142  // EncodeJSONResponse marshals the response as JSON as part of the
   143  // payload of the AMQP Publishing object.
   144  func EncodeJSONResponse(
   145  	ctx context.Context,
   146  	pub *amqp.Publishing,
   147  	response interface{},
   148  ) error {
   149  	b, err := json.Marshal(response)
   150  	if err != nil {
   151  		return err
   152  	}
   153  	pub.Body = b
   154  	return nil
   155  }
   156  
   157  // EncodeNopResponse is a response function that does nothing.
   158  func EncodeNopResponse(
   159  	ctx context.Context,
   160  	pub *amqp.Publishing,
   161  	response interface{},
   162  ) error {
   163  	return nil
   164  }
   165  
   166  // ResponsePublisher functions are executed by the subscriber to
   167  // publish response object to the original sender.
   168  // Please note that the word "publisher" does not refer
   169  // to the publisher of pub/sub.
   170  // Rather, publisher is merely a function that publishes, or sends responses.
   171  type ResponsePublisher func(
   172  	context.Context,
   173  	*amqp.Delivery,
   174  	Channel,
   175  	*amqp.Publishing,
   176  ) error
   177  
   178  // DefaultResponsePublisher extracts the reply exchange and reply key
   179  // from the request, and sends the response object to that destination.
   180  func DefaultResponsePublisher(
   181  	ctx context.Context,
   182  	deliv *amqp.Delivery,
   183  	ch Channel,
   184  	pub *amqp.Publishing,
   185  ) error {
   186  	if pub.CorrelationId == "" {
   187  		pub.CorrelationId = deliv.CorrelationId
   188  	}
   189  
   190  	replyExchange := getPublishExchange(ctx)
   191  	replyTo := getPublishKey(ctx)
   192  	if replyTo == "" {
   193  		replyTo = deliv.ReplyTo
   194  	}
   195  
   196  	return ch.Publish(
   197  		replyExchange,
   198  		replyTo,
   199  		false, // mandatory
   200  		false, // immediate
   201  		*pub,
   202  	)
   203  }
   204  
   205  // NopResponsePublisher does not deliver a response to the original sender.
   206  // This response publisher is used when the user wants the subscriber to
   207  // receive and forget.
   208  func NopResponsePublisher(
   209  	ctx context.Context,
   210  	deliv *amqp.Delivery,
   211  	ch Channel,
   212  	pub *amqp.Publishing,
   213  ) error {
   214  	return nil
   215  }
   216  
   217  // ErrorEncoder is responsible for encoding an error to the subscriber reply.
   218  // Users are encouraged to use custom ErrorEncoders to encode errors to
   219  // their replies, and will likely want to pass and check for their own error
   220  // types.
   221  type ErrorEncoder func(ctx context.Context,
   222  	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
   223  
   224  // DefaultErrorEncoder simply ignores the message. It does not reply
   225  // nor Ack/Nack the message.
   226  func DefaultErrorEncoder(ctx context.Context,
   227  	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
   228  }
   229  
   230  // SingleNackRequeueErrorEncoder issues a Nack to the delivery with multiple flag set as false
   231  // and requeue flag set as true. It does not reply the message.
   232  func SingleNackRequeueErrorEncoder(ctx context.Context,
   233  	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
   234  	deliv.Nack(
   235  		false, //multiple
   236  		true,  //requeue
   237  	)
   238  	duration := getNackSleepDuration(ctx)
   239  	time.Sleep(duration)
   240  }
   241  
   242  // ReplyErrorEncoder serializes the error message as a DefaultErrorResponse
   243  // JSON and sends the message to the ReplyTo address.
   244  func ReplyErrorEncoder(
   245  	ctx context.Context,
   246  	err error,
   247  	deliv *amqp.Delivery,
   248  	ch Channel,
   249  	pub *amqp.Publishing,
   250  ) {
   251  
   252  	if pub.CorrelationId == "" {
   253  		pub.CorrelationId = deliv.CorrelationId
   254  	}
   255  
   256  	replyExchange := getPublishExchange(ctx)
   257  	replyTo := getPublishKey(ctx)
   258  	if replyTo == "" {
   259  		replyTo = deliv.ReplyTo
   260  	}
   261  
   262  	response := DefaultErrorResponse{err.Error()}
   263  
   264  	b, err := json.Marshal(response)
   265  	if err != nil {
   266  		return
   267  	}
   268  	pub.Body = b
   269  
   270  	ch.Publish(
   271  		replyExchange,
   272  		replyTo,
   273  		false, // mandatory
   274  		false, // immediate
   275  		*pub,
   276  	)
   277  }
   278  
   279  // ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse
   280  // JSON and sends the message to the ReplyTo address then Acks the original
   281  // message.
   282  func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
   283  	ReplyErrorEncoder(ctx, err, deliv, ch, pub)
   284  	deliv.Ack(false)
   285  }
   286  
   287  // DefaultErrorResponse is the default structure of responses in the event
   288  // of an error.
   289  type DefaultErrorResponse struct {
   290  	Error string `json:"err"`
   291  }
   292  
   293  // Channel is a channel interface to make testing possible.
   294  // It is highly recommended to use *amqp.Channel as the interface implementation.
   295  type Channel interface {
   296  	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
   297  	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error)
   298  }
   299  

View as plain text