/* * * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpc_test import ( "context" "io" "runtime" "sync" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" ) // TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server // handlers are active at one time. func (s) TestServer_MaxHandlers(t *testing.T) { started := make(chan struct{}) blockCalls := grpcsync.NewEvent() // This stub server does not properly respect the stream context, so it will // not exit when the context is canceled. ss := stubserver.StubServer{ FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { started <- struct{}{} <-blockCalls.Done() return nil }, } if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil { t.Fatal("Error starting server:", err) } defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Start one RPC to the server. ctx1, cancel1 := context.WithCancel(ctx) _, err := ss.Client.FullDuplexCall(ctx1) if err != nil { t.Fatal("Error staring call:", err) } // Wait for the handler to be invoked. select { case <-started: case <-ctx.Done(): t.Fatalf("Timed out waiting for RPC to start on server.") } // Cancel it on the client. The server handler will still be running. cancel1() ctx2, cancel2 := context.WithCancel(ctx) defer cancel2() s, err := ss.Client.FullDuplexCall(ctx2) if err != nil { t.Fatal("Error staring call:", err) } // After 100ms, allow the first call to unblock. That should allow the // second RPC to run and finish. select { case <-started: blockCalls.Fire() t.Fatalf("RPC started unexpectedly.") case <-time.After(100 * time.Millisecond): blockCalls.Fire() } select { case <-started: case <-ctx.Done(): t.Fatalf("Timed out waiting for second RPC to start on server.") } if _, err := s.Recv(); err != io.EOF { t.Fatal("Received unexpected RPC error:", err) } } // Tests the case where the stream worker goroutine option is enabled, and a // number of RPCs are initiated around the same time that Stop() is called. This // used to result in a write to a closed channel. This test verifies that there // is no panic. func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) // This deferred stop takes care of stopping the server when one of the // below grpc.Dials fail, and the test exits early. defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() const numChannels = 20 const numRPCLoops = 20 // Create a bunch of clientconns and ensure that they are READY by making an // RPC on them. ccs := make([]*grpc.ClientConn, numChannels) for i := 0; i < numChannels; i++ { var err error ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err) } defer ccs[i].Close() client := testgrpc.NewTestServiceClient(ccs[i]) if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } } // Make a bunch of concurrent RPCs on the above clientconns. These will // eventually race with Stop(), and will start to fail. var wg sync.WaitGroup for i := 0; i < numChannels; i++ { client := testgrpc.NewTestServiceClient(ccs[i]) for j := 0; j < numRPCLoops; j++ { wg.Add(1) go func(client testgrpc.TestServiceClient) { defer wg.Done() for { _, err := client.EmptyCall(ctx, &testpb.Empty{}) if err == nil { continue } if code := status.Code(err); code == codes.Unavailable { // Once Stop() has been called on the server, we expect // subsequent calls to fail with Unavailable. return } t.Errorf("EmptyCall() failed: %v", err) return } }(client) } } // Call Stop() concurrently with the above RPC attempts. ss.Stop() wg.Wait() } // Tests the case where the stream worker goroutine option is enabled, and both // Stop() and GracefulStop() care called. This used to result in a close of a // closed channel. This test verifies that there is no panic. func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) { ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) defer ss.Stop() if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { t.Fatalf("Failed to create client to stub server: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() client := testgrpc.NewTestServiceClient(ss.CC) if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } ss.S.GracefulStop() } // Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop // is called, and ensures Stop doesn't return until the handler returns. func (s) TestServer_WaitForHandlers(t *testing.T) { started := grpcsync.NewEvent() blockCalls := grpcsync.NewEvent() // This stub server does not properly respect the stream context, so it will // not exit when the context is canceled. ss := stubserver.StubServer{ FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { started.Fire() <-blockCalls.Done() return nil }, } if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil { t.Fatal("Error starting server:", err) } defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Start one RPC to the server. ctx1, cancel1 := context.WithCancel(ctx) _, err := ss.Client.FullDuplexCall(ctx1) if err != nil { t.Fatal("Error staring call:", err) } // Wait for the handler to be invoked. select { case <-started.Done(): case <-ctx.Done(): t.Fatalf("Timed out waiting for RPC to start on server.") } // Cancel it on the client. The server handler will still be running. cancel1() // Close the connection. This might be sufficient to allow the server to // return if it doesn't properly wait for outstanding method handlers to // return. ss.CC.Close() // Try to Stop() the server, which should block indefinitely (until // blockCalls is fired). stopped := grpcsync.NewEvent() go func() { ss.S.Stop() stopped.Fire() }() // Wait 100ms and ensure stopped does not fire. select { case <-stopped.Done(): trace := make([]byte, 4096) trace = trace[0:runtime.Stack(trace, true)] blockCalls.Fire() t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace)) case <-time.After(100 * time.Millisecond): // Success; unblock the call and wait for stopped. blockCalls.Fire() } select { case <-stopped.Done(): case <-ctx.Done(): t.Fatalf("Timed out waiting for second RPC to start on server.") } } // Tests that GracefulStop will wait for all method handlers to return by // blocking a handler and ensuring GracefulStop doesn't return until after it is // unblocked. func (s) TestServer_GracefulStopWaits(t *testing.T) { started := grpcsync.NewEvent() blockCalls := grpcsync.NewEvent() // This stub server does not properly respect the stream context, so it will // not exit when the context is canceled. ss := stubserver.StubServer{ FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { started.Fire() <-blockCalls.Done() return nil }, } if err := ss.Start(nil); err != nil { t.Fatal("Error starting server:", err) } defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Start one RPC to the server. ctx1, cancel1 := context.WithCancel(ctx) _, err := ss.Client.FullDuplexCall(ctx1) if err != nil { t.Fatal("Error staring call:", err) } // Wait for the handler to be invoked. select { case <-started.Done(): case <-ctx.Done(): t.Fatalf("Timed out waiting for RPC to start on server.") } // Cancel it on the client. The server handler will still be running. cancel1() // Close the connection. This might be sufficient to allow the server to // return if it doesn't properly wait for outstanding method handlers to // return. ss.CC.Close() // Try to Stop() the server, which should block indefinitely (until // blockCalls is fired). stopped := grpcsync.NewEvent() go func() { ss.S.GracefulStop() stopped.Fire() }() // Wait 100ms and ensure stopped does not fire. select { case <-stopped.Done(): trace := make([]byte, 4096) trace = trace[0:runtime.Stack(trace, true)] blockCalls.Fire() t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace)) case <-time.After(100 * time.Millisecond): // Success; unblock the call and wait for stopped. blockCalls.Fire() } select { case <-stopped.Done(): case <-ctx.Done(): t.Fatalf("Timed out waiting for second RPC to start on server.") } }