...

Source file src/google.golang.org/grpc/test/healthcheck_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"net"
    26  	"sync"
    27  	"testing"
    28  	"time"
    29  
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/health"
    35  	"google.golang.org/grpc/internal"
    36  	"google.golang.org/grpc/internal/channelz"
    37  	"google.golang.org/grpc/internal/grpctest"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/resolver"
    40  	"google.golang.org/grpc/resolver/manual"
    41  	"google.golang.org/grpc/status"
    42  
    43  	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
    44  	healthpb "google.golang.org/grpc/health/grpc_health_v1"
    45  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    46  	testpb "google.golang.org/grpc/interop/grpc_testing"
    47  )
    48  
// testHealthCheckFunc is the health check function that the wrapper built by
// setupHealthCheckWrapper delegates to. It is initialized from
// internal.HealthCheckFunc.
var testHealthCheckFunc = internal.HealthCheckFunc
    50  
    51  func newTestHealthServer() *testHealthServer {
    52  	return newTestHealthServerWithWatchFunc(defaultWatchFunc)
    53  }
    54  
    55  func newTestHealthServerWithWatchFunc(f healthWatchFunc) *testHealthServer {
    56  	return &testHealthServer{
    57  		watchFunc: f,
    58  		update:    make(chan struct{}, 1),
    59  		status:    make(map[string]healthpb.HealthCheckResponse_ServingStatus),
    60  	}
    61  }
    62  
    63  // defaultWatchFunc will send a HealthCheckResponse to the client whenever SetServingStatus is called.
    64  func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
    65  	if in.Service != "foo" {
    66  		return status.Error(codes.FailedPrecondition,
    67  			"the defaultWatchFunc only handles request with service name to be \"foo\"")
    68  	}
    69  	var done bool
    70  	for {
    71  		select {
    72  		case <-stream.Context().Done():
    73  			done = true
    74  		case <-s.update:
    75  		}
    76  		if done {
    77  			break
    78  		}
    79  		s.mu.Lock()
    80  		resp := &healthpb.HealthCheckResponse{
    81  			Status: s.status[in.Service],
    82  		}
    83  		s.mu.Unlock()
    84  		stream.SendMsg(resp)
    85  	}
    86  	return nil
    87  }
    88  
// healthWatchFunc is the signature of the pluggable handler that
// testHealthServer.Watch delegates to. In addition to the request and the
// stream, it receives the testHealthServer the call was made on.
type healthWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
    90  
// testHealthServer is a health service implementation for tests whose Watch
// behavior is supplied through watchFunc.
type testHealthServer struct {
	healthgrpc.UnimplementedHealthServer
	// watchFunc is invoked by Watch to serve each Watch RPC.
	watchFunc healthWatchFunc
	// mu is held while status is read (defaultWatchFunc) or written
	// (SetServingStatus).
	mu sync.Mutex
	// status maps a service name to the serving status last set for it.
	status map[string]healthpb.HealthCheckResponse_ServingStatus
	// update receives a value from SetServingStatus whenever a status is set;
	// defaultWatchFunc waits on it before sending a response.
	update chan struct{}
}
    98  
    99  func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
   100  	return &healthpb.HealthCheckResponse{
   101  		Status: healthpb.HealthCheckResponse_SERVING,
   102  	}, nil
   103  }
   104  
   105  func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
   106  	return s.watchFunc(s, in, stream)
   107  }
   108  
   109  // SetServingStatus is called when need to reset the serving status of a service
   110  // or insert a new service entry into the statusMap.
   111  func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
   112  	s.mu.Lock()
   113  	s.status[service] = status
   114  	select {
   115  	case <-s.update:
   116  	default:
   117  	}
   118  	s.update <- struct{}{}
   119  	s.mu.Unlock()
   120  }
   121  
   122  func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper internal.HealthChecker) {
   123  	hcEnterChan = make(chan struct{})
   124  	hcExitChan = make(chan struct{})
   125  	wrapper = func(ctx context.Context, newStream func(string) (any, error), update func(connectivity.State, error), service string) error {
   126  		close(hcEnterChan)
   127  		defer close(hcExitChan)
   128  		return testHealthCheckFunc(ctx, newStream, update, service)
   129  	}
   130  	return
   131  }
   132  
   133  func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Listener, *testHealthServer) {
   134  	t.Helper()
   135  
   136  	lis, err := net.Listen("tcp", "localhost:0")
   137  	if err != nil {
   138  		t.Fatalf("net.Listen() failed: %v", err)
   139  	}
   140  
   141  	var ts *testHealthServer
   142  	if watchFunc != nil {
   143  		ts = newTestHealthServerWithWatchFunc(watchFunc)
   144  	} else {
   145  		ts = newTestHealthServer()
   146  	}
   147  	s := grpc.NewServer()
   148  	healthgrpc.RegisterHealthServer(s, ts)
   149  	testgrpc.RegisterTestServiceServer(s, &testServer{})
   150  	go s.Serve(lis)
   151  	t.Cleanup(func() { s.Stop() })
   152  	return s, lis, ts
   153  }
   154  
