...

Source file src/google.golang.org/grpc/balancer/grpclb/grpclb_util_test.go

Documentation: google.golang.org/grpc/balancer/grpclb

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpclb
    20  
    21  import (
    22  	"fmt"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/resolver"
    29  )
    30  
    31  type mockSubConn struct {
    32  	balancer.SubConn
    33  	mcc *mockClientConn
    34  }
    35  
    36  func (msc *mockSubConn) Shutdown() {
    37  	msc.mcc.mu.Lock()
    38  	defer msc.mcc.mu.Unlock()
    39  	delete(msc.mcc.subConns, msc)
    40  }
    41  
    42  type mockClientConn struct {
    43  	balancer.ClientConn
    44  
    45  	mu       sync.Mutex
    46  	subConns map[balancer.SubConn]resolver.Address
    47  }
    48  
    49  func newMockClientConn() *mockClientConn {
    50  	return &mockClientConn{
    51  		subConns: make(map[balancer.SubConn]resolver.Address),
    52  	}
    53  }
    54  
    55  func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
    56  	sc := &mockSubConn{mcc: mcc}
    57  	mcc.mu.Lock()
    58  	defer mcc.mu.Unlock()
    59  	mcc.subConns[sc] = addrs[0]
    60  	return sc, nil
    61  }
    62  
    63  func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
    64  	panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc))
    65  }
    66  
    67  const testCacheTimeout = 100 * time.Millisecond
    68  
    69  func checkMockCC(mcc *mockClientConn, scLen int) error {
    70  	mcc.mu.Lock()
    71  	defer mcc.mu.Unlock()
    72  	if len(mcc.subConns) != scLen {
    73  		return fmt.Errorf("mcc = %+v, want len(mcc.subConns) = %v", mcc.subConns, scLen)
    74  	}
    75  	return nil
    76  }
    77  
    78  func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
    79  	ccc.mu.Lock()
    80  	defer ccc.mu.Unlock()
    81  	if len(ccc.subConnCache) != sccLen {
    82  		return fmt.Errorf("ccc = %+v, want len(ccc.subConnCache) = %v", ccc.subConnCache, sccLen)
    83  	}
    84  	if len(ccc.subConnToAddr) != sctaLen {
    85  		return fmt.Errorf("ccc = %+v, want len(ccc.subConnToAddr) = %v", ccc.subConnToAddr, sctaLen)
    86  	}
    87  	return nil
    88  }
    89  
    90  // Test that SubConn won't be immediately shut down.
    91  func (s) TestLBCacheClientConnExpire(t *testing.T) {
    92  	mcc := newMockClientConn()
    93  	if err := checkMockCC(mcc, 0); err != nil {
    94  		t.Fatal(err)
    95  	}
    96  
    97  	ccc := newLBCacheClientConn(mcc)
    98  	ccc.timeout = testCacheTimeout
    99  	if err := checkCacheCC(ccc, 0, 0); err != nil {
   100  		t.Fatal(err)
   101  	}
   102  
   103  	sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
   104  	// One subconn in MockCC.
   105  	if err := checkMockCC(mcc, 1); err != nil {
   106  		t.Fatal(err)
   107  	}
   108  	// No subconn being deleted, and one in CacheCC.
   109  	if err := checkCacheCC(ccc, 0, 1); err != nil {
   110  		t.Fatal(err)
   111  	}
   112  
   113  	sc.Shutdown()
   114  	// One subconn in MockCC before timeout.
   115  	if err := checkMockCC(mcc, 1); err != nil {
   116  		t.Fatal(err)
   117  	}
   118  	// One subconn being deleted, and one in CacheCC.
   119  	if err := checkCacheCC(ccc, 1, 1); err != nil {
   120  		t.Fatal(err)
   121  	}
   122  
   123  	// Should all become empty after timeout.
   124  	var err error
   125  	for i := 0; i < 2; i++ {
   126  		time.Sleep(testCacheTimeout)
   127  		err = checkMockCC(mcc, 0)
   128  		if err != nil {
   129  			continue
   130  		}
   131  		err = checkCacheCC(ccc, 0, 0)
   132  		if err != nil {
   133  			continue
   134  		}
   135  	}
   136  	if err != nil {
   137  		t.Fatal(err)
   138  	}
   139  }
   140  
   141  // Test that NewSubConn with the same address of a SubConn being shut down will
   142  // reuse the SubConn and cancel the removing.
   143  func (s) TestLBCacheClientConnReuse(t *testing.T) {
   144  	mcc := newMockClientConn()
   145  	if err := checkMockCC(mcc, 0); err != nil {
   146  		t.Fatal(err)
   147  	}
   148  
   149  	ccc := newLBCacheClientConn(mcc)
   150  	ccc.timeout = testCacheTimeout
   151  	if err := checkCacheCC(ccc, 0, 0); err != nil {
   152  		t.Fatal(err)
   153  	}
   154  
   155  	sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
   156  	// One subconn in MockCC.
   157  	if err := checkMockCC(mcc, 1); err != nil {
   158  		t.Fatal(err)
   159  	}
   160  	// No subconn being deleted, and one in CacheCC.
   161  	if err := checkCacheCC(ccc, 0, 1); err != nil {
   162  		t.Fatal(err)
   163  	}
   164  
   165  	sc.Shutdown()
   166  	// One subconn in MockCC before timeout.
   167  	if err := checkMockCC(mcc, 1); err != nil {
   168  		t.Fatal(err)
   169  	}
   170  	// One subconn being deleted, and one in CacheCC.
   171  	if err := checkCacheCC(ccc, 1, 1); err != nil {
   172  		t.Fatal(err)
   173  	}
   174  
   175  	// Recreate the old subconn, this should cancel the deleting process.
   176  	sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
   177  	// One subconn in MockCC.
   178  	if err := checkMockCC(mcc, 1); err != nil {
   179  		t.Fatal(err)
   180  	}
   181  	// No subconn being deleted, and one in CacheCC.
   182  	if err := checkCacheCC(ccc, 0, 1); err != nil {
   183  		t.Fatal(err)
   184  	}
   185  
   186  	var err error
   187  	// Should not become empty after 2*timeout.
   188  	time.Sleep(2 * testCacheTimeout)
   189  	err = checkMockCC(mcc, 1)
   190  	if err != nil {
   191  		t.Fatal(err)
   192  	}
   193  	err = checkCacheCC(ccc, 0, 1)
   194  	if err != nil {
   195  		t.Fatal(err)
   196  	}
   197  
   198  	// Call Shutdown again, will delete after timeout.
   199  	sc.Shutdown()
   200  	// One subconn in MockCC before timeout.
   201  	if err := checkMockCC(mcc, 1); err != nil {
   202  		t.Fatal(err)
   203  	}
   204  	// One subconn being deleted, and one in CacheCC.
   205  	if err := checkCacheCC(ccc, 1, 1); err != nil {
   206  		t.Fatal(err)
   207  	}
   208  
   209  	// Should all become empty after timeout.
   210  	for i := 0; i < 2; i++ {
   211  		time.Sleep(testCacheTimeout)
   212  		err = checkMockCC(mcc, 0)
   213  		if err != nil {
   214  			continue
   215  		}
   216  		err = checkCacheCC(ccc, 0, 0)
   217  		if err != nil {
   218  			continue
   219  		}
   220  	}
   221  	if err != nil {
   222  		t.Fatal(err)
   223  	}
   224  }
   225  
   226  // Test that if the timer to shut down a SubConn fires at the same time
   227  // NewSubConn cancels the timer, it doesn't cause deadlock.
   228  func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) {
   229  	mcc := newMockClientConn()
   230  	if err := checkMockCC(mcc, 0); err != nil {
   231  		t.Fatal(err)
   232  	}
   233  
   234  	ccc := newLBCacheClientConn(mcc)
   235  	ccc.timeout = time.Nanosecond
   236  	if err := checkCacheCC(ccc, 0, 0); err != nil {
   237  		t.Fatal(err)
   238  	}
   239  
   240  	sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
   241  	// One subconn in MockCC.
   242  	if err := checkMockCC(mcc, 1); err != nil {
   243  		t.Fatal(err)
   244  	}
   245  	// No subconn being deleted, and one in CacheCC.
   246  	if err := checkCacheCC(ccc, 0, 1); err != nil {
   247  		t.Fatal(err)
   248  	}
   249  
   250  	done := make(chan struct{})
   251  
   252  	go func() {
   253  		for i := 0; i < 1000; i++ {
   254  			// Shutdown starts a timer with 1 ns timeout, the NewSubConn will
   255  			// race with with the timer.
   256  			sc.Shutdown()
   257  			sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
   258  		}
   259  		close(done)
   260  	}()
   261  
   262  	select {
   263  	case <-time.After(time.Second):
   264  		t.Fatalf("Test didn't finish within 1 second. Deadlock")
   265  	case <-done:
   266  	}
   267  }
   268  

View as plain text