...

Source file src/google.golang.org/grpc/resolver_balancer_ext_test.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc_test
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"runtime"
    26  	"strings"
    27  	"testing"
    28  	"time"
    29  
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/balancer"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/balancer/stub"
    36  	"google.golang.org/grpc/internal/channelz"
    37  	"google.golang.org/grpc/resolver"
    38  	"google.golang.org/grpc/resolver/manual"
    39  )
    40  
    41  // TestResolverBalancerInteraction tests:
    42  // 1. resolver.Builder.Build() ->
    43  // 2. resolver.ClientConn.UpdateState() ->
    44  // 3. balancer.Balancer.UpdateClientConnState() ->
    45  // 4. balancer.ClientConn.ResolveNow() ->
    46  // 5. resolver.Resolver.ResolveNow() ->
    47  func (s) TestResolverBalancerInteraction(t *testing.T) {
    48  	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
    49  	fmt.Println(name)
    50  	bf := stub.BalancerFuncs{
    51  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
    52  			bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
    53  			return nil
    54  		},
    55  	}
    56  	stub.Register(name, bf)
    57  
    58  	rb := manual.NewBuilderWithScheme(name)
    59  	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
    60  		sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`)
    61  		cc.UpdateState(resolver.State{
    62  			Addresses:     []resolver.Address{{Addr: "test"}},
    63  			ServiceConfig: sc,
    64  		})
    65  	}
    66  	rnCh := make(chan struct{})
    67  	rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) }
    68  	resolver.Register(rb)
    69  
    70  	cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
    71  	if err != nil {
    72  		t.Fatalf("grpc.Dial error: %v", err)
    73  	}
    74  	defer cc.Close()
    75  	select {
    76  	case <-rnCh:
    77  	case <-time.After(defaultTestTimeout):
    78  		t.Fatalf("timed out waiting for resolver.ResolveNow")
    79  	}
    80  }
    81  
    82  type resolverBuilderWithErr struct {
    83  	resolver.Resolver
    84  	errCh  <-chan error
    85  	scheme string
    86  }
    87  
    88  func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
    89  	if err := <-b.errCh; err != nil {
    90  		return nil, err
    91  	}
    92  	return b, nil
    93  }
    94  
    95  func (b *resolverBuilderWithErr) Scheme() string {
    96  	return b.scheme
    97  }
    98  
    99  func (b *resolverBuilderWithErr) Close() {}
   100  
   101  // TestResolverBuildFailure tests:
   102  // 1. resolver.Builder.Build() passes.
   103  // 2. Channel enters idle mode.
   104  // 3. An RPC happens.
   105  // 4. resolver.Builder.Build() fails.
   106  func (s) TestResolverBuildFailure(t *testing.T) {
   107  	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
   108  	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
   109  	resErrCh := make(chan error, 1)
   110  	resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name})
   111  
   112  	resErrCh <- nil
   113  	cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
   114  	if err != nil {
   115  		t.Fatalf("grpc.Dial error: %v", err)
   116  	}
   117  	defer cc.Close()
   118  	enterIdle(cc)
   119  	const errStr = "test error from resolver builder"
   120  	t.Log("pushing res err")
   121  	resErrCh <- errors.New(errStr)
   122  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   123  	defer cancel()
   124  	if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) {
   125  		t.Fatalf("Invoke = %v; want %v", err, errStr)
   126  	}
   127  }
   128  
   129  // TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock
   130  // while calling UpdateState at the same time as the resolver being closed while
   131  // the channel enters idle mode.
   132  func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) {
   133  	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
   134  	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
   135  
   136  	// Create a manual resolver that spams UpdateState calls until it is closed.
   137  	rb := manual.NewBuilderWithScheme(name)
   138  	var cancel context.CancelFunc
   139  	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
   140  		var ctx context.Context
   141  		ctx, cancel = context.WithCancel(context.Background())
   142  		go func() {
   143  			for ctx.Err() == nil {
   144  				cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
   145  			}
   146  		}()
   147  	}
   148  	rb.CloseCallback = func() {
   149  		cancel()
   150  	}
   151  	resolver.Register(rb)
   152  
   153  	cc, err := grpc.NewClient(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
   154  	if err != nil {
   155  		t.Fatalf("grpc.NewClient error: %v", err)
   156  	}
   157  	defer cc.Close()
   158  
   159  	// Enter/exit idle mode repeatedly.
   160  	for i := 0; i < 2000; i++ {
   161  		// Start a timer so we panic out of the deadlock and can see all the
   162  		// stack traces to debug the problem.
   163  		p := time.AfterFunc(time.Second, func() {
   164  			buf := make([]byte, 8192)
   165  			buf = buf[0:runtime.Stack(buf, true)]
   166  			t.Error("Timed out waiting for enterIdle")
   167  			panic(fmt.Sprint("Stack trace:\n", string(buf)))
   168  		})
   169  		enterIdle(cc)
   170  		p.Stop()
   171  		cc.Connect()
   172  	}
   173  }
   174  
   175  // TestEnterIdleDuringBalancerUpdateState tests calling UpdateState at the same
   176  // time as the balancer being closed while the channel enters idle mode.
   177  func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) {
   178  	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
   179  	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
   180  
   181  	// Create a balancer that calls UpdateState once asynchronously, attempting
   182  	// to make the channel appear ready even after entering idle.
   183  	bf := stub.BalancerFuncs{
   184  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   185  			go func() {
   186  				bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
   187  			}()
   188  			return nil
   189  		},
   190  	}
   191  	stub.Register(name, bf)
   192  
   193  	rb := manual.NewBuilderWithScheme(name)
   194  	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
   195  		cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
   196  	}
   197  	resolver.Register(rb)
   198  
   199  	cc, err := grpc.NewClient(
   200  		name+":///",
   201  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   202  		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`))
   203  	if err != nil {
   204  		t.Fatalf("grpc.NewClient error: %v", err)
   205  	}
   206  	defer cc.Close()
   207  
   208  	// Enter/exit idle mode repeatedly.
   209  	for i := 0; i < 2000; i++ {
   210  		enterIdle(cc)
   211  		if got, want := cc.GetState(), connectivity.Idle; got != want {
   212  			t.Fatalf("cc state = %v; want %v", got, want)
   213  		}
   214  		cc.Connect()
   215  	}
   216  }
   217  
   218  // TestEnterIdleDuringBalancerNewSubConn tests calling NewSubConn at the same
   219  // time as the balancer being closed while the channel enters idle mode.
   220  func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) {
   221  	channelz.TurnOn()
   222  	defer internal.ChannelzTurnOffForTesting()
   223  	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
   224  	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
   225  
   226  	// Create a balancer that calls NewSubConn once asynchronously, attempting
   227  	// to create a subchannel after going idle.
   228  	bf := stub.BalancerFuncs{
   229  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   230  			go func() {
   231  				bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{})
   232  			}()
   233  			return nil
   234  		},
   235  	}
   236  	stub.Register(name, bf)
   237  
   238  	rb := manual.NewBuilderWithScheme(name)
   239  	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
   240  		cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
   241  	}
   242  	resolver.Register(rb)
   243  
   244  	cc, err := grpc.NewClient(
   245  		name+":///",
   246  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   247  		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`))
   248  	if err != nil {
   249  		t.Fatalf("grpc.NewClient error: %v", err)
   250  	}
   251  	defer cc.Close()
   252  
   253  	// Enter/exit idle mode repeatedly.
   254  	for i := 0; i < 2000; i++ {
   255  		enterIdle(cc)
   256  		tcs, _ := channelz.GetTopChannels(0, 0)
   257  		if len(tcs) != 1 {
   258  			t.Fatalf("Found channels: %v; expected 1 entry", tcs)
   259  		}
   260  		if got := tcs[0].SubChans(); len(got) != 0 {
   261  			t.Fatalf("Found subchannels: %v; expected 0 entries", got)
   262  		}
   263  		cc.Connect()
   264  	}
   265  }
   266  

View as plain text