// clientConfig carries the optional settings that setupClient turns into dial
// options.
type clientConfig struct {
	// balancerName, when non-empty, is placed into the default service
	// config as the load balancing policy.
	balancerName string
	// testHealthCheckFuncWrapper, when non-nil, is installed as the health
	// check function through internal.WithHealthCheckFunc.
	testHealthCheckFuncWrapper internal.HealthChecker
	// extraDialOption is appended after the other dial options.
	extraDialOption []grpc.DialOption
}
   160  
   161  func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resolver) {
   162  	t.Helper()
   163  
   164  	r := manual.NewBuilderWithScheme("whatever")
   165  	opts := []grpc.DialOption{
   166  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   167  		grpc.WithResolvers(r),
   168  	}
   169  	if c != nil {
   170  		if c.balancerName != "" {
   171  			opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, c.balancerName)))
   172  		}
   173  		if c.testHealthCheckFuncWrapper != nil {
   174  			opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
   175  		}
   176  		opts = append(opts, c.extraDialOption...)
   177  	}
   178  
   179  	cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...)
   180  	if err != nil {
   181  		t.Fatalf("grpc.Dial() failed: %v", err)
   182  	}
   183  	t.Cleanup(func() { cc.Close() })
   184  	return cc, r
   185  }
   186  
   187  func (s) TestHealthCheckWatchStateChange(t *testing.T) {
   188  	_, lis, ts := setupServer(t, nil)
   189  
   190  	// The table below shows the expected series of addrConn connectivity transitions when server
   191  	// updates its health status. As there's only one addrConn corresponds with the ClientConn in this
   192  	// test, we use ClientConn's connectivity state as the addrConn connectivity state.
   193  	//+------------------------------+-------------------------------------------+
   194  	//| Health Check Returned Status | Expected addrConn Connectivity Transition |
   195  	//+------------------------------+-------------------------------------------+
   196  	//| NOT_SERVING                  | ->TRANSIENT FAILURE                       |
   197  	//| SERVING                      | ->READY                                   |
   198  	//| SERVICE_UNKNOWN              | ->TRANSIENT FAILURE                       |
   199  	//| SERVING                      | ->READY                                   |
   200  	//| UNKNOWN                      | ->TRANSIENT FAILURE                       |
   201  	//+------------------------------+-------------------------------------------+
   202  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
   203  
   204  	cc, r := setupClient(t, nil)
   205  	r.UpdateState(resolver.State{
   206  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   207  		ServiceConfig: parseServiceConfig(t, r, `{
   208  	"healthCheckConfig": {
   209  		"serviceName": "foo"
   210  	},
   211  	"loadBalancingConfig": [{"round_robin":{}}]
   212  }`)})
   213  
   214  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   215  	defer cancel()
   216  	testutils.AwaitNotState(ctx, t, cc, connectivity.Idle)
   217  	testutils.AwaitNotState(ctx, t, cc, connectivity.Connecting)
   218  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   219  	if s := cc.GetState(); s != connectivity.TransientFailure {
   220  		t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
   221  	}
   222  
   223  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   224  	testutils.AwaitNotState(ctx, t, cc, connectivity.TransientFailure)
   225  	if s := cc.GetState(); s != connectivity.Ready {
   226  		t.Fatalf("ClientConn is in %v state, want READY", s)
   227  	}
   228  
   229  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
   230  	testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
   231  	if s := cc.GetState(); s != connectivity.TransientFailure {
   232  		t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
   233  	}
   234  
   235  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   236  	testutils.AwaitNotState(ctx, t, cc, connectivity.TransientFailure)
   237  	if s := cc.GetState(); s != connectivity.Ready {
   238  		t.Fatalf("ClientConn is in %v state, want READY", s)
   239  	}
   240  
   241  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
   242  	testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
   243  	if s := cc.GetState(); s != connectivity.TransientFailure {
   244  		t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
   245  	}
   246  }
   247  
   248  // If Watch returns Unimplemented, then the ClientConn should go into READY state.
   249  func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
   250  	grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled")
   251  	s := grpc.NewServer()
   252  	lis, err := net.Listen("tcp", "localhost:0")
   253  	if err != nil {
   254  		t.Fatalf("failed to listen due to err: %v", err)
   255  	}
   256  	go s.Serve(lis)
   257  	defer s.Stop()
   258  
   259  	cc, r := setupClient(t, nil)
   260  	r.UpdateState(resolver.State{
   261  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   262  		ServiceConfig: parseServiceConfig(t, r, `{
   263  	"healthCheckConfig": {
   264  		"serviceName": "foo"
   265  	},
   266  	"loadBalancingConfig": [{"round_robin":{}}]
   267  }`)})
   268  
   269  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   270  	defer cancel()
   271  	testutils.AwaitNotState(ctx, t, cc, connectivity.Idle)
   272  	testutils.AwaitNotState(ctx, t, cc, connectivity.Connecting)
   273  	if s := cc.GetState(); s != connectivity.Ready {
   274  		t.Fatalf("ClientConn is in %v state, want READY", s)
   275  	}
   276  }
   277  
   278  // In the case of a goaway received, the health check stream should be terminated and health check
   279  // function should exit.
   280  func (s) TestHealthCheckWithGoAway(t *testing.T) {
   281  	s, lis, ts := setupServer(t, nil)
   282  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   283  
   284  	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   285  	cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
   286  	tc := testgrpc.NewTestServiceClient(cc)
   287  	r.UpdateState(resolver.State{
   288  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   289  		ServiceConfig: parseServiceConfig(t, r, `{
   290  	"healthCheckConfig": {
   291  		"serviceName": "foo"
   292  	},
   293  	"loadBalancingConfig": [{"round_robin":{}}]
   294  }`)})
   295  
   296  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   297  	defer cancel()
   298  	// make some rpcs to make sure connection is working.
   299  	if err := verifyResultWithDelay(func() (bool, error) {
   300  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   301  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   302  		}
   303  		return true, nil
   304  	}); err != nil {
   305  		t.Fatal(err)
   306  	}
   307  
   308  	// the stream rpc will persist through goaway event.
   309  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
   310  	if err != nil {
   311  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   312  	}
   313  	respParam := []*testpb.ResponseParameters{{Size: 1}}
   314  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
   315  	if err != nil {
   316  		t.Fatal(err)
   317  	}
   318  	req := &testpb.StreamingOutputCallRequest{
   319  		ResponseParameters: respParam,
   320  		Payload:            payload,
   321  	}
   322  	if err := stream.Send(req); err != nil {
   323  		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
   324  	}
   325  	if _, err := stream.Recv(); err != nil {
   326  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   327  	}
   328  
   329  	select {
   330  	case <-hcExitChan:
   331  		t.Fatal("Health check function has exited, which is not expected.")
   332  	default:
   333  	}
   334  
   335  	// server sends GoAway
   336  	go s.GracefulStop()
   337  
   338  	select {
   339  	case <-hcExitChan:
   340  	case <-time.After(5 * time.Second):
   341  		select {
   342  		case <-hcEnterChan:
   343  		default:
   344  			t.Fatal("Health check function has not entered after 5s.")
   345  		}
   346  		t.Fatal("Health check function has not exited after 5s.")
   347  	}
   348  
   349  	// The existing RPC should be still good to proceed.
   350  	if err := stream.Send(req); err != nil {
   351  		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
   352  	}
   353  	if _, err := stream.Recv(); err != nil {
   354  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   355  	}
   356  }
   357  
   358  func (s) TestHealthCheckWithConnClose(t *testing.T) {
   359  	s, lis, ts := setupServer(t, nil)
   360  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   361  
   362  	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   363  	cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
   364  	tc := testgrpc.NewTestServiceClient(cc)
   365  	r.UpdateState(resolver.State{
   366  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   367  		ServiceConfig: parseServiceConfig(t, r, `{
   368  	"healthCheckConfig": {
   369  		"serviceName": "foo"
   370  	},
   371  	"loadBalancingConfig": [{"round_robin":{}}]
   372  }`)})
   373  
   374  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   375  	defer cancel()
   376  	// make some rpcs to make sure connection is working.
   377  	if err := verifyResultWithDelay(func() (bool, error) {
   378  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   379  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   380  		}
   381  		return true, nil
   382  	}); err != nil {
   383  		t.Fatal(err)
   384  	}
   385  
   386  	select {
   387  	case <-hcExitChan:
   388  		t.Fatal("Health check function has exited, which is not expected.")
   389  	default:
   390  	}
   391  	// server closes the connection
   392  	s.Stop()
   393  
   394  	select {
   395  	case <-hcExitChan:
   396  	case <-time.After(5 * time.Second):
   397  		select {
   398  		case <-hcEnterChan:
   399  		default:
   400  			t.Fatal("Health check function has not entered after 5s.")
   401  		}
   402  		t.Fatal("Health check function has not exited after 5s.")
   403  	}
   404  }
   405  
   406  // addrConn drain happens when addrConn gets torn down due to its address being no longer in the
   407  // address list returned by the resolver.
   408  func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
   409  	_, lis, ts := setupServer(t, nil)
   410  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   411  
   412  	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   413  	cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
   414  	tc := testgrpc.NewTestServiceClient(cc)
   415  	sc := parseServiceConfig(t, r, `{
   416  	"healthCheckConfig": {
   417  		"serviceName": "foo"
   418  	},
   419  	"loadBalancingConfig": [{"round_robin":{}}]
   420  }`)
   421  	r.UpdateState(resolver.State{
   422  		Addresses:     []resolver.Address{{Addr: lis.Addr().String()}},
   423  		ServiceConfig: sc,
   424  	})
   425  
   426  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   427  	defer cancel()
   428  	// make some rpcs to make sure connection is working.
   429  	if err := verifyResultWithDelay(func() (bool, error) {
   430  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   431  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   432  		}
   433  		return true, nil
   434  	}); err != nil {
   435  		t.Fatal(err)
   436  	}
   437  
   438  	// the stream rpc will persist through goaway event.
   439  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
   440  	if err != nil {
   441  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   442  	}
   443  	respParam := []*testpb.ResponseParameters{{Size: 1}}
   444  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
   445  	if err != nil {
   446  		t.Fatal(err)
   447  	}
   448  	req := &testpb.StreamingOutputCallRequest{
   449  		ResponseParameters: respParam,
   450  		Payload:            payload,
   451  	}
   452  	if err := stream.Send(req); err != nil {
   453  		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
   454  	}
   455  	if _, err := stream.Recv(); err != nil {
   456  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   457  	}
   458  
   459  	select {
   460  	case <-hcExitChan:
   461  		t.Fatal("Health check function has exited, which is not expected.")
   462  	default:
   463  	}
   464  	// trigger teardown of the ac
   465  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
   466  
   467  	select {
   468  	case <-hcExitChan:
   469  	case <-time.After(5 * time.Second):
   470  		select {
   471  		case <-hcEnterChan:
   472  		default:
   473  			t.Fatal("Health check function has not entered after 5s.")
   474  		}
   475  		t.Fatal("Health check function has not exited after 5s.")
   476  	}
   477  
   478  	// The existing RPC should be still good to proceed.
   479  	if err := stream.Send(req); err != nil {
   480  		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
   481  	}
   482  	if _, err := stream.Recv(); err != nil {
   483  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   484  	}
   485  }
   486  
   487  // ClientConn close will lead to its addrConns being torn down.
   488  func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
   489  	_, lis, ts := setupServer(t, nil)
   490  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   491  
   492  	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   493  	cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
   494  	tc := testgrpc.NewTestServiceClient(cc)
   495  	r.UpdateState(resolver.State{
   496  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   497  		ServiceConfig: parseServiceConfig(t, r, `{
   498  	"healthCheckConfig": {
   499  		"serviceName": "foo"
   500  	},
   501  	"loadBalancingConfig": [{"round_robin":{}}]
   502  }`)})
   503  
   504  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   505  	defer cancel()
   506  	// make some rpcs to make sure connection is working.
   507  	if err := verifyResultWithDelay(func() (bool, error) {
   508  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   509  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   510  		}
   511  		return true, nil
   512  	}); err != nil {
   513  		t.Fatal(err)
   514  	}
   515  
   516  	select {
   517  	case <-hcExitChan:
   518  		t.Fatal("Health check function has exited, which is not expected.")
   519  	default:
   520  	}
   521  
   522  	// trigger addrConn teardown
   523  	cc.Close()
   524  
   525  	select {
   526  	case <-hcExitChan:
   527  	case <-time.After(5 * time.Second):
   528  		select {
   529  		case <-hcEnterChan:
   530  		default:
   531  			t.Fatal("Health check function has not entered after 5s.")
   532  		}
   533  		t.Fatal("Health check function has not exited after 5s.")
   534  	}
   535  }
   536  
