1
18
19 package test
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "net"
26 "sync"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/health"
35 "google.golang.org/grpc/internal"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/internal/grpctest"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/resolver"
40 "google.golang.org/grpc/resolver/manual"
41 "google.golang.org/grpc/status"
42
43 healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
44 healthpb "google.golang.org/grpc/health/grpc_health_v1"
45 testgrpc "google.golang.org/grpc/interop/grpc_testing"
46 testpb "google.golang.org/grpc/interop/grpc_testing"
47 )
48
49 var testHealthCheckFunc = internal.HealthCheckFunc
50
51 func newTestHealthServer() *testHealthServer {
52 return newTestHealthServerWithWatchFunc(defaultWatchFunc)
53 }
54
55 func newTestHealthServerWithWatchFunc(f healthWatchFunc) *testHealthServer {
56 return &testHealthServer{
57 watchFunc: f,
58 update: make(chan struct{}, 1),
59 status: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
60 }
61 }
62
63
64 func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
65 if in.Service != "foo" {
66 return status.Error(codes.FailedPrecondition,
67 "the defaultWatchFunc only handles request with service name to be \"foo\"")
68 }
69 var done bool
70 for {
71 select {
72 case <-stream.Context().Done():
73 done = true
74 case <-s.update:
75 }
76 if done {
77 break
78 }
79 s.mu.Lock()
80 resp := &healthpb.HealthCheckResponse{
81 Status: s.status[in.Service],
82 }
83 s.mu.Unlock()
84 stream.SendMsg(resp)
85 }
86 return nil
87 }
88
89 type healthWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
90
91 type testHealthServer struct {
92 healthgrpc.UnimplementedHealthServer
93 watchFunc healthWatchFunc
94 mu sync.Mutex
95 status map[string]healthpb.HealthCheckResponse_ServingStatus
96 update chan struct{}
97 }
98
99 func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
100 return &healthpb.HealthCheckResponse{
101 Status: healthpb.HealthCheckResponse_SERVING,
102 }, nil
103 }
104
105 func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
106 return s.watchFunc(s, in, stream)
107 }
108
109
110
111 func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
112 s.mu.Lock()
113 s.status[service] = status
114 select {
115 case <-s.update:
116 default:
117 }
118 s.update <- struct{}{}
119 s.mu.Unlock()
120 }
121
122 func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper internal.HealthChecker) {
123 hcEnterChan = make(chan struct{})
124 hcExitChan = make(chan struct{})
125 wrapper = func(ctx context.Context, newStream func(string) (any, error), update func(connectivity.State, error), service string) error {
126 close(hcEnterChan)
127 defer close(hcExitChan)
128 return testHealthCheckFunc(ctx, newStream, update, service)
129 }
130 return
131 }
132
133 func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Listener, *testHealthServer) {
134 t.Helper()
135
136 lis, err := net.Listen("tcp", "localhost:0")
137 if err != nil {
138 t.Fatalf("net.Listen() failed: %v", err)
139 }
140
141 var ts *testHealthServer
142 if watchFunc != nil {
143 ts = newTestHealthServerWithWatchFunc(watchFunc)
144 } else {
145 ts = newTestHealthServer()
146 }
147 s := grpc.NewServer()
148 healthgrpc.RegisterHealthServer(s, ts)
149 testgrpc.RegisterTestServiceServer(s, &testServer{})
150 go s.Serve(lis)
151 t.Cleanup(func() { s.Stop() })
152 return s, lis, ts
153 }
154
155 type clientConfig struct {
156 balancerName string
157 testHealthCheckFuncWrapper internal.HealthChecker
158 extraDialOption []grpc.DialOption
159 }
160
161 func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resolver) {
162 t.Helper()
163
164 r := manual.NewBuilderWithScheme("whatever")
165 opts := []grpc.DialOption{
166 grpc.WithTransportCredentials(insecure.NewCredentials()),
167 grpc.WithResolvers(r),
168 }
169 if c != nil {
170 if c.balancerName != "" {
171 opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, c.balancerName)))
172 }
173 if c.testHealthCheckFuncWrapper != nil {
174 opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
175 }
176 opts = append(opts, c.extraDialOption...)
177 }
178
179 cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...)
180 if err != nil {
181 t.Fatalf("grpc.Dial() failed: %v", err)
182 }
183 t.Cleanup(func() { cc.Close() })
184 return cc, r
185 }
186
187 func (s) TestHealthCheckWatchStateChange(t *testing.T) {
188 _, lis, ts := setupServer(t, nil)
189
190
191
192
193
194
195
196
197
198
199
200
201
202 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
203
204 cc, r := setupClient(t, nil)
205 r.UpdateState(resolver.State{
206 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
207 ServiceConfig: parseServiceConfig(t, r, `{
208 "healthCheckConfig": {
209 "serviceName": "foo"
210 },
211 "loadBalancingConfig": [{"round_robin":{}}]
212 }`)})
213
214 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
215 defer cancel()
216 testutils.AwaitNotState(ctx, t, cc, connectivity.Idle)
217 testutils.AwaitNotState(ctx, t, cc, connectivity.Connecting)
218 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
219 if s := cc.GetState(); s != connectivity.TransientFailure {
220 t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
221 }
222
223 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
224 testutils.AwaitNotState(ctx, t, cc, connectivity.TransientFailure)
225 if s := cc.GetState(); s != connectivity.Ready {
226 t.Fatalf("ClientConn is in %v state, want READY", s)
227 }
228
229 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
230 testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
231 if s := cc.GetState(); s != connectivity.TransientFailure {
232 t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
233 }
234
235 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
236 testutils.AwaitNotState(ctx, t, cc, connectivity.TransientFailure)
237 if s := cc.GetState(); s != connectivity.Ready {
238 t.Fatalf("ClientConn is in %v state, want READY", s)
239 }
240
241 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
242 testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
243 if s := cc.GetState(); s != connectivity.TransientFailure {
244 t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
245 }
246 }
247
248
249 func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
250 grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled")
251 s := grpc.NewServer()
252 lis, err := net.Listen("tcp", "localhost:0")
253 if err != nil {
254 t.Fatalf("failed to listen due to err: %v", err)
255 }
256 go s.Serve(lis)
257 defer s.Stop()
258
259 cc, r := setupClient(t, nil)
260 r.UpdateState(resolver.State{
261 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
262 ServiceConfig: parseServiceConfig(t, r, `{
263 "healthCheckConfig": {
264 "serviceName": "foo"
265 },
266 "loadBalancingConfig": [{"round_robin":{}}]
267 }`)})
268
269 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
270 defer cancel()
271 testutils.AwaitNotState(ctx, t, cc, connectivity.Idle)
272 testutils.AwaitNotState(ctx, t, cc, connectivity.Connecting)
273 if s := cc.GetState(); s != connectivity.Ready {
274 t.Fatalf("ClientConn is in %v state, want READY", s)
275 }
276 }
277
278
279
280 func (s) TestHealthCheckWithGoAway(t *testing.T) {
281 s, lis, ts := setupServer(t, nil)
282 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
283
284 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
285 cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
286 tc := testgrpc.NewTestServiceClient(cc)
287 r.UpdateState(resolver.State{
288 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
289 ServiceConfig: parseServiceConfig(t, r, `{
290 "healthCheckConfig": {
291 "serviceName": "foo"
292 },
293 "loadBalancingConfig": [{"round_robin":{}}]
294 }`)})
295
296 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
297 defer cancel()
298
299 if err := verifyResultWithDelay(func() (bool, error) {
300 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
301 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
302 }
303 return true, nil
304 }); err != nil {
305 t.Fatal(err)
306 }
307
308
309 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
310 if err != nil {
311 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
312 }
313 respParam := []*testpb.ResponseParameters{{Size: 1}}
314 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
315 if err != nil {
316 t.Fatal(err)
317 }
318 req := &testpb.StreamingOutputCallRequest{
319 ResponseParameters: respParam,
320 Payload: payload,
321 }
322 if err := stream.Send(req); err != nil {
323 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
324 }
325 if _, err := stream.Recv(); err != nil {
326 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
327 }
328
329 select {
330 case <-hcExitChan:
331 t.Fatal("Health check function has exited, which is not expected.")
332 default:
333 }
334
335
336 go s.GracefulStop()
337
338 select {
339 case <-hcExitChan:
340 case <-time.After(5 * time.Second):
341 select {
342 case <-hcEnterChan:
343 default:
344 t.Fatal("Health check function has not entered after 5s.")
345 }
346 t.Fatal("Health check function has not exited after 5s.")
347 }
348
349
350 if err := stream.Send(req); err != nil {
351 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
352 }
353 if _, err := stream.Recv(); err != nil {
354 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
355 }
356 }
357
358 func (s) TestHealthCheckWithConnClose(t *testing.T) {
359 s, lis, ts := setupServer(t, nil)
360 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
361
362 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
363 cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
364 tc := testgrpc.NewTestServiceClient(cc)
365 r.UpdateState(resolver.State{
366 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
367 ServiceConfig: parseServiceConfig(t, r, `{
368 "healthCheckConfig": {
369 "serviceName": "foo"
370 },
371 "loadBalancingConfig": [{"round_robin":{}}]
372 }`)})
373
374 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
375 defer cancel()
376
377 if err := verifyResultWithDelay(func() (bool, error) {
378 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
379 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
380 }
381 return true, nil
382 }); err != nil {
383 t.Fatal(err)
384 }
385
386 select {
387 case <-hcExitChan:
388 t.Fatal("Health check function has exited, which is not expected.")
389 default:
390 }
391
392 s.Stop()
393
394 select {
395 case <-hcExitChan:
396 case <-time.After(5 * time.Second):
397 select {
398 case <-hcEnterChan:
399 default:
400 t.Fatal("Health check function has not entered after 5s.")
401 }
402 t.Fatal("Health check function has not exited after 5s.")
403 }
404 }
405
406
407
408 func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
409 _, lis, ts := setupServer(t, nil)
410 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
411
412 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
413 cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
414 tc := testgrpc.NewTestServiceClient(cc)
415 sc := parseServiceConfig(t, r, `{
416 "healthCheckConfig": {
417 "serviceName": "foo"
418 },
419 "loadBalancingConfig": [{"round_robin":{}}]
420 }`)
421 r.UpdateState(resolver.State{
422 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
423 ServiceConfig: sc,
424 })
425
426 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
427 defer cancel()
428
429 if err := verifyResultWithDelay(func() (bool, error) {
430 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
431 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
432 }
433 return true, nil
434 }); err != nil {
435 t.Fatal(err)
436 }
437
438
439 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
440 if err != nil {
441 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
442 }
443 respParam := []*testpb.ResponseParameters{{Size: 1}}
444 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
445 if err != nil {
446 t.Fatal(err)
447 }
448 req := &testpb.StreamingOutputCallRequest{
449 ResponseParameters: respParam,
450 Payload: payload,
451 }
452 if err := stream.Send(req); err != nil {
453 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
454 }
455 if _, err := stream.Recv(); err != nil {
456 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
457 }
458
459 select {
460 case <-hcExitChan:
461 t.Fatal("Health check function has exited, which is not expected.")
462 default:
463 }
464
465 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
466
467 select {
468 case <-hcExitChan:
469 case <-time.After(5 * time.Second):
470 select {
471 case <-hcEnterChan:
472 default:
473 t.Fatal("Health check function has not entered after 5s.")
474 }
475 t.Fatal("Health check function has not exited after 5s.")
476 }
477
478
479 if err := stream.Send(req); err != nil {
480 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
481 }
482 if _, err := stream.Recv(); err != nil {
483 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
484 }
485 }
486
487
488 func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
489 _, lis, ts := setupServer(t, nil)
490 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
491
492 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
493 cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
494 tc := testgrpc.NewTestServiceClient(cc)
495 r.UpdateState(resolver.State{
496 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
497 ServiceConfig: parseServiceConfig(t, r, `{
498 "healthCheckConfig": {
499 "serviceName": "foo"
500 },
501 "loadBalancingConfig": [{"round_robin":{}}]
502 }`)})
503
504 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
505 defer cancel()
506
507 if err := verifyResultWithDelay(func() (bool, error) {
508 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
509 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
510 }
511 return true, nil
512 }); err != nil {
513 t.Fatal(err)
514 }
515
516 select {
517 case <-hcExitChan:
518 t.Fatal("Health check function has exited, which is not expected.")
519 default:
520 }
521
522
523 cc.Close()
524
525 select {
526 case <-hcExitChan:
527 case <-time.After(5 * time.Second):
528 select {
529 case <-hcEnterChan:
530 default:
531 t.Fatal("Health check function has not entered after 5s.")
532 }
533 t.Fatal("Health check function has not exited after 5s.")
534 }
535 }
536
537
538
539
540 func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *testing.T) {
541 watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
542 if in.Service != "delay" {
543 return status.Error(codes.FailedPrecondition,
544 "this special Watch function only handles request with service name to be \"delay\"")
545 }
546
547
548
549 select {
550 case <-stream.Context().Done():
551 case <-time.After(5 * time.Second):
552 }
553 return nil
554 }
555 _, lis, ts := setupServer(t, watchFunc)
556 ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
557
558 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
559 _, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
560
561
562
563
564 sc := parseServiceConfig(t, r, `{
565 "healthCheckConfig": {
566 "serviceName": "delay"
567 },
568 "loadBalancingConfig": [{"round_robin":{}}]
569 }`)
570 r.UpdateState(resolver.State{
571 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
572 ServiceConfig: sc,
573 })
574
575 select {
576 case <-hcExitChan:
577 t.Fatal("Health check function has exited, which is not expected.")
578 default:
579 }
580
581 select {
582 case <-hcEnterChan:
583 case <-time.After(5 * time.Second):
584 t.Fatal("Health check function has not been invoked after 5s.")
585 }
586
587 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
588
589
590
591 select {
592 case <-hcExitChan:
593 case <-time.After(5 * time.Second):
594 t.Fatal("Health check function has not exited after 5s.")
595 }
596
597
598 }
599
600
601
602
603 func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
604 watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
605 if in.Service != "delay" {
606 return status.Error(codes.FailedPrecondition,
607 "this special Watch function only handles request with service name to be \"delay\"")
608 }
609
610
611
612 select {
613 case <-stream.Context().Done():
614 case <-time.After(5 * time.Second):
615 }
616 return nil
617 }
618 s, lis, ts := setupServer(t, watchFunc)
619 ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
620
621 hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
622 _, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
623
624
625
626
627 r.UpdateState(resolver.State{
628 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
629 ServiceConfig: parseServiceConfig(t, r, `{
630 "healthCheckConfig": {
631 "serviceName": "delay"
632 },
633 "loadBalancingConfig": [{"round_robin":{}}]
634 }`)})
635
636 select {
637 case <-hcExitChan:
638 t.Fatal("Health check function has exited, which is not expected.")
639 default:
640 }
641
642 select {
643 case <-hcEnterChan:
644 case <-time.After(5 * time.Second):
645 t.Fatal("Health check function has not been invoked after 5s.")
646 }
647
648 s.Stop()
649
650
651
652 select {
653 case <-hcExitChan:
654 case <-time.After(5 * time.Second):
655 t.Fatal("Health check function has not exited after 5s.")
656 }
657
658
659 }
660
661 func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
662 hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
663 cc, r := setupClient(t, &clientConfig{
664 testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
665 extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
666 })
667 tc := testgrpc.NewTestServiceClient(cc)
668 r.UpdateState(resolver.State{
669 Addresses: []resolver.Address{{Addr: addr}},
670 ServiceConfig: parseServiceConfig(t, r, `{
671 "healthCheckConfig": {
672 "serviceName": "foo"
673 },
674 "loadBalancingConfig": [{"round_robin":{}}]
675 }`)})
676
677 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
678 defer cancel()
679
680 if err := verifyResultWithDelay(func() (bool, error) {
681 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
682 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
683 }
684 return true, nil
685 }); err != nil {
686 t.Fatal(err)
687 }
688
689 select {
690 case <-hcEnterChan:
691 t.Fatal("Health check function has exited, which is not expected.")
692 default:
693 }
694 }
695
696 func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
697 hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
698 cc, r := setupClient(t, &clientConfig{
699 testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
700 })
701 tc := testgrpc.NewTestServiceClient(cc)
702 r.UpdateState(resolver.State{
703 Addresses: []resolver.Address{{Addr: addr}},
704 ServiceConfig: parseServiceConfig(t, r, `{
705 "healthCheckConfig": {
706 "serviceName": "foo"
707 },
708 "loadBalancingConfig": [{"pick_first":{}}]
709 }`)})
710
711 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
712 defer cancel()
713
714 if err := verifyResultWithDelay(func() (bool, error) {
715 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
716 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
717 }
718 return true, nil
719 }); err != nil {
720 t.Fatal(err)
721 }
722
723 select {
724 case <-hcEnterChan:
725 t.Fatal("Health check function has started, which is not expected.")
726 default:
727 }
728 }
729
730 func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
731 hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
732 cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
733 tc := testgrpc.NewTestServiceClient(cc)
734 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
735
736 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
737 defer cancel()
738
739 if err := verifyResultWithDelay(func() (bool, error) {
740 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
741 return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
742 }
743 return true, nil
744 }); err != nil {
745 t.Fatal(err)
746 }
747
748 select {
749 case <-hcEnterChan:
750 t.Fatal("Health check function has started, which is not expected.")
751 default:
752 }
753 }
754
755 func (s) TestHealthCheckDisable(t *testing.T) {
756 _, lis, ts := setupServer(t, nil)
757 ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
758
759
760 testHealthCheckDisableWithDialOption(t, lis.Addr().String())
761 testHealthCheckDisableWithBalancer(t, lis.Addr().String())
762 testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
763 }
764
765 func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
766 watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
767 if in.Service != "channelzSuccess" {
768 return status.Error(codes.FailedPrecondition,
769 "this special Watch function only handles request with service name to be \"channelzSuccess\"")
770 }
771 return status.Error(codes.OK, "fake success")
772 }
773 _, lis, _ := setupServer(t, watchFunc)
774
775 _, r := setupClient(t, nil)
776 r.UpdateState(resolver.State{
777 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
778 ServiceConfig: parseServiceConfig(t, r, `{
779 "healthCheckConfig": {
780 "serviceName": "channelzSuccess"
781 },
782 "loadBalancingConfig": [{"round_robin":{}}]
783 }`)})
784
785 if err := verifyResultWithDelay(func() (bool, error) {
786 cm, _ := channelz.GetTopChannels(0, 0)
787 if len(cm) == 0 {
788 return false, errors.New("channelz.GetTopChannels return 0 top channel")
789 }
790 subChans := cm[0].SubChans()
791 if len(subChans) == 0 {
792 return false, errors.New("there is 0 subchannel")
793 }
794 var id int64
795 for k := range subChans {
796 id = k
797 break
798 }
799 scm := channelz.GetSubChannel(id)
800 if scm == nil {
801 return false, errors.New("nil subchannel returned")
802 }
803
804 cstart, csucc, cfail := scm.ChannelMetrics.CallsStarted.Load(), scm.ChannelMetrics.CallsSucceeded.Load(), scm.ChannelMetrics.CallsFailed.Load()
805 if cstart > 0 && csucc > 0 && cfail == 0 {
806 return true, nil
807 }
808 return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded %d CallsFailed, want >0 >0 =0", cstart, csucc, cfail)
809 }); err != nil {
810 t.Fatal(err)
811 }
812 }
813
814 func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
815 watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
816 if in.Service != "channelzFailure" {
817 return status.Error(codes.FailedPrecondition,
818 "this special Watch function only handles request with service name to be \"channelzFailure\"")
819 }
820 return status.Error(codes.Internal, "fake failure")
821 }
822 _, lis, _ := setupServer(t, watchFunc)
823
824 _, r := setupClient(t, nil)
825 r.UpdateState(resolver.State{
826 Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
827 ServiceConfig: parseServiceConfig(t, r, `{
828 "healthCheckConfig": {
829 "serviceName": "channelzFailure"
830 },
831 "loadBalancingConfig": [{"round_robin":{}}]
832 }`)})
833
834 if err := verifyResultWithDelay(func() (bool, error) {
835 cm, _ := channelz.GetTopChannels(0, 0)
836 if len(cm) == 0 {
837 return false, errors.New("channelz.GetTopChannels return 0 top channel")
838 }
839 subChans := cm[0].SubChans()
840 if len(subChans) == 0 {
841 return false, errors.New("there is 0 subchannel")
842 }
843 var id int64
844 for k := range subChans {
845 id = k
846 break
847 }
848 scm := channelz.GetSubChannel(id)
849 if scm == nil {
850 return false, errors.New("nil subchannel returned")
851 }
852
853 cstart, cfail, csucc := scm.ChannelMetrics.CallsStarted.Load(), scm.ChannelMetrics.CallsFailed.Load(), scm.ChannelMetrics.CallsSucceeded.Load()
854 if cstart > 0 && cfail > 0 && csucc == 0 {
855 return true, nil
856 }
857 return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, %d CallsSucceeded, want >0, >0", cstart, cfail, csucc)
858 }); err != nil {
859 t.Fatal(err)
860 }
861 }
862
863
864
865 func healthCheck(d time.Duration, cc *grpc.ClientConn, service string) (*healthpb.HealthCheckResponse, error) {
866 ctx, cancel := context.WithTimeout(context.Background(), d)
867 defer cancel()
868 hc := healthgrpc.NewHealthClient(cc)
869 return hc.Check(ctx, &healthpb.HealthCheckRequest{Service: service})
870 }
871
872
873
874 func verifyHealthCheckStatus(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
875 t.Helper()
876 resp, err := healthCheck(d, cc, service)
877 if err != nil {
878 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
879 }
880 if resp.Status != wantStatus {
881 t.Fatalf("Got the serving status %v, want %v", resp.Status, wantStatus)
882 }
883 }
884
885
886
887 func verifyHealthCheckErrCode(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantCode codes.Code) {
888 t.Helper()
889 if _, err := healthCheck(d, cc, service); status.Code(err) != wantCode {
890 t.Fatalf("Health/Check() got errCode %v, want %v", status.Code(err), wantCode)
891 }
892 }
893
894
895
896 func newHealthCheckStream(t *testing.T, cc *grpc.ClientConn, service string) (healthgrpc.Health_WatchClient, context.CancelFunc) {
897 t.Helper()
898 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
899 hc := healthgrpc.NewHealthClient(cc)
900 stream, err := hc.Watch(ctx, &healthpb.HealthCheckRequest{Service: service})
901 if err != nil {
902 t.Fatalf("hc.Watch(_, %v) failed: %v", service, err)
903 }
904 return stream, cancel
905 }
906
907
908
909 func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
910 t.Helper()
911 response, err := stream.Recv()
912 if err != nil {
913 t.Fatalf("stream.Recv() failed: %v", err)
914 }
915 if response.Status != wantStatus {
916 t.Fatalf("got servingStatus %v, want %v", response.Status, wantStatus)
917 }
918 }
919
920
921
922 func (s) TestHealthCheckSuccess(t *testing.T) {
923 for _, e := range listTestEnv() {
924 testHealthCheckSuccess(t, e)
925 }
926 }
927
928 func testHealthCheckSuccess(t *testing.T, e env) {
929 te := newTest(t, e)
930 te.enableHealthServer = true
931 te.startServer(&testServer{security: e.security})
932 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
933 defer te.tearDown()
934
935 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.OK)
936 }
937
938
939
940 func (s) TestHealthCheckFailure(t *testing.T) {
941 for _, e := range listTestEnv() {
942 testHealthCheckFailure(t, e)
943 }
944 }
945
946 func testHealthCheckFailure(t *testing.T, e env) {
947 te := newTest(t, e)
948 te.declareLogNoise(
949 "Failed to dial ",
950 "grpc: the client connection is closing; please retry",
951 )
952 te.enableHealthServer = true
953 te.startServer(&testServer{security: e.security})
954 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
955 defer te.tearDown()
956
957 verifyHealthCheckErrCode(t, 0*time.Second, te.clientConn(), defaultHealthService, codes.DeadlineExceeded)
958 awaitNewConnLogOutput()
959 }
960
961
962
963
964 func (s) TestHealthCheckOff(t *testing.T) {
965 for _, e := range listTestEnv() {
966
967 if e.name == "handler-tls" {
968 continue
969 }
970 testHealthCheckOff(t, e)
971 }
972 }
973
974 func testHealthCheckOff(t *testing.T, e env) {
975 te := newTest(t, e)
976 te.enableHealthServer = true
977 te.startServer(&testServer{security: e.security})
978 defer te.tearDown()
979
980 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound)
981 }
982
983
984
985 func (s) TestHealthWatchMultipleClients(t *testing.T) {
986 for _, e := range listTestEnv() {
987 testHealthWatchMultipleClients(t, e)
988 }
989 }
990
991 func testHealthWatchMultipleClients(t *testing.T, e env) {
992 te := newTest(t, e)
993 te.enableHealthServer = true
994 te.startServer(&testServer{security: e.security})
995 defer te.tearDown()
996
997 cc := te.clientConn()
998 stream1, cf1 := newHealthCheckStream(t, cc, defaultHealthService)
999 defer cf1()
1000 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
1001
1002 stream2, cf2 := newHealthCheckStream(t, cc, defaultHealthService)
1003 defer cf2()
1004 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
1005
1006 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
1007 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
1008 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
1009 }
1010
1011
1012
1013
1014 func (s) TestHealthWatchSameStatus(t *testing.T) {
1015 for _, e := range listTestEnv() {
1016 testHealthWatchSameStatus(t, e)
1017 }
1018 }
1019
1020 func testHealthWatchSameStatus(t *testing.T, e env) {
1021 te := newTest(t, e)
1022 te.enableHealthServer = true
1023 te.startServer(&testServer{security: e.security})
1024 defer te.tearDown()
1025
1026 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
1027 defer cf()
1028
1029 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
1030 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1031 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
1032 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1033 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
1034 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
1035 }
1036
1037
1038
1039
1040 func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
1041 for _, e := range listTestEnv() {
1042 testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
1043 }
1044 }
1045
1046 func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
1047 hs := health.NewServer()
1048 te := newTest(t, e)
1049 te.healthServer = hs
1050 hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1051 te.startServer(&testServer{security: e.security})
1052 defer te.tearDown()
1053
1054 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
1055 defer cf()
1056 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
1057 }
1058
1059
1060
1061
1062
1063 func (s) TestHealthWatchDefaultStatusChange(t *testing.T) {
1064 for _, e := range listTestEnv() {
1065 testHealthWatchDefaultStatusChange(t, e)
1066 }
1067 }
1068
1069 func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
1070 te := newTest(t, e)
1071 te.enableHealthServer = true
1072 te.startServer(&testServer{security: e.security})
1073 defer te.tearDown()
1074
1075 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
1076 defer cf()
1077 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
1078 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1079 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
1080 }
1081
1082
1083
1084 func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
1085 for _, e := range listTestEnv() {
1086 testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
1087 }
1088 }
1089
1090 func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
1091 te := newTest(t, e)
1092 te.enableHealthServer = true
1093 te.startServer(&testServer{security: e.security})
1094 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1095 defer te.tearDown()
1096
1097 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
1098 defer cf()
1099 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
1100 }
1101
1102
1103
1104 func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) {
1105 for _, e := range listTestEnv() {
1106 testHealthWatchOverallServerHealthChange(t, e)
1107 }
1108 }
1109
1110 func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
1111 te := newTest(t, e)
1112 te.enableHealthServer = true
1113 te.startServer(&testServer{security: e.security})
1114 defer te.tearDown()
1115
1116 stream, cf := newHealthCheckStream(t, te.clientConn(), "")
1117 defer cf()
1118 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
1119 te.setHealthServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
1120 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
1121 }
1122
1123
1124
1125
1126 func (s) TestUnknownHandler(t *testing.T) {
1127
1128
1129
1130 unknownHandler := func(srv any, stream grpc.ServerStream) error {
1131 return status.Error(codes.Unauthenticated, "user unauthenticated")
1132 }
1133 for _, e := range listTestEnv() {
1134
1135 if e.name == "handler-tls" {
1136 continue
1137 }
1138 testUnknownHandler(t, e, unknownHandler)
1139 }
1140 }
1141
1142 func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
1143 te := newTest(t, e)
1144 te.unknownHandler = unknownHandler
1145 te.startServer(&testServer{security: e.security})
1146 defer te.tearDown()
1147 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated)
1148 }
1149
1150
1151
1152 func (s) TestHealthCheckServingStatus(t *testing.T) {
1153 for _, e := range listTestEnv() {
1154 testHealthCheckServingStatus(t, e)
1155 }
1156 }
1157
1158 func testHealthCheckServingStatus(t *testing.T, e env) {
1159 te := newTest(t, e)
1160 te.enableHealthServer = true
1161 te.startServer(&testServer{security: e.security})
1162 defer te.tearDown()
1163
1164 cc := te.clientConn()
1165 verifyHealthCheckStatus(t, 1*time.Second, cc, "", healthpb.HealthCheckResponse_SERVING)
1166 verifyHealthCheckErrCode(t, 1*time.Second, cc, defaultHealthService, codes.NotFound)
1167 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1168 verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_SERVING)
1169 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
1170 verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
1171 }
1172
View as plain text