...

Source file src/google.golang.org/grpc/test/clientconn_state_transition_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"net"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"golang.org/x/net/http2"
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/backoff"
    32  	"google.golang.org/grpc/balancer"
    33  	"google.golang.org/grpc/connectivity"
    34  	"google.golang.org/grpc/credentials/insecure"
    35  	"google.golang.org/grpc/internal"
    36  	"google.golang.org/grpc/internal/balancer/stub"
    37  	"google.golang.org/grpc/internal/grpcsync"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/resolver"
    40  	"google.golang.org/grpc/resolver/manual"
    41  )
    42  
// stateRecordingBalancerName is the name under which the state recording
// balancer builder is registered, and the name the tests in this file place in
// their service config to select it.
const stateRecordingBalancerName = "state_recording_balancer"

// testBalancerBuilder is the single builder instance shared by the tests in
// this file; tests obtain their state notification channel from it.
var testBalancerBuilder = newStateRecordingBalancerBuilder()

// init registers testBalancerBuilder with the balancer package.
func init() {
	balancer.Register(testBalancerBuilder)
}
    50  
    51  // These tests use a pipeListener. This listener is similar to net.Listener
    52  // except that it is unbuffered, so each read and write will wait for the other
    53  // side's corresponding write or read.
    54  func (s) TestStateTransitions_SingleAddress(t *testing.T) {
    55  	for _, test := range []struct {
    56  		desc   string
    57  		want   []connectivity.State
    58  		server func(net.Listener) net.Conn
    59  	}{
    60  		{
    61  			desc: "When the server returns server preface, the client enters READY.",
    62  			want: []connectivity.State{
    63  				connectivity.Connecting,
    64  				connectivity.Ready,
    65  			},
    66  			server: func(lis net.Listener) net.Conn {
    67  				conn, err := lis.Accept()
    68  				if err != nil {
    69  					t.Error(err)
    70  					return nil
    71  				}
    72  
    73  				go keepReading(conn)
    74  
    75  				framer := http2.NewFramer(conn, conn)
    76  				if err := framer.WriteSettings(http2.Setting{}); err != nil {
    77  					t.Errorf("Error while writing settings frame. %v", err)
    78  					return nil
    79  				}
    80  
    81  				return conn
    82  			},
    83  		},
    84  		{
    85  			desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.",
    86  			want: []connectivity.State{
    87  				connectivity.Connecting,
    88  				connectivity.TransientFailure,
    89  			},
    90  			server: func(lis net.Listener) net.Conn {
    91  				conn, err := lis.Accept()
    92  				if err != nil {
    93  					t.Error(err)
    94  					return nil
    95  				}
    96  
    97  				conn.Close()
    98  				return nil
    99  			},
   100  		},
   101  		{
   102  			desc: `When the server sends its connection preface, but the connection dies before the client can write its
   103  connection preface, the client enters TRANSIENT FAILURE.`,
   104  			want: []connectivity.State{
   105  				connectivity.Connecting,
   106  				connectivity.TransientFailure,
   107  			},
   108  			server: func(lis net.Listener) net.Conn {
   109  				conn, err := lis.Accept()
   110  				if err != nil {
   111  					t.Error(err)
   112  					return nil
   113  				}
   114  
   115  				framer := http2.NewFramer(conn, conn)
   116  				if err := framer.WriteSettings(http2.Setting{}); err != nil {
   117  					t.Errorf("Error while writing settings frame. %v", err)
   118  					return nil
   119  				}
   120  
   121  				conn.Close()
   122  				return nil
   123  			},
   124  		},
   125  		{
   126  			desc: `When the server reads the client connection preface but does not send its connection preface, the
   127  client enters TRANSIENT FAILURE.`,
   128  			want: []connectivity.State{
   129  				connectivity.Connecting,
   130  				connectivity.TransientFailure,
   131  			},
   132  			server: func(lis net.Listener) net.Conn {
   133  				conn, err := lis.Accept()
   134  				if err != nil {
   135  					t.Error(err)
   136  					return nil
   137  				}
   138  
   139  				go keepReading(conn)
   140  
   141  				return conn
   142  			},
   143  		},
   144  	} {
   145  		t.Log(test.desc)
   146  		testStateTransitionSingleAddress(t, test.want, test.server)
   147  	}
   148  }
   149  
   150  func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
   151  	pl := testutils.NewPipeListener()
   152  	defer pl.Close()
   153  
   154  	// Launch the server.
   155  	var conn net.Conn
   156  	var connMu sync.Mutex
   157  	go func() {
   158  		connMu.Lock()
   159  		conn = server(pl)
   160  		connMu.Unlock()
   161  	}()
   162  
   163  	client, err := grpc.Dial("",
   164  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   165  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
   166  		grpc.WithDialer(pl.Dialer()),
   167  		grpc.WithConnectParams(grpc.ConnectParams{
   168  			Backoff:           backoff.Config{},
   169  			MinConnectTimeout: 100 * time.Millisecond,
   170  		}))
   171  	if err != nil {
   172  		t.Fatal(err)
   173  	}
   174  	defer client.Close()
   175  
   176  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   177  	defer cancel()
   178  	go testutils.StayConnected(ctx, client)
   179  
   180  	stateNotifications := testBalancerBuilder.nextStateNotifier()
   181  	for i := 0; i < len(want); i++ {
   182  		select {
   183  		case <-time.After(defaultTestTimeout):
   184  			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
   185  		case seen := <-stateNotifications:
   186  			if seen != want[i] {
   187  				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
   188  			}
   189  		}
   190  	}
   191  
   192  	connMu.Lock()
   193  	defer connMu.Unlock()
   194  	if conn != nil {
   195  		err = conn.Close()
   196  		if err != nil {
   197  			t.Fatal(err)
   198  		}
   199  	}
   200  }
   201  
   202  // When a READY connection is closed, the client enters IDLE then CONNECTING.
   203  func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
   204  	lis, err := net.Listen("tcp", "localhost:0")
   205  	if err != nil {
   206  		t.Fatalf("Error while listening. Err: %v", err)
   207  	}
   208  	defer lis.Close()
   209  
   210  	sawReady := make(chan struct{}, 1)
   211  	defer close(sawReady)
   212  
   213  	// Launch the server.
   214  	go func() {
   215  		conn, err := lis.Accept()
   216  		if err != nil {
   217  			t.Error(err)
   218  			return
   219  		}
   220  
   221  		go keepReading(conn)
   222  
   223  		framer := http2.NewFramer(conn, conn)
   224  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
   225  			t.Errorf("Error while writing settings frame. %v", err)
   226  			return
   227  		}
   228  
   229  		// Prevents race between onPrefaceReceipt and onClose.
   230  		<-sawReady
   231  
   232  		conn.Close()
   233  	}()
   234  
   235  	client, err := grpc.Dial(lis.Addr().String(),
   236  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   237  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
   238  	if err != nil {
   239  		t.Fatal(err)
   240  	}
   241  	defer client.Close()
   242  
   243  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   244  	defer cancel()
   245  	go testutils.StayConnected(ctx, client)
   246  
   247  	stateNotifications := testBalancerBuilder.nextStateNotifier()
   248  
   249  	want := []connectivity.State{
   250  		connectivity.Connecting,
   251  		connectivity.Ready,
   252  		connectivity.Idle,
   253  		connectivity.Connecting,
   254  	}
   255  	for i := 0; i < len(want); i++ {
   256  		select {
   257  		case <-time.After(defaultTestTimeout):
   258  			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
   259  		case seen := <-stateNotifications:
   260  			if seen == connectivity.Ready {
   261  				sawReady <- struct{}{}
   262  			}
   263  			if seen != want[i] {
   264  				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
   265  			}
   266  		}
   267  	}
   268  }
   269  
   270  // When the first connection is closed, the client stays in CONNECTING until it
   271  // tries the second address (which succeeds, and then it enters READY).
   272  func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
   273  	lis1, err := net.Listen("tcp", "localhost:0")
   274  	if err != nil {
   275  		t.Fatalf("Error while listening. Err: %v", err)
   276  	}
   277  	defer lis1.Close()
   278  
   279  	lis2, err := net.Listen("tcp", "localhost:0")
   280  	if err != nil {
   281  		t.Fatalf("Error while listening. Err: %v", err)
   282  	}
   283  	defer lis2.Close()
   284  
   285  	server1Done := make(chan struct{})
   286  	server2Done := make(chan struct{})
   287  
   288  	// Launch server 1.
   289  	go func() {
   290  		conn, err := lis1.Accept()
   291  		if err != nil {
   292  			t.Error(err)
   293  			return
   294  		}
   295  
   296  		conn.Close()
   297  		close(server1Done)
   298  	}()
   299  	// Launch server 2.
   300  	go func() {
   301  		conn, err := lis2.Accept()
   302  		if err != nil {
   303  			t.Error(err)
   304  			return
   305  		}
   306  
   307  		go keepReading(conn)
   308  
   309  		framer := http2.NewFramer(conn, conn)
   310  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
   311  			t.Errorf("Error while writing settings frame. %v", err)
   312  			return
   313  		}
   314  
   315  		close(server2Done)
   316  	}()
   317  
   318  	rb := manual.NewBuilderWithScheme("whatever")
   319  	rb.InitialState(resolver.State{Addresses: []resolver.Address{
   320  		{Addr: lis1.Addr().String()},
   321  		{Addr: lis2.Addr().String()},
   322  	}})
   323  	client, err := grpc.Dial("whatever:///this-gets-overwritten",
   324  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   325  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
   326  		grpc.WithResolvers(rb))
   327  	if err != nil {
   328  		t.Fatal(err)
   329  	}
   330  	defer client.Close()
   331  
   332  	stateNotifications := testBalancerBuilder.nextStateNotifier()
   333  	want := []connectivity.State{
   334  		connectivity.Connecting,
   335  		connectivity.Ready,
   336  	}
   337  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   338  	defer cancel()
   339  	for i := 0; i < len(want); i++ {
   340  		select {
   341  		case <-ctx.Done():
   342  			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
   343  		case seen := <-stateNotifications:
   344  			if seen != want[i] {
   345  				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
   346  			}
   347  		}
   348  	}
   349  	select {
   350  	case <-ctx.Done():
   351  		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
   352  	case <-server1Done:
   353  	}
   354  	select {
   355  	case <-ctx.Done():
   356  		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
   357  	case <-server2Done:
   358  	}
   359  }
   360  
   361  // When there are multiple addresses, and we enter READY on one of them, a
   362  // later closure should cause the client to enter CONNECTING
   363  func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
   364  	lis1, err := net.Listen("tcp", "localhost:0")
   365  	if err != nil {
   366  		t.Fatalf("Error while listening. Err: %v", err)
   367  	}
   368  	defer lis1.Close()
   369  
   370  	// Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
   371  	lis2, err := net.Listen("tcp", "localhost:0")
   372  	if err != nil {
   373  		t.Fatalf("Error while listening. Err: %v", err)
   374  	}
   375  	defer lis2.Close()
   376  
   377  	server1Done := make(chan struct{})
   378  	sawReady := make(chan struct{}, 1)
   379  	defer close(sawReady)
   380  
   381  	// Launch server 1.
   382  	go func() {
   383  		conn, err := lis1.Accept()
   384  		if err != nil {
   385  			t.Error(err)
   386  			return
   387  		}
   388  
   389  		go keepReading(conn)
   390  
   391  		framer := http2.NewFramer(conn, conn)
   392  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
   393  			t.Errorf("Error while writing settings frame. %v", err)
   394  			return
   395  		}
   396  
   397  		<-sawReady
   398  
   399  		conn.Close()
   400  
   401  		close(server1Done)
   402  	}()
   403  
   404  	rb := manual.NewBuilderWithScheme("whatever")
   405  	rb.InitialState(resolver.State{Addresses: []resolver.Address{
   406  		{Addr: lis1.Addr().String()},
   407  		{Addr: lis2.Addr().String()},
   408  	}})
   409  	client, err := grpc.Dial("whatever:///this-gets-overwritten",
   410  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   411  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
   412  		grpc.WithResolvers(rb))
   413  	if err != nil {
   414  		t.Fatal(err)
   415  	}
   416  	defer client.Close()
   417  
   418  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   419  	defer cancel()
   420  	go testutils.StayConnected(ctx, client)
   421  
   422  	stateNotifications := testBalancerBuilder.nextStateNotifier()
   423  	want := []connectivity.State{
   424  		connectivity.Connecting,
   425  		connectivity.Ready,
   426  		connectivity.Idle,
   427  		connectivity.Connecting,
   428  	}
   429  	for i := 0; i < len(want); i++ {
   430  		select {
   431  		case <-ctx.Done():
   432  			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
   433  		case seen := <-stateNotifications:
   434  			if seen == connectivity.Ready {
   435  				sawReady <- struct{}{}
   436  			}
   437  			if seen != want[i] {
   438  				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
   439  			}
   440  		}
   441  	}
   442  	select {
   443  	case <-ctx.Done():
   444  		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
   445  	case <-server1Done:
   446  	}
   447  }
   448  
