...

Source file src/github.com/go-kit/kit/transport/amqp/publisher.go

Documentation: github.com/go-kit/kit/transport/amqp

     1  package amqp
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	"github.com/go-kit/kit/endpoint"
     8  	amqp "github.com/rabbitmq/amqp091-go"
     9  )
    10  
// maxCorrelationIdLength is the upper bound handed to randInt when Endpoint
// generates a random correlation id for an outgoing Publishing.
//
// The golang AMQP implementation requires the []byte representation of
// correlation id strings to have a maximum length of 255 bytes.
const maxCorrelationIdLength = 255
    14  
// Publisher wraps an AMQP channel and queue, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
	// ch is the channel the deliverers in this file publish on and consume from.
	ch        Channel
	// q is the reply queue: its Name is set as ReplyTo on every outgoing
	// Publishing and is the queue DefaultDeliverer consumes replies from.
	q         *amqp.Queue
	// enc populates the outgoing amqp.Publishing from the request value.
	enc       EncodeRequestFunc
	// dec turns the Delivery handed back by the deliverer into the response value.
	dec       DecodeResponseFunc
	// before functions run after enc and before the deliverer is invoked.
	before    []RequestFunc
	// after functions run on the Delivery before dec is called.
	after     []PublisherResponseFunc
	// deliverer publishes the Publishing and obtains the reply; NewPublisher
	// defaults it to DefaultDeliverer.
	deliverer Deliverer
	// timeout bounds each request via context.WithTimeout; NewPublisher
	// defaults it to 10 seconds.
	timeout   time.Duration
}
    27  
    28  // NewPublisher constructs a usable Publisher for a single remote method.
    29  func NewPublisher(
    30  	ch Channel,
    31  	q *amqp.Queue,
    32  	enc EncodeRequestFunc,
    33  	dec DecodeResponseFunc,
    34  	options ...PublisherOption,
    35  ) *Publisher {
    36  	p := &Publisher{
    37  		ch:        ch,
    38  		q:         q,
    39  		enc:       enc,
    40  		dec:       dec,
    41  		deliverer: DefaultDeliverer,
    42  		timeout:   10 * time.Second,
    43  	}
    44  	for _, option := range options {
    45  		option(p)
    46  	}
    47  	return p
    48  }
    49  
// PublisherOption sets an optional parameter on a Publisher. Options are
// passed to NewPublisher, which applies them in order after it has set the
// defaults.
type PublisherOption func(*Publisher)
    52  
    53  // PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP
    54  // request before it's invoked.
    55  func PublisherBefore(before ...RequestFunc) PublisherOption {
    56  	return func(p *Publisher) { p.before = append(p.before, before...) }
    57  }
    58  
    59  // PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP
    60  // request prior to it being decoded. This is useful for obtaining anything off
    61  // of the response and adding onto the context prior to decoding.
    62  func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
    63  	return func(p *Publisher) { p.after = append(p.after, after...) }
    64  }
    65  
    66  // PublisherDeliverer sets the deliverer function that the Publisher invokes.
    67  func PublisherDeliverer(deliverer Deliverer) PublisherOption {
    68  	return func(p *Publisher) { p.deliverer = deliverer }
    69  }
    70  
    71  // PublisherTimeout sets the available timeout for an AMQP request.
    72  func PublisherTimeout(timeout time.Duration) PublisherOption {
    73  	return func(p *Publisher) { p.timeout = timeout }
    74  }
    75  
    76  // Endpoint returns a usable endpoint that invokes the remote endpoint.
    77  func (p Publisher) Endpoint() endpoint.Endpoint {
    78  	return func(ctx context.Context, request interface{}) (interface{}, error) {
    79  		ctx, cancel := context.WithTimeout(ctx, p.timeout)
    80  		defer cancel()
    81  
    82  		pub := amqp.Publishing{
    83  			ReplyTo:       p.q.Name,
    84  			CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
    85  		}
    86  
    87  		if err := p.enc(ctx, &pub, request); err != nil {
    88  			return nil, err
    89  		}
    90  
    91  		for _, f := range p.before {
    92  			// Affect only amqp.Publishing
    93  			ctx = f(ctx, &pub, nil)
    94  		}
    95  
    96  		deliv, err := p.deliverer(ctx, p, &pub)
    97  		if err != nil {
    98  			return nil, err
    99  		}
   100  
   101  		for _, f := range p.after {
   102  			ctx = f(ctx, deliv)
   103  		}
   104  		response, err := p.dec(ctx, deliv)
   105  		if err != nil {
   106  			return nil, err
   107  		}
   108  
   109  		return response, nil
   110  	}
   111  }
   112  
// Deliverer is invoked by the Publisher to publish the specified Publishing, and to
// retrieve the appropriate response Delivery object.
//
// It receives the request context, the Publisher (by value) and a pointer to
// the Publishing to send. The returned Delivery is passed to the publisher's
// after functions and response decoder; it may be nil, as it is for
// SendAndForgetDeliverer. A non-nil error is returned unchanged by the endpoint.
type Deliverer func(
	context.Context,
	Publisher,
	*amqp.Publishing,
) (*amqp.Delivery, error)
   120  
   121  // DefaultDeliverer is a deliverer that publishes the specified Publishing
   122  // and returns the first Delivery object with the matching correlationId.
   123  // If the context times out while waiting for a reply, an error will be returned.
   124  func DefaultDeliverer(
   125  	ctx context.Context,
   126  	p Publisher,
   127  	pub *amqp.Publishing,
   128  ) (*amqp.Delivery, error) {
   129  	err := p.ch.Publish(
   130  		getPublishExchange(ctx),
   131  		getPublishKey(ctx),
   132  		false, //mandatory
   133  		false, //immediate
   134  		*pub,
   135  	)
   136  	if err != nil {
   137  		return nil, err
   138  	}
   139  	autoAck := getConsumeAutoAck(ctx)
   140  
   141  	msg, err := p.ch.Consume(
   142  		p.q.Name,
   143  		"", //consumer
   144  		autoAck,
   145  		false, //exclusive
   146  		false, //noLocal
   147  		false, //noWait
   148  		getConsumeArgs(ctx),
   149  	)
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  
   154  	for {
   155  		select {
   156  		case d := <-msg:
   157  			if d.CorrelationId == pub.CorrelationId {
   158  				if !autoAck {
   159  					d.Ack(false) //multiple
   160  				}
   161  				return &d, nil
   162  			}
   163  
   164  		case <-ctx.Done():
   165  			return nil, ctx.Err()
   166  		}
   167  	}
   168  
   169  }
   170  
   171  // SendAndForgetDeliverer delivers the supplied publishing and
   172  // returns a nil response.
   173  // When using this deliverer please ensure that the supplied DecodeResponseFunc and
   174  // PublisherResponseFunc are able to handle nil-type responses.
   175  func SendAndForgetDeliverer(
   176  	ctx context.Context,
   177  	p Publisher,
   178  	pub *amqp.Publishing,
   179  ) (*amqp.Delivery, error) {
   180  	err := p.ch.Publish(
   181  		getPublishExchange(ctx),
   182  		getPublishKey(ctx),
   183  		false, //mandatory
   184  		false, //immediate
   185  		*pub,
   186  	)
   187  	return nil, err
   188  }
   189  

View as plain text