...

Source file src/github.com/go-kit/kit/transport/amqp/request_response_func.go

Documentation: github.com/go-kit/kit/transport/amqp

     1  package amqp
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	amqp "github.com/rabbitmq/amqp091-go"
     8  )
     9  
    10  // RequestFunc may take information from a publisher request and put it into a
    11  // request context. In Subscribers, RequestFuncs are executed prior to invoking
    12  // the endpoint.
    13  type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context
    14  
    15  // SubscriberResponseFunc may take information from a request context and use it to
    16  // manipulate a Publisher. SubscriberResponseFuncs are only executed in
    17  // subscribers, after invoking the endpoint but prior to publishing a reply.
    18  type SubscriberResponseFunc func(context.Context,
    19  	*amqp.Delivery,
    20  	Channel,
    21  	*amqp.Publishing,
    22  ) context.Context
    23  
    24  // PublisherResponseFunc may take information from an AMQP request and make the
    25  // response available for consumption. PublisherResponseFunc are only executed
    26  // in publishers, after a request has been made, but prior to it being decoded.
    27  type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context
    28  
    29  // SetPublishExchange returns a RequestFunc that sets the Exchange field
    30  // of an AMQP Publish call.
    31  func SetPublishExchange(publishExchange string) RequestFunc {
    32  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    33  		return context.WithValue(ctx, ContextKeyExchange, publishExchange)
    34  	}
    35  }
    36  
    37  // SetPublishKey returns a RequestFunc that sets the Key field
    38  // of an AMQP Publish call.
    39  func SetPublishKey(publishKey string) RequestFunc {
    40  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    41  		return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
    42  	}
    43  }
    44  
    45  // SetPublishDeliveryMode sets the delivery mode of a Publishing.
    46  // Please refer to AMQP delivery mode constants in the AMQP package.
    47  func SetPublishDeliveryMode(dmode uint8) RequestFunc {
    48  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    49  		pub.DeliveryMode = dmode
    50  		return ctx
    51  	}
    52  }
    53  
    54  // SetNackSleepDuration returns a RequestFunc that sets the amount of time
    55  // to sleep in the event of a Nack.
    56  // This has to be used in conjunction with an error encoder that Nack and sleeps.
    57  // One example is the SingleNackRequeueErrorEncoder.
    58  // It is designed to be used by Subscribers.
    59  func SetNackSleepDuration(duration time.Duration) RequestFunc {
    60  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    61  		return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
    62  	}
    63  }
    64  
    65  // SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck
    66  // messages when consuming.
    67  // When set to false, the publisher will Ack the first message it receives with
    68  // a matching correlationId.
    69  // It is designed to be used by Publishers.
    70  func SetConsumeAutoAck(autoAck bool) RequestFunc {
    71  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    72  		return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
    73  	}
    74  }
    75  
    76  // SetConsumeArgs returns a RequestFunc that set the arguments for amqp Consume
    77  // function.
    78  // It is designed to be used by Publishers.
    79  func SetConsumeArgs(args amqp.Table) RequestFunc {
    80  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    81  		return context.WithValue(ctx, ContextKeyConsumeArgs, args)
    82  	}
    83  }
    84  
    85  // SetContentType returns a RequestFunc that sets the ContentType field of
    86  // an AMQP Publishing.
    87  func SetContentType(contentType string) RequestFunc {
    88  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    89  		pub.ContentType = contentType
    90  		return ctx
    91  	}
    92  }
    93  
    94  // SetContentEncoding returns a RequestFunc that sets the ContentEncoding field
    95  // of an AMQP Publishing.
    96  func SetContentEncoding(contentEncoding string) RequestFunc {
    97  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
    98  		pub.ContentEncoding = contentEncoding
    99  		return ctx
   100  	}
   101  }
   102  
   103  // SetCorrelationID returns a RequestFunc that sets the CorrelationId field
   104  // of an AMQP Publishing.
   105  func SetCorrelationID(cid string) RequestFunc {
   106  	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
   107  		pub.CorrelationId = cid
   108  		return ctx
   109  	}
   110  }
   111  
   112  // SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service
   113  // to Ack the Delivery object after successfully evaluating the endpoint,
   114  // and before it encodes the response.
   115  // It is designed to be used by Subscribers.
   116  func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
   117  	return func(ctx context.Context,
   118  		deliv *amqp.Delivery,
   119  		ch Channel,
   120  		pub *amqp.Publishing,
   121  	) context.Context {
   122  		deliv.Ack(multiple)
   123  		return ctx
   124  	}
   125  }
   126  
   127  func getPublishExchange(ctx context.Context) string {
   128  	if exchange := ctx.Value(ContextKeyExchange); exchange != nil {
   129  		return exchange.(string)
   130  	}
   131  	return ""
   132  }
   133  
   134  func getPublishKey(ctx context.Context) string {
   135  	if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil {
   136  		return publishKey.(string)
   137  	}
   138  	return ""
   139  }
   140  
   141  func getNackSleepDuration(ctx context.Context) time.Duration {
   142  	if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil {
   143  		return duration.(time.Duration)
   144  	}
   145  	return 0
   146  }
   147  
   148  func getConsumeAutoAck(ctx context.Context) bool {
   149  	if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil {
   150  		return autoAck.(bool)
   151  	}
   152  	return false
   153  }
   154  
   155  func getConsumeArgs(ctx context.Context) amqp.Table {
   156  	if args := ctx.Value(ContextKeyConsumeArgs); args != nil {
   157  		return args.(amqp.Table)
   158  	}
   159  	return nil
   160  }
   161  
   162  type contextKey int
   163  
   164  const (
   165  	// ContextKeyExchange is the value of the reply Exchange in
   166  	// amqp.Publish.
   167  	ContextKeyExchange contextKey = iota
   168  	// ContextKeyPublishKey is the value of the ReplyTo field in
   169  	// amqp.Publish.
   170  	ContextKeyPublishKey
   171  	// ContextKeyNackSleepDuration is the duration to sleep for if the
   172  	// service Nack and requeues a message.
   173  	// This is to prevent sporadic send-resending of message
   174  	// when a message is constantly Nack'd and requeued.
   175  	ContextKeyNackSleepDuration
   176  	// ContextKeyAutoAck is the value of autoAck field when calling
   177  	// amqp.Channel.Consume.
   178  	ContextKeyAutoAck
   179  	// ContextKeyConsumeArgs is the value of consumeArgs field when calling
   180  	// amqp.Channel.Consume.
   181  	ContextKeyConsumeArgs
   182  )
   183  

View as plain text