1
18
19
20
21 package stubserver
22
23 import (
24 "context"
25 "fmt"
26 "net"
27 "testing"
28 "time"
29
30 "golang.org/x/net/http2"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/resolver"
35 "google.golang.org/grpc/resolver/manual"
36 "google.golang.org/grpc/serviceconfig"
37
38 testgrpc "google.golang.org/grpc/interop/grpc_testing"
39 testpb "google.golang.org/grpc/interop/grpc_testing"
40 )
41
42
43
44 type StubServer struct {
45
46 testgrpc.TestServiceServer
47
48
49 EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
50 UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
51 FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error
52
53
54 Client testgrpc.TestServiceClient
55 CC *grpc.ClientConn
56 S *grpc.Server
57
58
59
60 Network string
61 Address string
62 Target string
63
64
65
66 Listener net.Listener
67
68 cleanups []func()
69
70
71 R *manual.Resolver
72 }
73
74
75 func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
76 return ss.EmptyCallF(ctx, in)
77 }
78
79
80 func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
81 return ss.UnaryCallF(ctx, in)
82 }
83
84
85 func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
86 return ss.FullDuplexCallF(stream)
87 }
88
89
90 func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
91 if err := ss.StartServer(sopts...); err != nil {
92 return err
93 }
94 if err := ss.StartClient(dopts...); err != nil {
95 ss.Stop()
96 return err
97 }
98 return nil
99 }
100
101 type registerServiceServerOption struct {
102 grpc.EmptyServerOption
103 f func(*grpc.Server)
104 }
105
106
107
108
109
110 func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
111 return ®isterServiceServerOption{f: f}
112 }
113
114 func (ss *StubServer) setupServer(sopts ...grpc.ServerOption) (net.Listener, error) {
115 if ss.Network == "" {
116 ss.Network = "tcp"
117 }
118 if ss.Address == "" {
119 ss.Address = "localhost:0"
120 }
121 if ss.Target == "" {
122 ss.R = manual.NewBuilderWithScheme("whatever")
123 }
124
125 lis := ss.Listener
126 if lis == nil {
127 var err error
128 lis, err = net.Listen(ss.Network, ss.Address)
129 if err != nil {
130 return nil, fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
131 }
132 }
133 ss.Address = lis.Addr().String()
134
135 ss.S = grpc.NewServer(sopts...)
136 for _, so := range sopts {
137 switch x := so.(type) {
138 case *registerServiceServerOption:
139 x.f(ss.S)
140 }
141 }
142
143 testgrpc.RegisterTestServiceServer(ss.S, ss)
144 ss.cleanups = append(ss.cleanups, ss.S.Stop)
145 return lis, nil
146 }
147
148
149
150
151 func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
152 lis, err := ss.setupServer(sopts...)
153 if err != nil {
154 return err
155 }
156
157 go func() {
158 hs := &http2.Server{}
159 opts := &http2.ServeConnOpts{Handler: ss.S}
160 for {
161 conn, err := lis.Accept()
162 if err != nil {
163 return
164 }
165 hs.ServeConn(conn, opts)
166 }
167 }()
168 ss.cleanups = append(ss.cleanups, func() { lis.Close() })
169
170 return nil
171 }
172
173
174
175 func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
176 lis, err := ss.setupServer(sopts...)
177 if err != nil {
178 return err
179 }
180
181 go ss.S.Serve(lis)
182
183 return nil
184 }
185
186
187
188 func (ss *StubServer) StartClient(dopts ...grpc.DialOption) error {
189 opts := append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, dopts...)
190 if ss.R != nil {
191 ss.Target = ss.R.Scheme() + ":///" + ss.Address
192 opts = append(opts, grpc.WithResolvers(ss.R))
193 }
194
195 cc, err := grpc.Dial(ss.Target, opts...)
196 if err != nil {
197 return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err)
198 }
199 ss.CC = cc
200 if ss.R != nil {
201 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
202 }
203 if err := waitForReady(cc); err != nil {
204 cc.Close()
205 return err
206 }
207
208 ss.cleanups = append(ss.cleanups, func() { cc.Close() })
209
210 ss.Client = testgrpc.NewTestServiceClient(cc)
211 return nil
212 }
213
214
215 func (ss *StubServer) NewServiceConfig(sc string) {
216 if ss.R != nil {
217 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)})
218 }
219 }
220
221 func waitForReady(cc *grpc.ClientConn) error {
222 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
223 defer cancel()
224 for {
225 s := cc.GetState()
226 if s == connectivity.Ready {
227 return nil
228 }
229 if !cc.WaitForStateChange(ctx, s) {
230
231 return ctx.Err()
232 }
233 }
234 }
235
236
237 func (ss *StubServer) Stop() {
238 for i := len(ss.cleanups) - 1; i >= 0; i-- {
239 ss.cleanups[i]()
240 }
241 }
242
243 func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
244 g := r.CC.ParseServiceConfig(s)
245 if g.Err != nil {
246 panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
247 }
248 return g
249 }
250
251
252
253
254 func StartTestService(t *testing.T, server *StubServer, sopts ...grpc.ServerOption) *StubServer {
255 if server == nil {
256 server = &StubServer{
257 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
258 UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
259 return &testpb.SimpleResponse{}, nil
260 },
261 }
262 }
263 server.StartServer(sopts...)
264
265 t.Logf("Started test service backend at %q", server.Address)
266 return server
267 }
268
View as plain text