...

Source file src/google.golang.org/grpc/server_ext_test.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc_test
    20  
    21  import (
    22  	"context"
    23  	"io"
    24  	"runtime"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/credentials/insecure"
    32  	"google.golang.org/grpc/internal/grpcsync"
    33  	"google.golang.org/grpc/internal/stubserver"
    34  	"google.golang.org/grpc/status"
    35  
    36  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    37  	testpb "google.golang.org/grpc/interop/grpc_testing"
    38  )
    39  
    40  // TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
    41  // handlers are active at one time.
    42  func (s) TestServer_MaxHandlers(t *testing.T) {
    43  	started := make(chan struct{})
    44  	blockCalls := grpcsync.NewEvent()
    45  
    46  	// This stub server does not properly respect the stream context, so it will
    47  	// not exit when the context is canceled.
    48  	ss := stubserver.StubServer{
    49  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
    50  			started <- struct{}{}
    51  			<-blockCalls.Done()
    52  			return nil
    53  		},
    54  	}
    55  	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
    56  		t.Fatal("Error starting server:", err)
    57  	}
    58  	defer ss.Stop()
    59  
    60  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    61  	defer cancel()
    62  
    63  	// Start one RPC to the server.
    64  	ctx1, cancel1 := context.WithCancel(ctx)
    65  	_, err := ss.Client.FullDuplexCall(ctx1)
    66  	if err != nil {
    67  		t.Fatal("Error staring call:", err)
    68  	}
    69  
    70  	// Wait for the handler to be invoked.
    71  	select {
    72  	case <-started:
    73  	case <-ctx.Done():
    74  		t.Fatalf("Timed out waiting for RPC to start on server.")
    75  	}
    76  
    77  	// Cancel it on the client.  The server handler will still be running.
    78  	cancel1()
    79  
    80  	ctx2, cancel2 := context.WithCancel(ctx)
    81  	defer cancel2()
    82  	s, err := ss.Client.FullDuplexCall(ctx2)
    83  	if err != nil {
    84  		t.Fatal("Error staring call:", err)
    85  	}
    86  
    87  	// After 100ms, allow the first call to unblock.  That should allow the
    88  	// second RPC to run and finish.
    89  	select {
    90  	case <-started:
    91  		blockCalls.Fire()
    92  		t.Fatalf("RPC started unexpectedly.")
    93  	case <-time.After(100 * time.Millisecond):
    94  		blockCalls.Fire()
    95  	}
    96  
    97  	select {
    98  	case <-started:
    99  	case <-ctx.Done():
   100  		t.Fatalf("Timed out waiting for second RPC to start on server.")
   101  	}
   102  	if _, err := s.Recv(); err != io.EOF {
   103  		t.Fatal("Received unexpected RPC error:", err)
   104  	}
   105  }
   106  
   107  // Tests the case where the stream worker goroutine option is enabled, and a
   108  // number of RPCs are initiated around the same time that Stop() is called. This
   109  // used to result in a write to a closed channel. This test verifies that there
   110  // is no panic.
   111  func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
   112  	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
   113  	// This deferred stop takes care of stopping the server when one of the
   114  	// below grpc.Dials fail, and the test exits early.
   115  	defer ss.Stop()
   116  
   117  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   118  	defer cancel()
   119  	const numChannels = 20
   120  	const numRPCLoops = 20
   121  
   122  	// Create a bunch of clientconns and ensure that they are READY by making an
   123  	// RPC on them.
   124  	ccs := make([]*grpc.ClientConn, numChannels)
   125  	for i := 0; i < numChannels; i++ {
   126  		var err error
   127  		ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
   128  		if err != nil {
   129  			t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err)
   130  		}
   131  		defer ccs[i].Close()
   132  		client := testgrpc.NewTestServiceClient(ccs[i])
   133  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   134  			t.Fatalf("EmptyCall() failed: %v", err)
   135  		}
   136  	}
   137  
   138  	// Make a bunch of concurrent RPCs on the above clientconns. These will
   139  	// eventually race with Stop(), and will start to fail.
   140  	var wg sync.WaitGroup
   141  	for i := 0; i < numChannels; i++ {
   142  		client := testgrpc.NewTestServiceClient(ccs[i])
   143  		for j := 0; j < numRPCLoops; j++ {
   144  			wg.Add(1)
   145  			go func(client testgrpc.TestServiceClient) {
   146  				defer wg.Done()
   147  				for {
   148  					_, err := client.EmptyCall(ctx, &testpb.Empty{})
   149  					if err == nil {
   150  						continue
   151  					}
   152  					if code := status.Code(err); code == codes.Unavailable {
   153  						// Once Stop() has been called on the server, we expect
   154  						// subsequent calls to fail with Unavailable.
   155  						return
   156  					}
   157  					t.Errorf("EmptyCall() failed: %v", err)
   158  					return
   159  				}
   160  			}(client)
   161  		}
   162  	}
   163  
   164  	// Call Stop() concurrently with the above RPC attempts.
   165  	ss.Stop()
   166  	wg.Wait()
   167  }
   168  
   169  // Tests the case where the stream worker goroutine option is enabled, and both
   170  // Stop() and GracefulStop() care called. This used to result in a close of a
   171  // closed channel. This test verifies that there is no panic.
   172  func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
   173  	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
   174  	defer ss.Stop()
   175  
   176  	if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
   177  		t.Fatalf("Failed to create client to stub server: %v", err)
   178  	}
   179  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   180  	defer cancel()
   181  	client := testgrpc.NewTestServiceClient(ss.CC)
   182  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   183  		t.Fatalf("EmptyCall() failed: %v", err)
   184  	}
   185  
   186  	ss.S.GracefulStop()
   187  }
   188  
   189  // Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
   190  // is called, and ensures Stop doesn't return until the handler returns.
   191  func (s) TestServer_WaitForHandlers(t *testing.T) {
   192  	started := grpcsync.NewEvent()
   193  	blockCalls := grpcsync.NewEvent()
   194  
   195  	// This stub server does not properly respect the stream context, so it will
   196  	// not exit when the context is canceled.
   197  	ss := stubserver.StubServer{
   198  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
   199  			started.Fire()
   200  			<-blockCalls.Done()
   201  			return nil
   202  		},
   203  	}
   204  	if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
   205  		t.Fatal("Error starting server:", err)
   206  	}
   207  	defer ss.Stop()
   208  
   209  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   210  	defer cancel()
   211  
   212  	// Start one RPC to the server.
   213  	ctx1, cancel1 := context.WithCancel(ctx)
   214  	_, err := ss.Client.FullDuplexCall(ctx1)
   215  	if err != nil {
   216  		t.Fatal("Error staring call:", err)
   217  	}
   218  
   219  	// Wait for the handler to be invoked.
   220  	select {
   221  	case <-started.Done():
   222  	case <-ctx.Done():
   223  		t.Fatalf("Timed out waiting for RPC to start on server.")
   224  	}
   225  
   226  	// Cancel it on the client.  The server handler will still be running.
   227  	cancel1()
   228  
   229  	// Close the connection.  This might be sufficient to allow the server to
   230  	// return if it doesn't properly wait for outstanding method handlers to
   231  	// return.
   232  	ss.CC.Close()
   233  
   234  	// Try to Stop() the server, which should block indefinitely (until
   235  	// blockCalls is fired).
   236  	stopped := grpcsync.NewEvent()
   237  	go func() {
   238  		ss.S.Stop()
   239  		stopped.Fire()
   240  	}()
   241  
   242  	// Wait 100ms and ensure stopped does not fire.
   243  	select {
   244  	case <-stopped.Done():
   245  		trace := make([]byte, 4096)
   246  		trace = trace[0:runtime.Stack(trace, true)]
   247  		blockCalls.Fire()
   248  		t.Fatalf("Server returned from Stop() illegally.  Stack trace:\n%v", string(trace))
   249  	case <-time.After(100 * time.Millisecond):
   250  		// Success; unblock the call and wait for stopped.
   251  		blockCalls.Fire()
   252  	}
   253  
   254  	select {
   255  	case <-stopped.Done():
   256  	case <-ctx.Done():
   257  		t.Fatalf("Timed out waiting for second RPC to start on server.")
   258  	}
   259  }
   260  
   261  // Tests that GracefulStop will wait for all method handlers to return by
   262  // blocking a handler and ensuring GracefulStop doesn't return until after it is
   263  // unblocked.
   264  func (s) TestServer_GracefulStopWaits(t *testing.T) {
   265  	started := grpcsync.NewEvent()
   266  	blockCalls := grpcsync.NewEvent()
   267  
   268  	// This stub server does not properly respect the stream context, so it will
   269  	// not exit when the context is canceled.
   270  	ss := stubserver.StubServer{
   271  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
   272  			started.Fire()
   273  			<-blockCalls.Done()
   274  			return nil
   275  		},
   276  	}
   277  	if err := ss.Start(nil); err != nil {
   278  		t.Fatal("Error starting server:", err)
   279  	}
   280  	defer ss.Stop()
   281  
   282  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   283  	defer cancel()
   284  
   285  	// Start one RPC to the server.
   286  	ctx1, cancel1 := context.WithCancel(ctx)
   287  	_, err := ss.Client.FullDuplexCall(ctx1)
   288  	if err != nil {
   289  		t.Fatal("Error staring call:", err)
   290  	}
   291  
   292  	// Wait for the handler to be invoked.
   293  	select {
   294  	case <-started.Done():
   295  	case <-ctx.Done():
   296  		t.Fatalf("Timed out waiting for RPC to start on server.")
   297  	}
   298  
   299  	// Cancel it on the client.  The server handler will still be running.
   300  	cancel1()
   301  
   302  	// Close the connection.  This might be sufficient to allow the server to
   303  	// return if it doesn't properly wait for outstanding method handlers to
   304  	// return.
   305  	ss.CC.Close()
   306  
   307  	// Try to Stop() the server, which should block indefinitely (until
   308  	// blockCalls is fired).
   309  	stopped := grpcsync.NewEvent()
   310  	go func() {
   311  		ss.S.GracefulStop()
   312  		stopped.Fire()
   313  	}()
   314  
   315  	// Wait 100ms and ensure stopped does not fire.
   316  	select {
   317  	case <-stopped.Done():
   318  		trace := make([]byte, 4096)
   319  		trace = trace[0:runtime.Stack(trace, true)]
   320  		blockCalls.Fire()
   321  		t.Fatalf("Server returned from Stop() illegally.  Stack trace:\n%v", string(trace))
   322  	case <-time.After(100 * time.Millisecond):
   323  		// Success; unblock the call and wait for stopped.
   324  		blockCalls.Fire()
   325  	}
   326  
   327  	select {
   328  	case <-stopped.Done():
   329  	case <-ctx.Done():
   330  		t.Fatalf("Timed out waiting for second RPC to start on server.")
   331  	}
   332  }
   333  

View as plain text