...

Source file src/github.com/go-kit/kit/transport/grpc/client.go

Documentation: github.com/go-kit/kit/transport/grpc

     1  package grpc
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"reflect"
     7  
     8  	"google.golang.org/grpc"
     9  	"google.golang.org/grpc/metadata"
    10  
    11  	"github.com/go-kit/kit/endpoint"
    12  )
    13  
    14  // Client wraps a gRPC connection and provides a method that implements
    15  // endpoint.Endpoint.
    16  type Client struct {
    17  	client      *grpc.ClientConn
    18  	serviceName string
    19  	method      string
    20  	enc         EncodeRequestFunc
    21  	dec         DecodeResponseFunc
    22  	grpcReply   reflect.Type
    23  	before      []ClientRequestFunc
    24  	after       []ClientResponseFunc
    25  	finalizer   []ClientFinalizerFunc
    26  }
    27  
    28  // NewClient constructs a usable Client for a single remote endpoint.
    29  // Pass an zero-value protobuf message of the RPC response type as
    30  // the grpcReply argument.
    31  func NewClient(
    32  	cc *grpc.ClientConn,
    33  	serviceName string,
    34  	method string,
    35  	enc EncodeRequestFunc,
    36  	dec DecodeResponseFunc,
    37  	grpcReply interface{},
    38  	options ...ClientOption,
    39  ) *Client {
    40  	c := &Client{
    41  		client: cc,
    42  		method: fmt.Sprintf("/%s/%s", serviceName, method),
    43  		enc:    enc,
    44  		dec:    dec,
    45  		// We are using reflect.Indirect here to allow both reply structs and
    46  		// pointers to these reply structs. New consumers of the client should
    47  		// use structs directly, while existing consumers will not break if they
    48  		// remain to use pointers to structs.
    49  		grpcReply: reflect.TypeOf(
    50  			reflect.Indirect(
    51  				reflect.ValueOf(grpcReply),
    52  			).Interface(),
    53  		),
    54  		before: []ClientRequestFunc{},
    55  		after:  []ClientResponseFunc{},
    56  	}
    57  	for _, option := range options {
    58  		option(c)
    59  	}
    60  	return c
    61  }
    62  
    63  // ClientOption sets an optional parameter for clients.
    64  type ClientOption func(*Client)
    65  
    66  // ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC
    67  // request before it's invoked.
    68  func ClientBefore(before ...ClientRequestFunc) ClientOption {
    69  	return func(c *Client) { c.before = append(c.before, before...) }
    70  }
    71  
    72  // ClientAfter sets the ClientResponseFuncs that are applied to the incoming
    73  // gRPC response prior to it being decoded. This is useful for obtaining
    74  // response metadata and adding onto the context prior to decoding.
    75  func ClientAfter(after ...ClientResponseFunc) ClientOption {
    76  	return func(c *Client) { c.after = append(c.after, after...) }
    77  }
    78  
    79  // ClientFinalizer is executed at the end of every gRPC request.
    80  // By default, no finalizer is registered.
    81  func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
    82  	return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
    83  }
    84  
    85  // Endpoint returns a usable endpoint that will invoke the gRPC specified by the
    86  // client.
    87  func (c Client) Endpoint() endpoint.Endpoint {
    88  	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
    89  		ctx, cancel := context.WithCancel(ctx)
    90  		defer cancel()
    91  
    92  		if c.finalizer != nil {
    93  			defer func() {
    94  				for _, f := range c.finalizer {
    95  					f(ctx, err)
    96  				}
    97  			}()
    98  		}
    99  
   100  		ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
   101  
   102  		req, err := c.enc(ctx, request)
   103  		if err != nil {
   104  			return nil, err
   105  		}
   106  
   107  		md := &metadata.MD{}
   108  		for _, f := range c.before {
   109  			ctx = f(ctx, md)
   110  		}
   111  		ctx = metadata.NewOutgoingContext(ctx, *md)
   112  
   113  		var header, trailer metadata.MD
   114  		grpcReply := reflect.New(c.grpcReply).Interface()
   115  		if err = c.client.Invoke(
   116  			ctx, c.method, req, grpcReply, grpc.Header(&header),
   117  			grpc.Trailer(&trailer),
   118  		); err != nil {
   119  			return nil, err
   120  		}
   121  
   122  		for _, f := range c.after {
   123  			ctx = f(ctx, header, trailer)
   124  		}
   125  
   126  		response, err = c.dec(ctx, grpcReply)
   127  		if err != nil {
   128  			return nil, err
   129  		}
   130  		return response, nil
   131  	}
   132  }
   133  
   134  // ClientFinalizerFunc can be used to perform work at the end of a client gRPC
   135  // request, after the response is returned. The principal
   136  // intended use is for error logging. Additional response parameters are
   137  // provided in the context under keys with the ContextKeyResponse prefix.
   138  // Note: err may be nil. There maybe also no additional response parameters depending on
   139  // when an error occurs.
   140  type ClientFinalizerFunc func(ctx context.Context, err error)
   141  

View as plain text