...

Source file src/google.golang.org/grpc/internal/stubserver/stubserver.go

Documentation: google.golang.org/grpc/internal/stubserver

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package stubserver is a stubbable implementation of
    20  // google.golang.org/grpc/interop/grpc_testing for testing purposes.
    21  package stubserver
    22  
    23  import (
    24  	"context"
    25  	"fmt"
    26  	"net"
    27  	"testing"
    28  	"time"
    29  
    30  	"golang.org/x/net/http2"
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/resolver"
    35  	"google.golang.org/grpc/resolver/manual"
    36  	"google.golang.org/grpc/serviceconfig"
    37  
    38  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    39  	testpb "google.golang.org/grpc/interop/grpc_testing"
    40  )
    41  
    42  // StubServer is a server that is easy to customize within individual test
    43  // cases.
    44  type StubServer struct {
    45  	// Guarantees we satisfy this interface; panics if unimplemented methods are called.
    46  	testgrpc.TestServiceServer
    47  
    48  	// Customizable implementations of server handlers.
    49  	EmptyCallF      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
    50  	UnaryCallF      func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
    51  	FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error
    52  
    53  	// A client connected to this service the test may use.  Created in Start().
    54  	Client testgrpc.TestServiceClient
    55  	CC     *grpc.ClientConn
    56  	S      *grpc.Server
    57  
    58  	// Parameters for Listen and Dial. Defaults will be used if these are empty
    59  	// before Start.
    60  	Network string
    61  	Address string
    62  	Target  string
    63  
    64  	// Custom listener to use for serving. If unspecified, a new listener is
    65  	// created on a local port.
    66  	Listener net.Listener
    67  
    68  	cleanups []func() // Lambdas executed in Stop(); populated by Start().
    69  
    70  	// Set automatically if Target == ""
    71  	R *manual.Resolver
    72  }
    73  
    74  // EmptyCall is the handler for testpb.EmptyCall
    75  func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
    76  	return ss.EmptyCallF(ctx, in)
    77  }
    78  
    79  // UnaryCall is the handler for testpb.UnaryCall
    80  func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
    81  	return ss.UnaryCallF(ctx, in)
    82  }
    83  
    84  // FullDuplexCall is the handler for testpb.FullDuplexCall
    85  func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
    86  	return ss.FullDuplexCallF(stream)
    87  }
    88  
    89  // Start starts the server and creates a client connected to it.
    90  func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
    91  	if err := ss.StartServer(sopts...); err != nil {
    92  		return err
    93  	}
    94  	if err := ss.StartClient(dopts...); err != nil {
    95  		ss.Stop()
    96  		return err
    97  	}
    98  	return nil
    99  }
   100  
   101  type registerServiceServerOption struct {
   102  	grpc.EmptyServerOption
   103  	f func(*grpc.Server)
   104  }
   105  
   106  // RegisterServiceServerOption returns a ServerOption that will run f() in
   107  // Start or StartServer with the grpc.Server created before serving.  This
   108  // allows other services to be registered on the test server (e.g. ORCA,
   109  // health, or reflection).
   110  func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
   111  	return &registerServiceServerOption{f: f}
   112  }
   113  
   114  func (ss *StubServer) setupServer(sopts ...grpc.ServerOption) (net.Listener, error) {
   115  	if ss.Network == "" {
   116  		ss.Network = "tcp"
   117  	}
   118  	if ss.Address == "" {
   119  		ss.Address = "localhost:0"
   120  	}
   121  	if ss.Target == "" {
   122  		ss.R = manual.NewBuilderWithScheme("whatever")
   123  	}
   124  
   125  	lis := ss.Listener
   126  	if lis == nil {
   127  		var err error
   128  		lis, err = net.Listen(ss.Network, ss.Address)
   129  		if err != nil {
   130  			return nil, fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
   131  		}
   132  	}
   133  	ss.Address = lis.Addr().String()
   134  
   135  	ss.S = grpc.NewServer(sopts...)
   136  	for _, so := range sopts {
   137  		switch x := so.(type) {
   138  		case *registerServiceServerOption:
   139  			x.f(ss.S)
   140  		}
   141  	}
   142  
   143  	testgrpc.RegisterTestServiceServer(ss.S, ss)
   144  	ss.cleanups = append(ss.cleanups, ss.S.Stop)
   145  	return lis, nil
   146  }
   147  
   148  // StartHandlerServer only starts an HTTP server with a gRPC server as the
   149  // handler. It does not create a client to it.  Cannot be used in a StubServer
   150  // that also used StartServer.
   151  func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
   152  	lis, err := ss.setupServer(sopts...)
   153  	if err != nil {
   154  		return err
   155  	}
   156  
   157  	go func() {
   158  		hs := &http2.Server{}
   159  		opts := &http2.ServeConnOpts{Handler: ss.S}
   160  		for {
   161  			conn, err := lis.Accept()
   162  			if err != nil {
   163  				return
   164  			}
   165  			hs.ServeConn(conn, opts)
   166  		}
   167  	}()
   168  	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
   169  
   170  	return nil
   171  }
   172  
   173  // StartServer only starts the server. It does not create a client to it.
   174  // Cannot be used in a StubServer that also used StartHandlerServer.
   175  func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
   176  	lis, err := ss.setupServer(sopts...)
   177  	if err != nil {
   178  		return err
   179  	}
   180  
   181  	go ss.S.Serve(lis)
   182  
   183  	return nil
   184  }
   185  
   186  // StartClient creates a client connected to this service that the test may use.
   187  // The newly created client will be available in the Client field of StubServer.
   188  func (ss *StubServer) StartClient(dopts ...grpc.DialOption) error {
   189  	opts := append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, dopts...)
   190  	if ss.R != nil {
   191  		ss.Target = ss.R.Scheme() + ":///" + ss.Address
   192  		opts = append(opts, grpc.WithResolvers(ss.R))
   193  	}
   194  
   195  	cc, err := grpc.Dial(ss.Target, opts...)
   196  	if err != nil {
   197  		return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err)
   198  	}
   199  	ss.CC = cc
   200  	if ss.R != nil {
   201  		ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
   202  	}
   203  	if err := waitForReady(cc); err != nil {
   204  		cc.Close()
   205  		return err
   206  	}
   207  
   208  	ss.cleanups = append(ss.cleanups, func() { cc.Close() })
   209  
   210  	ss.Client = testgrpc.NewTestServiceClient(cc)
   211  	return nil
   212  }
   213  
   214  // NewServiceConfig applies sc to ss.Client using the resolver (if present).
   215  func (ss *StubServer) NewServiceConfig(sc string) {
   216  	if ss.R != nil {
   217  		ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)})
   218  	}
   219  }
   220  
   221  func waitForReady(cc *grpc.ClientConn) error {
   222  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   223  	defer cancel()
   224  	for {
   225  		s := cc.GetState()
   226  		if s == connectivity.Ready {
   227  			return nil
   228  		}
   229  		if !cc.WaitForStateChange(ctx, s) {
   230  			// ctx got timeout or canceled.
   231  			return ctx.Err()
   232  		}
   233  	}
   234  }
   235  
   236  // Stop stops ss and cleans up all resources it consumed.
   237  func (ss *StubServer) Stop() {
   238  	for i := len(ss.cleanups) - 1; i >= 0; i-- {
   239  		ss.cleanups[i]()
   240  	}
   241  }
   242  
   243  func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
   244  	g := r.CC.ParseServiceConfig(s)
   245  	if g.Err != nil {
   246  		panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
   247  	}
   248  	return g
   249  }
   250  
   251  // StartTestService spins up a stub server exposing the TestService on a local
   252  // port. If the passed in server is nil, a stub server that implements only the
   253  // EmptyCall and UnaryCall RPCs is started.
   254  func StartTestService(t *testing.T, server *StubServer, sopts ...grpc.ServerOption) *StubServer {
   255  	if server == nil {
   256  		server = &StubServer{
   257  			EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
   258  			UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
   259  				return &testpb.SimpleResponse{}, nil
   260  			},
   261  		}
   262  	}
   263  	server.StartServer(sopts...)
   264  
   265  	t.Logf("Started test service backend at %q", server.Address)
   266  	return server
   267  }
   268  

View as plain text