// stateRecordingBalancer is the balancer returned by
// stateRecordingBalancerBuilder.Build. It embeds the balancer.Balancer it
// wraps, so every method is served by that embedded balancer.
type stateRecordingBalancer struct {
	balancer.Balancer
}

// Close closes the embedded balancer.
func (b *stateRecordingBalancer) Close() {
	b.Balancer.Close()
}
   456  
   457  type stateRecordingBalancerBuilder struct {
   458  	mu       sync.Mutex
   459  	notifier chan connectivity.State // The notifier used in the last Balancer.
   460  }
   461  
   462  func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
   463  	return &stateRecordingBalancerBuilder{}
   464  }
   465  
   466  func (b *stateRecordingBalancerBuilder) Name() string {
   467  	return stateRecordingBalancerName
   468  }
   469  
   470  func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
   471  	stateNotifications := make(chan connectivity.State, 10)
   472  	b.mu.Lock()
   473  	b.notifier = stateNotifications
   474  	b.mu.Unlock()
   475  	return &stateRecordingBalancer{
   476  		Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
   477  	}
   478  }
   479  
   480  func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
   481  	b.mu.Lock()
   482  	defer b.mu.Unlock()
   483  	ret := b.notifier
   484  	b.notifier = nil
   485  	return ret
   486  }
   487  
   488  type stateRecordingCCWrapper struct {
   489  	balancer.ClientConn
   490  	notifier chan<- connectivity.State
   491  }
   492  
   493  func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
   494  	oldListener := opts.StateListener
   495  	opts.StateListener = func(s balancer.SubConnState) {
   496  		ccw.notifier <- s.ConnectivityState
   497  		oldListener(s)
   498  	}
   499  	return ccw.ClientConn.NewSubConn(addrs, opts)
   500  }
   501  
   502  // Keep reading until something causes the connection to die (EOF, server
   503  // closed, etc). Useful as a tool for mindlessly keeping the connection
   504  // healthy, since the client will error if things like client prefaces are not
   505  // accepted in a timely fashion.
   506  func keepReading(conn net.Conn) {
   507  	buf := make([]byte, 1024)
   508  	for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
   509  	}
   510  }
   511  
   512  type funcConnectivityStateSubscriber struct {
   513  	onMsg func(connectivity.State)
   514  }
   515  
   516  func (f *funcConnectivityStateSubscriber) OnMessage(msg any) {
   517  	f.onMsg(msg.(connectivity.State))
   518  }
   519  
   520  // TestConnectivityStateSubscriber confirms updates sent by the balancer in
   521  // rapid succession are not missed by the subscriber.
   522  func (s) TestConnectivityStateSubscriber(t *testing.T) {
   523  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   524  	defer cancel()
   525  
   526  	sendStates := []connectivity.State{
   527  		connectivity.Connecting,
   528  		connectivity.Ready,
   529  		connectivity.Idle,
   530  		connectivity.Connecting,
   531  		connectivity.Idle,
   532  		connectivity.Connecting,
   533  		connectivity.Ready,
   534  	}
   535  	wantStates := append(sendStates, connectivity.Shutdown)
   536  
   537  	const testBalName = "any"
   538  	bf := stub.BalancerFuncs{
   539  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
   540  			// Send the expected states in rapid succession.
   541  			for _, s := range sendStates {
   542  				t.Logf("Sending state update %s", s)
   543  				bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
   544  			}
   545  			return nil
   546  		},
   547  	}
   548  	stub.Register(testBalName, bf)
   549  
   550  	// Create the ClientConn.
   551  	const testResName = "any"
   552  	rb := manual.NewBuilderWithScheme(testResName)
   553  	cc, err := grpc.Dial(testResName+":///",
   554  		grpc.WithResolvers(rb),
   555  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
   556  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   557  	)
   558  	if err != nil {
   559  		t.Fatalf("Unexpected error from grpc.Dial: %v", err)
   560  	}
   561  
   562  	// Subscribe to state updates.  Use a buffer size of 1 to allow the
   563  	// Shutdown state to go into the channel when Close()ing.
   564  	connCh := make(chan connectivity.State, 1)
   565  	s := &funcConnectivityStateSubscriber{
   566  		onMsg: func(s connectivity.State) {
   567  			select {
   568  			case connCh <- s:
   569  			case <-ctx.Done():
   570  			}
   571  			if s == connectivity.Shutdown {
   572  				close(connCh)
   573  			}
   574  		},
   575  	}
   576  
   577  	internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
   578  
   579  	// Send an update from the resolver that will trigger the LB policy's UpdateClientConnState.
   580  	go rb.UpdateState(resolver.State{})
   581  
   582  	// Verify the resulting states.
   583  	for i, want := range wantStates {
   584  		if i == len(sendStates) {
   585  			// Trigger Shutdown to be sent by the channel.  Use a goroutine to
   586  			// ensure the operation does not block.
   587  			cc.Close()
   588  		}
   589  		select {
   590  		case got := <-connCh:
   591  			if got != want {
   592  				t.Errorf("Update %v was %s; want %s", i, got, want)
   593  			} else {
   594  				t.Logf("Update %v was %s as expected", i, got)
   595  			}
   596  		case <-ctx.Done():
   597  			t.Fatalf("Timed out waiting for state update %v: %s", i, want)
   598  		}
   599  	}
   600  }
   601  

View as plain text