// This test exercises the logic in createTransport that runs after the health check function
// returns: it closes the skipReset channel (since it has not been closed inside the health check
// func) to unblock the onGoAway/onClose goroutine. Here the health check function is made to
// return by tearing down the addrConn before the server has sent any health response.
func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *testing.T) {
	// watchFunc never sends a response: it just waits for the stream to end, or for 5 seconds,
	// and then returns.
	watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
		if in.Service != "delay" {
			return status.Error(codes.FailedPrecondition,
				"this special Watch function only handles request with service name to be \"delay\"")
		}
		// Do nothing to mock a delay of health check response from server side.
		// This case is to help with the test that covers the condition that setConnectivityState is not
		// called inside HealthCheckFunc before the func returns.
		select {
		case <-stream.Context().Done():
		case <-time.After(5 * time.Second):
		}
		return nil
	}
	_, lis, ts := setupServer(t, watchFunc)
	ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)

	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
	_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})

	// The serviceName "delay" is specially handled at server side, where response will not be sent
	// back to client immediately upon receiving the request (client should receive no response until
	// test ends).
	sc := parseServiceConfig(t, r, `{
	"healthCheckConfig": {
		"serviceName": "delay"
	},
	"loadBalancingConfig": [{"round_robin":{}}]
}`)
	r.UpdateState(resolver.State{
		Addresses:     []resolver.Address{{Addr: lis.Addr().String()}},
		ServiceConfig: sc,
	})

	// Non-blocking check: the health check function must not have returned at this point.
	select {
	case <-hcExitChan:
		t.Fatal("Health check function has exited, which is not expected.")
	default:
	}

	// Wait until the health check function has actually been started.
	select {
	case <-hcEnterChan:
	case <-time.After(5 * time.Second):
		t.Fatal("Health check function has not been invoked after 5s.")
	}
	// trigger teardown of the ac, ac in SHUTDOWN state
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})

	// The health check func should exit without calling the setConnectivityState func, as server hasn't sent
	// any response.
	select {
	case <-hcExitChan:
	case <-time.After(5 * time.Second):
		t.Fatal("Health check function has not exited after 5s.")
	}
	// The deferred leakcheck will check whether there is a leaked goroutine, which indicates
	// whether we close the skipReset channel to unblock the onGoAway/onClose goroutine.
}
   599  
