...

Source file src/google.golang.org/grpc/xds/internal/balancer/ringhash/picker_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/ringhash

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package ringhash
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	"google.golang.org/grpc/balancer"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/grpclog"
    31  	igrpclog "google.golang.org/grpc/internal/grpclog"
    32  	"google.golang.org/grpc/internal/testutils"
    33  )
    34  
    35  var testSubConns []*testutils.TestSubConn
    36  
    37  func init() {
    38  	for i := 0; i < 8; i++ {
    39  		testSubConns = append(testSubConns, testutils.NewTestSubConn(fmt.Sprint(i)))
    40  	}
    41  }
    42  
    43  func newTestRing(cStats []connectivity.State) *ring {
    44  	var items []*ringEntry
    45  	for i, st := range cStats {
    46  		testSC := testSubConns[i]
    47  		items = append(items, &ringEntry{
    48  			idx:  i,
    49  			hash: uint64((i + 1) * 10),
    50  			sc: &subConn{
    51  				addr:  testSC.String(),
    52  				sc:    testSC,
    53  				state: st,
    54  			},
    55  		})
    56  	}
    57  	return &ring{items: items}
    58  }
    59  
    60  func (s) TestPickerPickFirstTwo(t *testing.T) {
    61  	tests := []struct {
    62  		name            string
    63  		ring            *ring
    64  		hash            uint64
    65  		wantSC          balancer.SubConn
    66  		wantErr         error
    67  		wantSCToConnect balancer.SubConn
    68  	}{
    69  		{
    70  			name:   "picked is Ready",
    71  			ring:   newTestRing([]connectivity.State{connectivity.Ready, connectivity.Idle}),
    72  			hash:   5,
    73  			wantSC: testSubConns[0],
    74  		},
    75  		{
    76  			name:    "picked is connecting, queue",
    77  			ring:    newTestRing([]connectivity.State{connectivity.Connecting, connectivity.Idle}),
    78  			hash:    5,
    79  			wantErr: balancer.ErrNoSubConnAvailable,
    80  		},
    81  		{
    82  			name:            "picked is Idle, connect and queue",
    83  			ring:            newTestRing([]connectivity.State{connectivity.Idle, connectivity.Idle}),
    84  			hash:            5,
    85  			wantErr:         balancer.ErrNoSubConnAvailable,
    86  			wantSCToConnect: testSubConns[0],
    87  		},
    88  		{
    89  			name:   "picked is TransientFailure, next is ready, return",
    90  			ring:   newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.Ready}),
    91  			hash:   5,
    92  			wantSC: testSubConns[1],
    93  		},
    94  		{
    95  			name:    "picked is TransientFailure, next is connecting, queue",
    96  			ring:    newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.Connecting}),
    97  			hash:    5,
    98  			wantErr: balancer.ErrNoSubConnAvailable,
    99  		},
   100  		{
   101  			name:            "picked is TransientFailure, next is Idle, connect and queue",
   102  			ring:            newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.Idle}),
   103  			hash:            5,
   104  			wantErr:         balancer.ErrNoSubConnAvailable,
   105  			wantSCToConnect: testSubConns[1],
   106  		},
   107  	}
   108  	for _, tt := range tests {
   109  		t.Run(tt.name, func(t *testing.T) {
   110  			p := newPicker(tt.ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
   111  			got, err := p.Pick(balancer.PickInfo{
   112  				Ctx: SetRequestHash(context.Background(), tt.hash),
   113  			})
   114  			if err != tt.wantErr {
   115  				t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr)
   116  				return
   117  			}
   118  			if got.SubConn != tt.wantSC {
   119  				t.Errorf("Pick() got = %v, want picked SubConn: %v", got, tt.wantSC)
   120  			}
   121  			if sc := tt.wantSCToConnect; sc != nil {
   122  				select {
   123  				case <-sc.(*testutils.TestSubConn).ConnectCh:
   124  				case <-time.After(defaultTestShortTimeout):
   125  					t.Errorf("timeout waiting for Connect() from SubConn %v", sc)
   126  				}
   127  			}
   128  		})
   129  	}
   130  }
   131  
   132  // TestPickerPickTriggerTFConnect covers that if the picked SubConn is
   133  // TransientFailures, all SubConns until a non-TransientFailure are queued for
   134  // Connect().
   135  func (s) TestPickerPickTriggerTFConnect(t *testing.T) {
   136  	ring := newTestRing([]connectivity.State{
   137  		connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
   138  		connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
   139  	})
   140  	p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
   141  	_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
   142  	if err == nil {
   143  		t.Fatalf("Pick() error = %v, want non-nil", err)
   144  	}
   145  	// The first 4 SubConns, all in TransientFailure, should be queued to
   146  	// connect.
   147  	for i := 0; i < 4; i++ {
   148  		it := ring.items[i]
   149  		if !it.sc.connectQueued {
   150  			t.Errorf("the %d-th SubConn is not queued for connect", i)
   151  		}
   152  	}
   153  	// The other SubConns, after the first Idle, should not be queued to
   154  	// connect.
   155  	for i := 5; i < len(ring.items); i++ {
   156  		it := ring.items[i]
   157  		if it.sc.connectQueued {
   158  			t.Errorf("the %d-th SubConn is unexpected queued for connect", i)
   159  		}
   160  	}
   161  }
   162  
   163  // TestPickerPickTriggerTFReturnReady covers that if the picked SubConn is
   164  // TransientFailure, SubConn 2 and 3 are TransientFailure, 4 is Ready. SubConn 2
   165  // and 3 will Connect(), and 4 will be returned.
   166  func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) {
   167  	ring := newTestRing([]connectivity.State{
   168  		connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready,
   169  	})
   170  	p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
   171  	pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
   172  	if err != nil {
   173  		t.Fatalf("Pick() error = %v, want nil", err)
   174  	}
   175  	if wantSC := testSubConns[3]; pr.SubConn != wantSC {
   176  		t.Fatalf("Pick() = %v, want %v", pr.SubConn, wantSC)
   177  	}
   178  	// The first 3 SubConns, all in TransientFailure, should be queued to
   179  	// connect.
   180  	for i := 0; i < 3; i++ {
   181  		it := ring.items[i]
   182  		if !it.sc.connectQueued {
   183  			t.Errorf("the %d-th SubConn is not queued for connect", i)
   184  		}
   185  	}
   186  }
   187  
   188  // TestPickerPickTriggerTFWithIdle covers that if the picked SubConn is
   189  // TransientFailure, SubConn 2 is TransientFailure, 3 is Idle (init Idle). Pick
   190  // will be queue, SubConn 3 will Connect(), SubConn 4 and 5 (in TransientFailure)
   191  // will not queue a Connect.
   192  func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) {
   193  	ring := newTestRing([]connectivity.State{
   194  		connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure,
   195  	})
   196  	p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
   197  	_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
   198  	if err == balancer.ErrNoSubConnAvailable {
   199  		t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable)
   200  	}
   201  	// The first 2 SubConns, all in TransientFailure, should be queued to
   202  	// connect.
   203  	for i := 0; i < 2; i++ {
   204  		it := ring.items[i]
   205  		if !it.sc.connectQueued {
   206  			t.Errorf("the %d-th SubConn is not queued for connect", i)
   207  		}
   208  	}
   209  	// SubConn 3 was in Idle, so should Connect()
   210  	select {
   211  	case <-testSubConns[2].ConnectCh:
   212  	case <-time.After(defaultTestShortTimeout):
   213  		t.Errorf("timeout waiting for Connect() from SubConn %v", testSubConns[2])
   214  	}
   215  	// The other SubConns, after the first Idle, should not be queued to
   216  	// connect.
   217  	for i := 3; i < len(ring.items); i++ {
   218  		it := ring.items[i]
   219  		if it.sc.connectQueued {
   220  			t.Errorf("the %d-th SubConn is unexpected queued for connect", i)
   221  		}
   222  	}
   223  }
   224  
   225  func (s) TestNextSkippingDuplicatesNoDup(t *testing.T) {
   226  	testRing := newTestRing([]connectivity.State{connectivity.Idle, connectivity.Idle})
   227  	tests := []struct {
   228  		name string
   229  		ring *ring
   230  		cur  *ringEntry
   231  		want *ringEntry
   232  	}{
   233  		{
   234  			name: "no dup",
   235  			ring: testRing,
   236  			cur:  testRing.items[0],
   237  			want: testRing.items[1],
   238  		},
   239  		{
   240  			name: "only one entry",
   241  			ring: &ring{items: []*ringEntry{testRing.items[0]}},
   242  			cur:  testRing.items[0],
   243  			want: nil,
   244  		},
   245  	}
   246  	for _, tt := range tests {
   247  		t.Run(tt.name, func(t *testing.T) {
   248  			if got := nextSkippingDuplicates(tt.ring, tt.cur); !cmp.Equal(got, tt.want, cmpOpts) {
   249  				t.Errorf("nextSkippingDuplicates() = %v, want %v", got, tt.want)
   250  			}
   251  		})
   252  	}
   253  }
   254  
   255  // addDups adds duplicates of items[0] to the ring.
   256  func addDups(r *ring, count int) *ring {
   257  	var (
   258  		items []*ringEntry
   259  		idx   int
   260  	)
   261  	for i, it := range r.items {
   262  		itt := *it
   263  		itt.idx = idx
   264  		items = append(items, &itt)
   265  		idx++
   266  		if i == 0 {
   267  			// Add duplicate of items[0] to the ring
   268  			for j := 0; j < count; j++ {
   269  				itt2 := *it
   270  				itt2.idx = idx
   271  				items = append(items, &itt2)
   272  				idx++
   273  			}
   274  		}
   275  	}
   276  	return &ring{items: items}
   277  }
   278  
   279  func (s) TestNextSkippingDuplicatesMoreDup(t *testing.T) {
   280  	testRing := newTestRing([]connectivity.State{connectivity.Idle, connectivity.Idle})
   281  	// Make a new ring with duplicate SubConns.
   282  	dupTestRing := addDups(testRing, 3)
   283  	if got := nextSkippingDuplicates(dupTestRing, dupTestRing.items[0]); !cmp.Equal(got, dupTestRing.items[len(dupTestRing.items)-1], cmpOpts) {
   284  		t.Errorf("nextSkippingDuplicates() = %v, want %v", got, dupTestRing.items[len(dupTestRing.items)-1])
   285  	}
   286  }
   287  
   288  func (s) TestNextSkippingDuplicatesOnlyDup(t *testing.T) {
   289  	testRing := newTestRing([]connectivity.State{connectivity.Idle})
   290  	// Make a new ring with only duplicate SubConns.
   291  	dupTestRing := addDups(testRing, 3)
   292  	// This ring only has duplicates of items[0], should return nil.
   293  	if got := nextSkippingDuplicates(dupTestRing, dupTestRing.items[0]); got != nil {
   294  		t.Errorf("nextSkippingDuplicates() = %v, want nil", got)
   295  	}
   296  }
   297  

View as plain text