1 package grpc
2
3 import (
4 "context"
5 "fmt"
6 "reflect"
7
8 "google.golang.org/grpc"
9 "google.golang.org/grpc/metadata"
10
11 "github.com/go-kit/kit/endpoint"
12 )
13
14
15
16 type Client struct {
17 client *grpc.ClientConn
18 serviceName string
19 method string
20 enc EncodeRequestFunc
21 dec DecodeResponseFunc
22 grpcReply reflect.Type
23 before []ClientRequestFunc
24 after []ClientResponseFunc
25 finalizer []ClientFinalizerFunc
26 }
27
28
29
30
31 func NewClient(
32 cc *grpc.ClientConn,
33 serviceName string,
34 method string,
35 enc EncodeRequestFunc,
36 dec DecodeResponseFunc,
37 grpcReply interface{},
38 options ...ClientOption,
39 ) *Client {
40 c := &Client{
41 client: cc,
42 method: fmt.Sprintf("/%s/%s", serviceName, method),
43 enc: enc,
44 dec: dec,
45
46
47
48
49 grpcReply: reflect.TypeOf(
50 reflect.Indirect(
51 reflect.ValueOf(grpcReply),
52 ).Interface(),
53 ),
54 before: []ClientRequestFunc{},
55 after: []ClientResponseFunc{},
56 }
57 for _, option := range options {
58 option(c)
59 }
60 return c
61 }
62
63
64 type ClientOption func(*Client)
65
66
67
68 func ClientBefore(before ...ClientRequestFunc) ClientOption {
69 return func(c *Client) { c.before = append(c.before, before...) }
70 }
71
72
73
74
75 func ClientAfter(after ...ClientResponseFunc) ClientOption {
76 return func(c *Client) { c.after = append(c.after, after...) }
77 }
78
79
80
81 func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
82 return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
83 }
84
85
86
87 func (c Client) Endpoint() endpoint.Endpoint {
88 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
89 ctx, cancel := context.WithCancel(ctx)
90 defer cancel()
91
92 if c.finalizer != nil {
93 defer func() {
94 for _, f := range c.finalizer {
95 f(ctx, err)
96 }
97 }()
98 }
99
100 ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
101
102 req, err := c.enc(ctx, request)
103 if err != nil {
104 return nil, err
105 }
106
107 md := &metadata.MD{}
108 for _, f := range c.before {
109 ctx = f(ctx, md)
110 }
111 ctx = metadata.NewOutgoingContext(ctx, *md)
112
113 var header, trailer metadata.MD
114 grpcReply := reflect.New(c.grpcReply).Interface()
115 if err = c.client.Invoke(
116 ctx, c.method, req, grpcReply, grpc.Header(&header),
117 grpc.Trailer(&trailer),
118 ); err != nil {
119 return nil, err
120 }
121
122 for _, f := range c.after {
123 ctx = f(ctx, header, trailer)
124 }
125
126 response, err = c.dec(ctx, grpcReply)
127 if err != nil {
128 return nil, err
129 }
130 return response, nil
131 }
132 }
133
134
135
136
137
138
139
// ClientFinalizerFunc is a hook that runs at the end of every invocation of a
// client endpoint. It receives the context as it stands when the invocation
// returns, and the error the endpoint is returning (nil on success).
type ClientFinalizerFunc func(ctx context.Context, err error)
141
View as plain text