// This test exercises the logic in createTransport that runs after the health check function
// returns: it closes the allowedToReset channel (since it has not been closed inside the health
// check func) to unblock the onGoAway/onClose goroutine. Here the health check function is made
// to return by stopping the server before it has sent any health response.
func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
	// watchFunc never sends a response: it just waits for the stream to end, or for 5 seconds,
	// and then returns.
	watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
		if in.Service != "delay" {
			return status.Error(codes.FailedPrecondition,
				"this special Watch function only handles request with service name to be \"delay\"")
		}
		// Do nothing to mock a delay of health check response from server side.
		// This case is to help with the test that covers the condition that setConnectivityState is not
		// called inside HealthCheckFunc before the func returns.
		select {
		case <-stream.Context().Done():
		case <-time.After(5 * time.Second):
		}
		return nil
	}
	s, lis, ts := setupServer(t, watchFunc)
	ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)

	hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
	_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})

	// The serviceName "delay" is specially handled at server side, where response will not be sent
	// back to client immediately upon receiving the request (client should receive no response until
	// test ends).
	r.UpdateState(resolver.State{
		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
		ServiceConfig: parseServiceConfig(t, r, `{
	"healthCheckConfig": {
		"serviceName": "delay"
	},
	"loadBalancingConfig": [{"round_robin":{}}]
}`)})

	// Non-blocking check: the health check function must not have returned at this point.
	select {
	case <-hcExitChan:
		t.Fatal("Health check function has exited, which is not expected.")
	default:
	}

	// Wait until the health check function has actually been started.
	select {
	case <-hcEnterChan:
	case <-time.After(5 * time.Second):
		t.Fatal("Health check function has not been invoked after 5s.")
	}
	// trigger transport being closed
	s.Stop()

	// The health check func should exit without calling the setConnectivityState func, as server hasn't sent
	// any response.
	select {
	case <-hcExitChan:
	case <-time.After(5 * time.Second):
		t.Fatal("Health check function has not exited after 5s.")
	}
	// The deferred leakcheck will check whether there is a leaked goroutine, which indicates
	// whether we close the allowedToReset channel to unblock the onGoAway/onClose goroutine.
}
   660  
   661  func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
   662  	hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   663  	cc, r := setupClient(t, &clientConfig{
   664  		testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
   665  		extraDialOption:            []grpc.DialOption{grpc.WithDisableHealthCheck()},
   666  	})
   667  	tc := testgrpc.NewTestServiceClient(cc)
   668  	r.UpdateState(resolver.State{
   669  		Addresses: []resolver.Address{{Addr: addr}},
   670  		ServiceConfig: parseServiceConfig(t, r, `{
   671  	"healthCheckConfig": {
   672  		"serviceName": "foo"
   673  	},
   674  	"loadBalancingConfig": [{"round_robin":{}}]
   675  }`)})
   676  
   677  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   678  	defer cancel()
   679  	// send some rpcs to make sure transport has been created and is ready for use.
   680  	if err := verifyResultWithDelay(func() (bool, error) {
   681  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   682  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   683  		}
   684  		return true, nil
   685  	}); err != nil {
   686  		t.Fatal(err)
   687  	}
   688  
   689  	select {
   690  	case <-hcEnterChan:
   691  		t.Fatal("Health check function has exited, which is not expected.")
   692  	default:
   693  	}
   694  }
   695  
   696  func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
   697  	hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   698  	cc, r := setupClient(t, &clientConfig{
   699  		testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
   700  	})
   701  	tc := testgrpc.NewTestServiceClient(cc)
   702  	r.UpdateState(resolver.State{
   703  		Addresses: []resolver.Address{{Addr: addr}},
   704  		ServiceConfig: parseServiceConfig(t, r, `{
   705  	"healthCheckConfig": {
   706  		"serviceName": "foo"
   707  	},
   708  	"loadBalancingConfig": [{"pick_first":{}}]
   709  }`)})
   710  
   711  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   712  	defer cancel()
   713  	// send some rpcs to make sure transport has been created and is ready for use.
   714  	if err := verifyResultWithDelay(func() (bool, error) {
   715  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   716  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   717  		}
   718  		return true, nil
   719  	}); err != nil {
   720  		t.Fatal(err)
   721  	}
   722  
   723  	select {
   724  	case <-hcEnterChan:
   725  		t.Fatal("Health check function has started, which is not expected.")
   726  	default:
   727  	}
   728  }
   729  
   730  func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
   731  	hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
   732  	cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
   733  	tc := testgrpc.NewTestServiceClient(cc)
   734  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
   735  
   736  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   737  	defer cancel()
   738  	// send some rpcs to make sure transport has been created and is ready for use.
   739  	if err := verifyResultWithDelay(func() (bool, error) {
   740  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   741  			return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   742  		}
   743  		return true, nil
   744  	}); err != nil {
   745  		t.Fatal(err)
   746  	}
   747  
   748  	select {
   749  	case <-hcEnterChan:
   750  		t.Fatal("Health check function has started, which is not expected.")
   751  	default:
   752  	}
   753  }
   754  
   755  func (s) TestHealthCheckDisable(t *testing.T) {
   756  	_, lis, ts := setupServer(t, nil)
   757  	ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
   758  
   759  	// test client side disabling configuration.
   760  	testHealthCheckDisableWithDialOption(t, lis.Addr().String())
   761  	testHealthCheckDisableWithBalancer(t, lis.Addr().String())
   762  	testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
   763  }
   764  
   765  func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
   766  	watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
   767  		if in.Service != "channelzSuccess" {
   768  			return status.Error(codes.FailedPrecondition,
   769  				"this special Watch function only handles request with service name to be \"channelzSuccess\"")
   770  		}
   771  		return status.Error(codes.OK, "fake success")
   772  	}
   773  	_, lis, _ := setupServer(t, watchFunc)
   774  
   775  	_, r := setupClient(t, nil)
   776  	r.UpdateState(resolver.State{
   777  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   778  		ServiceConfig: parseServiceConfig(t, r, `{
   779  	"healthCheckConfig": {
   780  		"serviceName": "channelzSuccess"
   781  	},
   782  	"loadBalancingConfig": [{"round_robin":{}}]
   783  }`)})
   784  
   785  	if err := verifyResultWithDelay(func() (bool, error) {
   786  		cm, _ := channelz.GetTopChannels(0, 0)
   787  		if len(cm) == 0 {
   788  			return false, errors.New("channelz.GetTopChannels return 0 top channel")
   789  		}
   790  		subChans := cm[0].SubChans()
   791  		if len(subChans) == 0 {
   792  			return false, errors.New("there is 0 subchannel")
   793  		}
   794  		var id int64
   795  		for k := range subChans {
   796  			id = k
   797  			break
   798  		}
   799  		scm := channelz.GetSubChannel(id)
   800  		if scm == nil {
   801  			return false, errors.New("nil subchannel returned")
   802  		}
   803  		// exponential backoff retry may result in more than one health check call.
   804  		cstart, csucc, cfail := scm.ChannelMetrics.CallsStarted.Load(), scm.ChannelMetrics.CallsSucceeded.Load(), scm.ChannelMetrics.CallsFailed.Load()
   805  		if cstart > 0 && csucc > 0 && cfail == 0 {
   806  			return true, nil
   807  		}
   808  		return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded %d CallsFailed, want >0 >0 =0", cstart, csucc, cfail)
   809  	}); err != nil {
   810  		t.Fatal(err)
   811  	}
   812  }
   813  
   814  func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
   815  	watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
   816  		if in.Service != "channelzFailure" {
   817  			return status.Error(codes.FailedPrecondition,
   818  				"this special Watch function only handles request with service name to be \"channelzFailure\"")
   819  		}
   820  		return status.Error(codes.Internal, "fake failure")
   821  	}
   822  	_, lis, _ := setupServer(t, watchFunc)
   823  
   824  	_, r := setupClient(t, nil)
   825  	r.UpdateState(resolver.State{
   826  		Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
   827  		ServiceConfig: parseServiceConfig(t, r, `{
   828  	"healthCheckConfig": {
   829  		"serviceName": "channelzFailure"
   830  	},
   831  	"loadBalancingConfig": [{"round_robin":{}}]
   832  }`)})
   833  
   834  	if err := verifyResultWithDelay(func() (bool, error) {
   835  		cm, _ := channelz.GetTopChannels(0, 0)
   836  		if len(cm) == 0 {
   837  			return false, errors.New("channelz.GetTopChannels return 0 top channel")
   838  		}
   839  		subChans := cm[0].SubChans()
   840  		if len(subChans) == 0 {
   841  			return false, errors.New("there is 0 subchannel")
   842  		}
   843  		var id int64
   844  		for k := range subChans {
   845  			id = k
   846  			break
   847  		}
   848  		scm := channelz.GetSubChannel(id)
   849  		if scm == nil {
   850  			return false, errors.New("nil subchannel returned")
   851  		}
   852  		// exponential backoff retry may result in more than one health check call.
   853  		cstart, cfail, csucc := scm.ChannelMetrics.CallsStarted.Load(), scm.ChannelMetrics.CallsFailed.Load(), scm.ChannelMetrics.CallsSucceeded.Load()
   854  		if cstart > 0 && cfail > 0 && csucc == 0 {
   855  			return true, nil
   856  		}
   857  		return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, %d CallsSucceeded, want >0, >0", cstart, cfail, csucc)
   858  	}); err != nil {
   859  		t.Fatal(err)
   860  	}
   861  }
   862  
   863  // healthCheck is a helper function to make a unary health check RPC and return
   864  // the response.
   865  func healthCheck(d time.Duration, cc *grpc.ClientConn, service string) (*healthpb.HealthCheckResponse, error) {
   866  	ctx, cancel := context.WithTimeout(context.Background(), d)
   867  	defer cancel()
   868  	hc := healthgrpc.NewHealthClient(cc)
   869  	return hc.Check(ctx, &healthpb.HealthCheckRequest{Service: service})
   870  }
   871  
   872  // verifyHealthCheckStatus is a helper function to verify that the current
   873  // health status of the service matches the one passed in 'wantStatus'.
   874  func verifyHealthCheckStatus(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
   875  	t.Helper()
   876  	resp, err := healthCheck(d, cc, service)
   877  	if err != nil {
   878  		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
   879  	}
   880  	if resp.Status != wantStatus {
   881  		t.Fatalf("Got the serving status %v, want %v", resp.Status, wantStatus)
   882  	}
   883  }
   884  
   885  // verifyHealthCheckErrCode is a helper function to verify that a unary health
   886  // check RPC returns an error with a code set to 'wantCode'.
   887  func verifyHealthCheckErrCode(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantCode codes.Code) {
   888  	t.Helper()
   889  	if _, err := healthCheck(d, cc, service); status.Code(err) != wantCode {
   890  		t.Fatalf("Health/Check() got errCode %v, want %v", status.Code(err), wantCode)
   891  	}
   892  }
   893  
   894  // newHealthCheckStream is a helper function to start a health check streaming
   895  // RPC, and returns the stream.
   896  func newHealthCheckStream(t *testing.T, cc *grpc.ClientConn, service string) (healthgrpc.Health_WatchClient, context.CancelFunc) {
   897  	t.Helper()
   898  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   899  	hc := healthgrpc.NewHealthClient(cc)
   900  	stream, err := hc.Watch(ctx, &healthpb.HealthCheckRequest{Service: service})
   901  	if err != nil {
   902  		t.Fatalf("hc.Watch(_, %v) failed: %v", service, err)
   903  	}
   904  	return stream, cancel
   905  }
   906  
   907  // healthWatchChecker is a helper function to verify that the next health
   908  // status returned on the given stream matches the one passed in 'wantStatus'.
   909  func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
   910  	t.Helper()
   911  	response, err := stream.Recv()
   912  	if err != nil {
   913  		t.Fatalf("stream.Recv() failed: %v", err)
   914  	}
   915  	if response.Status != wantStatus {
   916  		t.Fatalf("got servingStatus %v, want %v", response.Status, wantStatus)
   917  	}
   918  }
   919  
   920  // TestHealthCheckSuccess invokes the unary Check() RPC on the health server in
   921  // a successful case.
   922  func (s) TestHealthCheckSuccess(t *testing.T) {
   923  	for _, e := range listTestEnv() {
   924  		testHealthCheckSuccess(t, e)
   925  	}
   926  }
   927  
   928  func testHealthCheckSuccess(t *testing.T, e env) {
   929  	te := newTest(t, e)
   930  	te.enableHealthServer = true
   931  	te.startServer(&testServer{security: e.security})
   932  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
   933  	defer te.tearDown()
   934  
   935  	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.OK)
   936  }
   937  
   938  // TestHealthCheckFailure invokes the unary Check() RPC on the health server
   939  // with an expired context and expects the RPC to fail.
   940  func (s) TestHealthCheckFailure(t *testing.T) {
   941  	for _, e := range listTestEnv() {
   942  		testHealthCheckFailure(t, e)
   943  	}
   944  }
   945  
   946  func testHealthCheckFailure(t *testing.T, e env) {
   947  	te := newTest(t, e)
   948  	te.declareLogNoise(
   949  		"Failed to dial ",
   950  		"grpc: the client connection is closing; please retry",
   951  	)
   952  	te.enableHealthServer = true
   953  	te.startServer(&testServer{security: e.security})
   954  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
   955  	defer te.tearDown()
   956  
   957  	verifyHealthCheckErrCode(t, 0*time.Second, te.clientConn(), defaultHealthService, codes.DeadlineExceeded)
   958  	awaitNewConnLogOutput()
   959  }
   960  
   961  // TestHealthCheckOff makes a unary Check() RPC on the health server where the
   962  // health status of the defaultHealthService is not set, and therefore expects
   963  // an error code 'codes.NotFound'.
   964  func (s) TestHealthCheckOff(t *testing.T) {
   965  	for _, e := range listTestEnv() {
   966  		// TODO(bradfitz): Temporarily skip this env due to #619.
   967  		if e.name == "handler-tls" {
   968  			continue
   969  		}
   970  		testHealthCheckOff(t, e)
   971  	}
   972  }
   973  
   974  func testHealthCheckOff(t *testing.T, e env) {
   975  	te := newTest(t, e)
   976  	te.enableHealthServer = true
   977  	te.startServer(&testServer{security: e.security})
   978  	defer te.tearDown()
   979  
   980  	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound)
   981  }
   982  
   983  // TestHealthWatchMultipleClients makes a streaming Watch() RPC on the health
   984  // server with multiple clients and expects the same status on both streams.
   985  func (s) TestHealthWatchMultipleClients(t *testing.T) {
   986  	for _, e := range listTestEnv() {
   987  		testHealthWatchMultipleClients(t, e)
   988  	}
   989  }
   990  
   991  func testHealthWatchMultipleClients(t *testing.T, e env) {
   992  	te := newTest(t, e)
   993  	te.enableHealthServer = true
   994  	te.startServer(&testServer{security: e.security})
   995  	defer te.tearDown()
   996  
   997  	cc := te.clientConn()
   998  	stream1, cf1 := newHealthCheckStream(t, cc, defaultHealthService)
   999  	defer cf1()
  1000  	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  1001  
  1002  	stream2, cf2 := newHealthCheckStream(t, cc, defaultHealthService)
  1003  	defer cf2()
  1004  	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  1005  
  1006  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
  1007  	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
  1008  	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
  1009  }
  1010  
  1011  // TestHealthWatchSameStatusmakes a streaming Watch() RPC on the health server
  1012  // and makes sure that the health status of the server is as expected after
  1013  // multiple calls to SetServingStatus with the same status.
  1014  func (s) TestHealthWatchSameStatus(t *testing.T) {
  1015  	for _, e := range listTestEnv() {
  1016  		testHealthWatchSameStatus(t, e)
  1017  	}
  1018  }
  1019  
  1020  func testHealthWatchSameStatus(t *testing.T, e env) {
  1021  	te := newTest(t, e)
  1022  	te.enableHealthServer = true
  1023  	te.startServer(&testServer{security: e.security})
  1024  	defer te.tearDown()
  1025  
  1026  	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
  1027  	defer cf()
  1028  
  1029  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  1030  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1031  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
  1032  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1033  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
  1034  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
  1035  }
  1036  
  1037  // TestHealthWatchServiceStatusSetBeforeStartingServer starts a health server
  1038  // on which the health status for the defaultService is set before the gRPC
  1039  // server is started, and expects the correct health status to be returned.
  1040  func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
  1041  	for _, e := range listTestEnv() {
  1042  		testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
  1043  	}
  1044  }
  1045  
  1046  func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
  1047  	hs := health.NewServer()
  1048  	te := newTest(t, e)
  1049  	te.healthServer = hs
  1050  	hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1051  	te.startServer(&testServer{security: e.security})
  1052  	defer te.tearDown()
  1053  
  1054  	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
  1055  	defer cf()
  1056  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
  1057  }
  1058  
  1059  // TestHealthWatchDefaultStatusChange verifies the simple case where the
  1060  // service starts off with a SERVICE_UNKNOWN status (because SetServingStatus
  1061  // hasn't been called yet) and then moves to SERVING after SetServingStatus is
  1062  // called.
  1063  func (s) TestHealthWatchDefaultStatusChange(t *testing.T) {
  1064  	for _, e := range listTestEnv() {
  1065  		testHealthWatchDefaultStatusChange(t, e)
  1066  	}
  1067  }
  1068  
  1069  func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
  1070  	te := newTest(t, e)
  1071  	te.enableHealthServer = true
  1072  	te.startServer(&testServer{security: e.security})
  1073  	defer te.tearDown()
  1074  
  1075  	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
  1076  	defer cf()
  1077  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  1078  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1079  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
  1080  }
  1081  
  1082  // TestHealthWatchSetServiceStatusBeforeClientCallsWatch verifies the case
  1083  // where the health status is set to SERVING before the client calls Watch().
  1084  func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
  1085  	for _, e := range listTestEnv() {
  1086  		testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
  1087  	}
  1088  }
  1089  
  1090  func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
  1091  	te := newTest(t, e)
  1092  	te.enableHealthServer = true
  1093  	te.startServer(&testServer{security: e.security})
  1094  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1095  	defer te.tearDown()
  1096  
  1097  	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
  1098  	defer cf()
  1099  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
  1100  }
  1101  
  1102  // TestHealthWatchOverallServerHealthChange verifies setting the overall status
  1103  // of the server by using the empty service name.
  1104  func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) {
  1105  	for _, e := range listTestEnv() {
  1106  		testHealthWatchOverallServerHealthChange(t, e)
  1107  	}
  1108  }
  1109  
  1110  func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
  1111  	te := newTest(t, e)
  1112  	te.enableHealthServer = true
  1113  	te.startServer(&testServer{security: e.security})
  1114  	defer te.tearDown()
  1115  
  1116  	stream, cf := newHealthCheckStream(t, te.clientConn(), "")
  1117  	defer cf()
  1118  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
  1119  	te.setHealthServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
  1120  	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
  1121  }
  1122  
  1123  // TestUnknownHandler verifies that an expected error is returned (by setting
  1124  // the unknownHandler on the server) for a service which is not exposed to the
  1125  // client.
  1126  func (s) TestUnknownHandler(t *testing.T) {
  1127  	// An example unknownHandler that returns a different code and a different
  1128  	// method, making sure that we do not expose what methods are implemented to
  1129  	// a client that is not authenticated.
  1130  	unknownHandler := func(srv any, stream grpc.ServerStream) error {
  1131  		return status.Error(codes.Unauthenticated, "user unauthenticated")
  1132  	}
  1133  	for _, e := range listTestEnv() {
  1134  		// TODO(bradfitz): Temporarily skip this env due to #619.
  1135  		if e.name == "handler-tls" {
  1136  			continue
  1137  		}
  1138  		testUnknownHandler(t, e, unknownHandler)
  1139  	}
  1140  }
  1141  
  1142  func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
  1143  	te := newTest(t, e)
  1144  	te.unknownHandler = unknownHandler
  1145  	te.startServer(&testServer{security: e.security})
  1146  	defer te.tearDown()
  1147  	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated)
  1148  }
  1149  
  1150  // TestHealthCheckServingStatus makes a streaming Watch() RPC on the health
  1151  // server and verifies a bunch of health status transitions.
  1152  func (s) TestHealthCheckServingStatus(t *testing.T) {
  1153  	for _, e := range listTestEnv() {
  1154  		testHealthCheckServingStatus(t, e)
  1155  	}
  1156  }
  1157  
  1158  func testHealthCheckServingStatus(t *testing.T, e env) {
  1159  	te := newTest(t, e)
  1160  	te.enableHealthServer = true
  1161  	te.startServer(&testServer{security: e.security})
  1162  	defer te.tearDown()
  1163  
  1164  	cc := te.clientConn()
  1165  	verifyHealthCheckStatus(t, 1*time.Second, cc, "", healthpb.HealthCheckResponse_SERVING)
  1166  	verifyHealthCheckErrCode(t, 1*time.Second, cc, defaultHealthService, codes.NotFound)
  1167  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1168  	verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_SERVING)
  1169  	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
  1170  	verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
  1171  }
  1172  

View as plain text