...

Source file src/google.golang.org/grpc/test/channelz_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"crypto/tls"
    24  	"fmt"
    25  	"net"
    26  	"regexp"
    27  	"strings"
    28  	"sync"
    29  	"testing"
    30  	"time"
    31  
    32  	"github.com/google/go-cmp/cmp"
    33  	"golang.org/x/net/http2"
    34  	"google.golang.org/grpc"
    35  	_ "google.golang.org/grpc/balancer/grpclb"
    36  	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    37  	"google.golang.org/grpc/balancer/roundrobin"
    38  	"google.golang.org/grpc/codes"
    39  	"google.golang.org/grpc/connectivity"
    40  	"google.golang.org/grpc/credentials"
    41  	"google.golang.org/grpc/internal"
    42  	"google.golang.org/grpc/internal/channelz"
    43  	"google.golang.org/grpc/internal/stubserver"
    44  	"google.golang.org/grpc/internal/testutils"
    45  	"google.golang.org/grpc/keepalive"
    46  	"google.golang.org/grpc/resolver"
    47  	"google.golang.org/grpc/resolver/manual"
    48  	"google.golang.org/grpc/status"
    49  	"google.golang.org/grpc/testdata"
    50  
    51  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    52  	testpb "google.golang.org/grpc/interop/grpc_testing"
    53  )
    54  
    55  func verifyResultWithDelay(f func() (bool, error)) error {
    56  	var ok bool
    57  	var err error
    58  	for i := 0; i < 1000; i++ {
    59  		if ok, err = f(); ok {
    60  			return nil
    61  		}
    62  		time.Sleep(10 * time.Millisecond)
    63  	}
    64  	return err
    65  }
    66  
    67  func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
    68  	testcases := []struct {
    69  		total  int
    70  		start  int64
    71  		max    int
    72  		length int
    73  		end    bool
    74  	}{
    75  		{total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
    76  		{total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
    77  		{total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
    78  		{total: int(channelz.EntriesPerPage) + 1, start: int64(2*(channelz.EntriesPerPage+1) + 1), max: 0, length: 0, end: true},
    79  		{total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
    80  		{total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
    81  	}
    82  
    83  	for i, c := range testcases {
    84  		// Reset channelz IDs so `start` is valid.
    85  		channelz.IDGen.Reset()
    86  
    87  		e := tcpClearRREnv
    88  		te := newTest(t, e)
    89  		te.startServers(&testServer{security: e.security}, c.total)
    90  
    91  		ss, end := channelz.GetServers(c.start, c.max)
    92  		if len(ss) != c.length || end != c.end {
    93  			t.Fatalf("%d: GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", i, c.start, ss, len(ss), end, c.start, c.length, c.end)
    94  		}
    95  		te.tearDown()
    96  		ss, end = channelz.GetServers(c.start, c.max)
    97  		if len(ss) != 0 || !end {
    98  			t.Fatalf("%d: GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", i, ss, len(ss), end)
    99  		}
   100  	}
   101  }
   102  
   103  func (s) TestCZGetChannel(t *testing.T) {
   104  	e := tcpClearRREnv
   105  	e.balancer = ""
   106  	te := newTest(t, e)
   107  	te.startServer(&testServer{security: e.security})
   108  	r := manual.NewBuilderWithScheme("whatever")
   109  	addrs := []resolver.Address{{Addr: te.srvAddr}}
   110  	r.InitialState(resolver.State{Addresses: addrs})
   111  	te.resolverScheme = r.Scheme()
   112  	te.clientConn(grpc.WithResolvers(r))
   113  	defer te.tearDown()
   114  	if err := verifyResultWithDelay(func() (bool, error) {
   115  		tcs, _ := channelz.GetTopChannels(0, 0)
   116  		if len(tcs) != 1 {
   117  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   118  		}
   119  		target := tcs[0].ChannelMetrics.Target.Load()
   120  		wantTarget := "whatever:///" + te.srvAddr
   121  		if target == nil || *target != wantTarget {
   122  			return false, fmt.Errorf("Got channelz target=%v; want %q", target, wantTarget)
   123  		}
   124  		state := tcs[0].ChannelMetrics.State.Load()
   125  		if state == nil || *state != connectivity.Ready {
   126  			return false, fmt.Errorf("Got channelz state=%v; want %q", state, connectivity.Ready)
   127  		}
   128  		return true, nil
   129  	}); err != nil {
   130  		t.Fatal(err)
   131  	}
   132  }
   133  
   134  func (s) TestCZGetSubChannel(t *testing.T) {
   135  	e := tcpClearRREnv
   136  	e.balancer = ""
   137  	te := newTest(t, e)
   138  	te.startServer(&testServer{security: e.security})
   139  	r := manual.NewBuilderWithScheme("whatever")
   140  	addrs := []resolver.Address{{Addr: te.srvAddr}}
   141  	r.InitialState(resolver.State{Addresses: addrs})
   142  	te.resolverScheme = r.Scheme()
   143  	te.clientConn(grpc.WithResolvers(r))
   144  	defer te.tearDown()
   145  	if err := verifyResultWithDelay(func() (bool, error) {
   146  		tcs, _ := channelz.GetTopChannels(0, 0)
   147  		if len(tcs) != 1 {
   148  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   149  		}
   150  		scs := tcs[0].SubChans()
   151  		if len(scs) != 1 {
   152  			return false, fmt.Errorf("there should be one subchannel, not %d", len(scs))
   153  		}
   154  		var scid int64
   155  		for scid = range scs {
   156  		}
   157  		sc := channelz.GetSubChannel(scid)
   158  		if sc == nil {
   159  			return false, fmt.Errorf("subchannel with id %v is nil", scid)
   160  		}
   161  		target := sc.ChannelMetrics.Target.Load()
   162  		if target == nil || !strings.HasPrefix(*target, "localhost") {
   163  			t.Fatalf("subchannel target must never be set incorrectly; got: %v, want <HasPrefix('localhost')>", target)
   164  		}
   165  		state := sc.ChannelMetrics.State.Load()
   166  		if state == nil || *state != connectivity.Ready {
   167  			return false, fmt.Errorf("Got subchannel state=%v; want %q", state, connectivity.Ready)
   168  		}
   169  		return true, nil
   170  	}); err != nil {
   171  		t.Fatal(err)
   172  	}
   173  }
   174  
   175  func (s) TestCZGetServer(t *testing.T) {
   176  	e := tcpClearRREnv
   177  	te := newTest(t, e)
   178  	te.startServer(&testServer{security: e.security})
   179  	defer te.tearDown()
   180  
   181  	ss, _ := channelz.GetServers(0, 0)
   182  	if len(ss) != 1 {
   183  		t.Fatalf("there should only be one server, not %d", len(ss))
   184  	}
   185  
   186  	serverID := ss[0].ID
   187  	srv := channelz.GetServer(serverID)
   188  	if srv == nil {
   189  		t.Fatalf("server %d does not exist", serverID)
   190  	}
   191  	if srv.ID != serverID {
   192  		t.Fatalf("server want id %d, but got %d", serverID, srv.ID)
   193  	}
   194  
   195  	te.tearDown()
   196  
   197  	if err := verifyResultWithDelay(func() (bool, error) {
   198  		srv := channelz.GetServer(serverID)
   199  		if srv != nil {
   200  			return false, fmt.Errorf("server %d should not exist", serverID)
   201  		}
   202  
   203  		return true, nil
   204  	}); err != nil {
   205  		t.Fatal(err)
   206  	}
   207  }
   208  
   209  func (s) TestCZGetSocket(t *testing.T) {
   210  	e := tcpClearRREnv
   211  	te := newTest(t, e)
   212  	lis := te.listenAndServe(&testServer{security: e.security}, net.Listen)
   213  	defer te.tearDown()
   214  
   215  	if err := verifyResultWithDelay(func() (bool, error) {
   216  		ss, _ := channelz.GetServers(0, 0)
   217  		if len(ss) != 1 {
   218  			return false, fmt.Errorf("len(ss) = %v; want %v", len(ss), 1)
   219  		}
   220  
   221  		serverID := ss[0].ID
   222  		srv := channelz.GetServer(serverID)
   223  		if srv == nil {
   224  			return false, fmt.Errorf("server %d does not exist", serverID)
   225  		}
   226  		if srv.ID != serverID {
   227  			return false, fmt.Errorf("srv.ID = %d; want %v", srv.ID, serverID)
   228  		}
   229  
   230  		skts := srv.ListenSockets()
   231  		if got, want := len(skts), 1; got != want {
   232  			return false, fmt.Errorf("len(skts) = %v; want %v", got, want)
   233  		}
   234  		var sktID int64
   235  		for sktID = range skts {
   236  		}
   237  
   238  		skt := channelz.GetSocket(sktID)
   239  		if skt == nil {
   240  			return false, fmt.Errorf("socket %v does not exist", sktID)
   241  		}
   242  
   243  		if got, want := skt.LocalAddr, lis.Addr(); got != want {
   244  			return false, fmt.Errorf("socket %v LocalAddr=%v; want %v", sktID, got, want)
   245  		}
   246  		return true, nil
   247  	}); err != nil {
   248  		t.Fatal(err)
   249  	}
   250  }
   251  
   252  func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
   253  	testcases := []struct {
   254  		total  int
   255  		start  int64
   256  		max    int
   257  		length int
   258  		end    bool
   259  	}{
   260  		{total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
   261  		{total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
   262  		{total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
   263  		{total: int(channelz.EntriesPerPage) + 1, start: int64(2*(channelz.EntriesPerPage+1) + 1), max: 0, length: 0, end: true},
   264  		{total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
   265  		{total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
   266  	}
   267  
   268  	for _, c := range testcases {
   269  		// Reset channelz IDs so `start` is valid.
   270  		channelz.IDGen.Reset()
   271  
   272  		e := tcpClearRREnv
   273  		te := newTest(t, e)
   274  		var ccs []*grpc.ClientConn
   275  		for i := 0; i < c.total; i++ {
   276  			cc := te.clientConn()
   277  			te.cc = nil
   278  			// avoid making next dial blocking
   279  			te.srvAddr = ""
   280  			ccs = append(ccs, cc)
   281  		}
   282  		if err := verifyResultWithDelay(func() (bool, error) {
   283  			if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != c.length || end != c.end {
   284  				return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
   285  			}
   286  			return true, nil
   287  		}); err != nil {
   288  			t.Fatal(err)
   289  		}
   290  
   291  		for _, cc := range ccs {
   292  			cc.Close()
   293  		}
   294  
   295  		if err := verifyResultWithDelay(func() (bool, error) {
   296  			if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
   297  				return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
   298  			}
   299  			return true, nil
   300  		}); err != nil {
   301  			t.Fatal(err)
   302  		}
   303  		te.tearDown()
   304  	}
   305  }
   306  
   307  func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) {
   308  	// Make dial fails (due to no transport security specified)
   309  	_, err := grpc.NewClient("fake.addr")
   310  	if err == nil {
   311  		t.Fatal("expecting dial to fail")
   312  	}
   313  	if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end {
   314  		t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end)
   315  	}
   316  }
   317  
   318  func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
   319  	e := tcpClearRREnv
   320  	// avoid calling API to set balancer type, which will void service config's change of balancer.
   321  	e.balancer = ""
   322  	te := newTest(t, e)
   323  	r := manual.NewBuilderWithScheme("whatever")
   324  	te.resolverScheme = r.Scheme()
   325  	te.clientConn(grpc.WithResolvers(r))
   326  	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
   327  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
   328  	r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
   329  	defer te.tearDown()
   330  
   331  	if err := verifyResultWithDelay(func() (bool, error) {
   332  		tcs, _ := channelz.GetTopChannels(0, 0)
   333  		if len(tcs) != 1 {
   334  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   335  		}
   336  		if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 1 {
   337  			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
   338  		}
   339  		return true, nil
   340  	}); err != nil {
   341  		t.Fatal(err)
   342  	}
   343  
   344  	r.UpdateState(resolver.State{
   345  		Addresses:     []resolver.Address{{Addr: "127.0.0.1:0"}},
   346  		ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
   347  	})
   348  
   349  	// wait for the shutdown of grpclb balancer
   350  	if err := verifyResultWithDelay(func() (bool, error) {
   351  		tcs, _ := channelz.GetTopChannels(0, 0)
   352  		if len(tcs) != 1 {
   353  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   354  		}
   355  		if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 0 {
   356  			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
   357  		}
   358  		return true, nil
   359  	}); err != nil {
   360  		t.Fatal(err)
   361  	}
   362  }
   363  
   364  func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
   365  	e := tcpClearRREnv
   366  	num := 3 // number of backends
   367  	te := newTest(t, e)
   368  	var svrAddrs []resolver.Address
   369  	te.startServers(&testServer{security: e.security}, num)
   370  	r := manual.NewBuilderWithScheme("whatever")
   371  	for _, a := range te.srvAddrs {
   372  		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
   373  	}
   374  	r.InitialState(resolver.State{Addresses: svrAddrs})
   375  	te.resolverScheme = r.Scheme()
   376  	te.clientConn(grpc.WithResolvers(r))
   377  	defer te.tearDown()
   378  	// Here, we just wait for all sockets to be up. In the future, if we implement
   379  	// IDLE, we may need to make several rpc calls to create the sockets.
   380  	if err := verifyResultWithDelay(func() (bool, error) {
   381  		tcs, _ := channelz.GetTopChannels(0, 0)
   382  		if len(tcs) != 1 {
   383  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   384  		}
   385  		subChans := tcs[0].SubChans()
   386  		if len(subChans) != num {
   387  			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(subChans))
   388  		}
   389  		count := 0
   390  		for k := range subChans {
   391  			sc := channelz.GetSubChannel(k)
   392  			if sc == nil {
   393  				return false, fmt.Errorf("got <nil> subchannel")
   394  			}
   395  			count += len(sc.Sockets())
   396  		}
   397  		if count != num {
   398  			return false, fmt.Errorf("there should be %d sockets not %d", num, count)
   399  		}
   400  
   401  		return true, nil
   402  	}); err != nil {
   403  		t.Fatal(err)
   404  	}
   405  
   406  	r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
   407  
   408  	if err := verifyResultWithDelay(func() (bool, error) {
   409  		tcs, _ := channelz.GetTopChannels(0, 0)
   410  		if len(tcs) != 1 {
   411  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   412  		}
   413  		subChans := tcs[0].SubChans()
   414  		if len(subChans) != num-1 {
   415  			return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(subChans))
   416  		}
   417  		count := 0
   418  		for k := range subChans {
   419  			sc := channelz.GetSubChannel(k)
   420  			if sc == nil {
   421  				return false, fmt.Errorf("got <nil> subchannel")
   422  			}
   423  			count += len(sc.Sockets())
   424  		}
   425  		if count != num-1 {
   426  			return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
   427  		}
   428  
   429  		return true, nil
   430  	}); err != nil {
   431  		t.Fatal(err)
   432  	}
   433  }
   434  
   435  func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
   436  	testcases := []struct {
   437  		total  int
   438  		start  int64
   439  		max    int
   440  		length int
   441  		end    bool
   442  	}{
   443  		{total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
   444  		{total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
   445  		{total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
   446  		{total: int(channelz.EntriesPerPage), start: 1, max: 0, length: channelz.EntriesPerPage - 1, end: true},
   447  		{total: int(channelz.EntriesPerPage) + 1, start: int64(channelz.EntriesPerPage) + 1, max: 0, length: 0, end: true},
   448  		{total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
   449  		{total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
   450  	}
   451  
   452  	for _, c := range testcases {
   453  		// Reset channelz IDs so `start` is valid.
   454  		channelz.IDGen.Reset()
   455  
   456  		e := tcpClearRREnv
   457  		te := newTest(t, e)
   458  		te.startServer(&testServer{security: e.security})
   459  		var ccs []*grpc.ClientConn
   460  		for i := 0; i < c.total; i++ {
   461  			cc := te.clientConn()
   462  			te.cc = nil
   463  			ccs = append(ccs, cc)
   464  		}
   465  
   466  		var svrID int64
   467  		if err := verifyResultWithDelay(func() (bool, error) {
   468  			ss, _ := channelz.GetServers(0, 0)
   469  			if len(ss) != 1 {
   470  				return false, fmt.Errorf("there should only be one server, not %d", len(ss))
   471  			}
   472  			if got := len(ss[0].ListenSockets()); got != 1 {
   473  				return false, fmt.Errorf("there should only be one server listen socket, not %d", got)
   474  			}
   475  
   476  			startID := c.start
   477  			if startID != 0 {
   478  				ns, _ := channelz.GetServerSockets(ss[0].ID, 0, c.total)
   479  				if int64(len(ns)) < c.start {
   480  					return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
   481  				}
   482  				startID = ns[c.start-1].ID + 1
   483  			}
   484  
   485  			ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
   486  			if len(ns) != c.length || end != c.end {
   487  				return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
   488  			}
   489  
   490  			svrID = ss[0].ID
   491  			return true, nil
   492  		}); err != nil {
   493  			t.Fatal(err)
   494  		}
   495  
   496  		for _, cc := range ccs {
   497  			cc.Close()
   498  		}
   499  
   500  		if err := verifyResultWithDelay(func() (bool, error) {
   501  			ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
   502  			if len(ns) != 0 {
   503  				return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
   504  			}
   505  			return true, nil
   506  		}); err != nil {
   507  			t.Fatal(err)
   508  		}
   509  		te.tearDown()
   510  	}
   511  }
   512  
   513  func (s) TestCZServerListenSocketDeletion(t *testing.T) {
   514  	s := grpc.NewServer()
   515  	lis, err := net.Listen("tcp", "localhost:0")
   516  	if err != nil {
   517  		t.Fatalf("failed to listen: %v", err)
   518  	}
   519  	go s.Serve(lis)
   520  	if err := verifyResultWithDelay(func() (bool, error) {
   521  		ss, _ := channelz.GetServers(0, 0)
   522  		if len(ss) != 1 {
   523  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
   524  		}
   525  		skts := ss[0].ListenSockets()
   526  		if len(skts) != 1 {
   527  			return false, fmt.Errorf("there should only be one server listen socket, not %v", skts)
   528  		}
   529  		return true, nil
   530  	}); err != nil {
   531  		t.Fatal(err)
   532  	}
   533  
   534  	lis.Close()
   535  	if err := verifyResultWithDelay(func() (bool, error) {
   536  		ss, _ := channelz.GetServers(0, 0)
   537  		if len(ss) != 1 {
   538  			return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
   539  		}
   540  		skts := ss[0].ListenSockets()
   541  		if len(skts) != 0 {
   542  			return false, fmt.Errorf("there should only be %d server listen socket, not %v", 0, skts)
   543  		}
   544  		return true, nil
   545  	}); err != nil {
   546  		t.Fatal(err)
   547  	}
   548  	s.Stop()
   549  }
   550  
   551  func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
   552  	//           +--+TopChan+---+
   553  	//           |              |
   554  	//           v              v
   555  	//    +-+SubChan1+--+   SubChan2
   556  	//    |             |
   557  	//    v             v
   558  	// Socket1       Socket2
   559  
   560  	topChan := channelz.RegisterChannel(nil, "")
   561  	subChan1 := channelz.RegisterSubChannel(topChan, "")
   562  	subChan2 := channelz.RegisterSubChannel(topChan, "")
   563  	skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
   564  	skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
   565  
   566  	tcs, _ := channelz.GetTopChannels(0, 0)
   567  	if tcs == nil || len(tcs) != 1 {
   568  		t.Fatalf("There should be one TopChannel entry")
   569  	}
   570  	if len(tcs[0].SubChans()) != 2 {
   571  		t.Fatalf("There should be two SubChannel entries")
   572  	}
   573  	sc := channelz.GetSubChannel(subChan1.ID)
   574  	if sc == nil || len(sc.Sockets()) != 2 {
   575  		t.Fatalf("There should be two Socket entries")
   576  	}
   577  
   578  	channelz.RemoveEntry(topChan.ID)
   579  	tcs, _ = channelz.GetTopChannels(0, 0)
   580  	if tcs == nil || len(tcs) != 1 {
   581  		t.Fatalf("There should be one TopChannel entry")
   582  	}
   583  
   584  	channelz.RemoveEntry(subChan1.ID)
   585  	channelz.RemoveEntry(subChan2.ID)
   586  	tcs, _ = channelz.GetTopChannels(0, 0)
   587  	if tcs == nil || len(tcs) != 1 {
   588  		t.Fatalf("There should be one TopChannel entry")
   589  	}
   590  	if len(tcs[0].SubChans()) != 1 {
   591  		t.Fatalf("There should be one SubChannel entry")
   592  	}
   593  
   594  	channelz.RemoveEntry(skt1.ID)
   595  	channelz.RemoveEntry(skt2.ID)
   596  	tcs, _ = channelz.GetTopChannels(0, 0)
   597  	if tcs != nil {
   598  		t.Fatalf("There should be no TopChannel entry")
   599  	}
   600  }
   601  
   602  func (s) TestCZChannelMetrics(t *testing.T) {
   603  	e := tcpClearRREnv
   604  	num := 3 // number of backends
   605  	te := newTest(t, e)
   606  	te.maxClientSendMsgSize = newInt(8)
   607  	var svrAddrs []resolver.Address
   608  	te.startServers(&testServer{security: e.security}, num)
   609  	r := manual.NewBuilderWithScheme("whatever")
   610  	for _, a := range te.srvAddrs {
   611  		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
   612  	}
   613  	r.InitialState(resolver.State{Addresses: svrAddrs})
   614  	te.resolverScheme = r.Scheme()
   615  	cc := te.clientConn(grpc.WithResolvers(r))
   616  	defer te.tearDown()
   617  	tc := testgrpc.NewTestServiceClient(cc)
   618  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   619  	defer cancel()
   620  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   621  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   622  	}
   623  
   624  	const smallSize = 1
   625  	const largeSize = 8
   626  
   627  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
   628  	if err != nil {
   629  		t.Fatal(err)
   630  	}
   631  	req := &testpb.SimpleRequest{
   632  		ResponseType: testpb.PayloadType_COMPRESSABLE,
   633  		ResponseSize: int32(smallSize),
   634  		Payload:      largePayload,
   635  	}
   636  
   637  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
   638  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
   639  	}
   640  
   641  	stream, err := tc.FullDuplexCall(ctx)
   642  	if err != nil {
   643  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   644  	}
   645  	defer stream.CloseSend()
   646  	// Here, we just wait for all sockets to be up. In the future, if we implement
   647  	// IDLE, we may need to make several rpc calls to create the sockets.
   648  	if err := verifyResultWithDelay(func() (bool, error) {
   649  		tcs, _ := channelz.GetTopChannels(0, 0)
   650  		if len(tcs) != 1 {
   651  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
   652  		}
   653  		subChans := tcs[0].SubChans()
   654  		if len(subChans) != num {
   655  			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(subChans))
   656  		}
   657  		var cst, csu, cf int64
   658  		for k := range subChans {
   659  			sc := channelz.GetSubChannel(k)
   660  			if sc == nil {
   661  				return false, fmt.Errorf("got <nil> subchannel")
   662  			}
   663  			cst += sc.ChannelMetrics.CallsStarted.Load()
   664  			csu += sc.ChannelMetrics.CallsSucceeded.Load()
   665  			cf += sc.ChannelMetrics.CallsFailed.Load()
   666  		}
   667  		if cst != 3 {
   668  			return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
   669  		}
   670  		if csu != 1 {
   671  			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
   672  		}
   673  		if cf != 1 {
   674  			return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
   675  		}
   676  		if got := tcs[0].ChannelMetrics.CallsStarted.Load(); got != 3 {
   677  			return false, fmt.Errorf("there should be 3 CallsStarted not %d", got)
   678  		}
   679  		if got := tcs[0].ChannelMetrics.CallsSucceeded.Load(); got != 1 {
   680  			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", got)
   681  		}
   682  		if got := tcs[0].ChannelMetrics.CallsFailed.Load(); got != 1 {
   683  			return false, fmt.Errorf("there should be 1 CallsFailed not %d", got)
   684  		}
   685  		return true, nil
   686  	}); err != nil {
   687  		t.Fatal(err)
   688  	}
   689  }
   690  
   691  func (s) TestCZServerMetrics(t *testing.T) {
   692  	e := tcpClearRREnv
   693  	te := newTest(t, e)
   694  	te.maxServerReceiveMsgSize = newInt(8)
   695  	te.startServer(&testServer{security: e.security})
   696  	defer te.tearDown()
   697  	cc := te.clientConn()
   698  	tc := testgrpc.NewTestServiceClient(cc)
   699  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   700  	defer cancel()
   701  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   702  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   703  	}
   704  
   705  	const smallSize = 1
   706  	const largeSize = 8
   707  
   708  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
   709  	if err != nil {
   710  		t.Fatal(err)
   711  	}
   712  	req := &testpb.SimpleRequest{
   713  		ResponseType: testpb.PayloadType_COMPRESSABLE,
   714  		ResponseSize: int32(smallSize),
   715  		Payload:      largePayload,
   716  	}
   717  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
   718  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
   719  	}
   720  
   721  	stream, err := tc.FullDuplexCall(ctx)
   722  	if err != nil {
   723  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   724  	}
   725  	defer stream.CloseSend()
   726  
   727  	if err := verifyResultWithDelay(func() (bool, error) {
   728  		ss, _ := channelz.GetServers(0, 0)
   729  		if len(ss) != 1 {
   730  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
   731  		}
   732  		if cs := ss[0].ServerMetrics.CallsStarted.Load(); cs != 3 {
   733  			return false, fmt.Errorf("there should be 3 CallsStarted not %d", cs)
   734  		}
   735  		if cs := ss[0].ServerMetrics.CallsSucceeded.Load(); cs != 1 {
   736  			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", cs)
   737  		}
   738  		if cf := ss[0].ServerMetrics.CallsFailed.Load(); cf != 1 {
   739  			return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
   740  		}
   741  		return true, nil
   742  	}); err != nil {
   743  		t.Fatal(err)
   744  	}
   745  }
   746  
// testServiceClientWrapper embeds a testgrpc.TestServiceClient and counts the
// RPCs started through its overriding methods, so that getCurrentStreamID can
// derive a stream ID from that count.
type testServiceClientWrapper struct {
	testgrpc.TestServiceClient
	// mu guards streamsCreated.
	mu             sync.RWMutex
	// streamsCreated is incremented by each overriding RPC method before it
	// forwards the call to the embedded client.
	streamsCreated int
}
   752  
   753  func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
   754  	t.mu.RLock()
   755  	defer t.mu.RUnlock()
   756  	return uint32(2*t.streamsCreated - 1)
   757  }
   758  
   759  func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
   760  	t.mu.Lock()
   761  	defer t.mu.Unlock()
   762  	t.streamsCreated++
   763  	return t.TestServiceClient.EmptyCall(ctx, in, opts...)
   764  }
   765  
   766  func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
   767  	t.mu.Lock()
   768  	defer t.mu.Unlock()
   769  	t.streamsCreated++
   770  	return t.TestServiceClient.UnaryCall(ctx, in, opts...)
   771  }
   772  
   773  func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testgrpc.TestService_StreamingOutputCallClient, error) {
   774  	t.mu.Lock()
   775  	defer t.mu.Unlock()
   776  	t.streamsCreated++
   777  	return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
   778  }
   779  
   780  func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_StreamingInputCallClient, error) {
   781  	t.mu.Lock()
   782  	defer t.mu.Unlock()
   783  	t.streamsCreated++
   784  	return t.TestServiceClient.StreamingInputCall(ctx, opts...)
   785  }
   786  
   787  func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_FullDuplexCallClient, error) {
   788  	t.mu.Lock()
   789  	defer t.mu.Unlock()
   790  	t.streamsCreated++
   791  	return t.TestServiceClient.FullDuplexCall(ctx, opts...)
   792  }
   793  
   794  func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_HalfDuplexCallClient, error) {
   795  	t.mu.Lock()
   796  	defer t.mu.Unlock()
   797  	t.streamsCreated++
   798  	return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
   799  }
   800  
   801  func doSuccessfulUnaryCall(tc testgrpc.TestServiceClient, t *testing.T) {
   802  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   803  	defer cancel()
   804  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   805  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   806  	}
   807  }
   808  
   809  func doStreamingInputCallWithLargePayload(tc testgrpc.TestServiceClient, t *testing.T) {
   810  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   811  	defer cancel()
   812  	s, err := tc.StreamingInputCall(ctx)
   813  	if err != nil {
   814  		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
   815  	}
   816  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
   817  	if err != nil {
   818  		t.Fatal(err)
   819  	}
   820  	s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
   821  }
   822  
   823  func doServerSideFailedUnaryCall(tc testgrpc.TestServiceClient, t *testing.T) {
   824  	const smallSize = 1
   825  	const largeSize = 2000
   826  
   827  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
   828  	if err != nil {
   829  		t.Fatal(err)
   830  	}
   831  	req := &testpb.SimpleRequest{
   832  		ResponseType: testpb.PayloadType_COMPRESSABLE,
   833  		ResponseSize: int32(smallSize),
   834  		Payload:      largePayload,
   835  	}
   836  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   837  	defer cancel()
   838  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
   839  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
   840  	}
   841  }
   842  
   843  func doClientSideInitiatedFailedStream(tc testgrpc.TestServiceClient, t *testing.T) {
   844  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   845  	stream, err := tc.FullDuplexCall(ctx)
   846  	if err != nil {
   847  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
   848  	}
   849  
   850  	const smallSize = 1
   851  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
   852  	if err != nil {
   853  		t.Fatal(err)
   854  	}
   855  
   856  	sreq := &testpb.StreamingOutputCallRequest{
   857  		ResponseType: testpb.PayloadType_COMPRESSABLE,
   858  		ResponseParameters: []*testpb.ResponseParameters{
   859  			{Size: smallSize},
   860  		},
   861  		Payload: smallPayload,
   862  	}
   863  
   864  	if err := stream.Send(sreq); err != nil {
   865  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
   866  	}
   867  	if _, err := stream.Recv(); err != nil {
   868  		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
   869  	}
   870  	// By canceling the call, the client will send rst_stream to end the call, and
   871  	// the stream will failed as a result.
   872  	cancel()
   873  }
   874  
   875  // This func is to be used to test client side counting of failed streams.
   876  func doServerSideInitiatedFailedStreamWithRSTStream(tc testgrpc.TestServiceClient, t *testing.T, l *listenerWrapper) {
   877  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   878  	defer cancel()
   879  	stream, err := tc.FullDuplexCall(ctx)
   880  	if err != nil {
   881  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
   882  	}
   883  
   884  	const smallSize = 1
   885  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
   886  	if err != nil {
   887  		t.Fatal(err)
   888  	}
   889  
   890  	sreq := &testpb.StreamingOutputCallRequest{
   891  		ResponseType: testpb.PayloadType_COMPRESSABLE,
   892  		ResponseParameters: []*testpb.ResponseParameters{
   893  			{Size: smallSize},
   894  		},
   895  		Payload: smallPayload,
   896  	}
   897  
   898  	if err := stream.Send(sreq); err != nil {
   899  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
   900  	}
   901  	if _, err := stream.Recv(); err != nil {
   902  		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
   903  	}
   904  
   905  	rcw := l.getLastConn()
   906  
   907  	if rcw != nil {
   908  		rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
   909  	}
   910  	if _, err := stream.Recv(); err == nil {
   911  		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
   912  	}
   913  }
   914  
   915  // this func is to be used to test client side counting of failed streams.
   916  func doServerSideInitiatedFailedStreamWithGoAway(ctx context.Context, tc testgrpc.TestServiceClient, t *testing.T, l *listenerWrapper) {
   917  	// This call is just to keep the transport from shutting down (socket will be deleted
   918  	// in this case, and we will not be able to get metrics).
   919  	s, err := tc.FullDuplexCall(ctx)
   920  	if err != nil {
   921  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
   922  	}
   923  	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
   924  		{
   925  			Size: 1,
   926  		},
   927  	}}); err != nil {
   928  		t.Fatalf("s.Send() failed with error: %v", err)
   929  	}
   930  	if _, err := s.Recv(); err != nil {
   931  		t.Fatalf("s.Recv() failed with error: %v", err)
   932  	}
   933  
   934  	s, err = tc.FullDuplexCall(ctx)
   935  	if err != nil {
   936  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
   937  	}
   938  	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
   939  		{
   940  			Size: 1,
   941  		},
   942  	}}); err != nil {
   943  		t.Fatalf("s.Send() failed with error: %v", err)
   944  	}
   945  	if _, err := s.Recv(); err != nil {
   946  		t.Fatalf("s.Recv() failed with error: %v", err)
   947  	}
   948  
   949  	rcw := l.getLastConn()
   950  	if rcw != nil {
   951  		rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
   952  	}
   953  	if _, err := s.Recv(); err == nil {
   954  		t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
   955  	}
   956  }
   957  
   958  func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
   959  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   960  	defer cancel()
   961  
   962  	e := tcpClearRREnv
   963  	te := newTest(t, e)
   964  	te.maxServerReceiveMsgSize = newInt(20)
   965  	te.maxClientReceiveMsgSize = newInt(20)
   966  	rcw := te.startServerWithConnControl(&testServer{security: e.security})
   967  	defer te.tearDown()
   968  	cc := te.clientConn()
   969  	tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
   970  
   971  	doSuccessfulUnaryCall(tc, t)
   972  	var scID, skID int64
   973  	if err := verifyResultWithDelay(func() (bool, error) {
   974  		tchan, _ := channelz.GetTopChannels(0, 0)
   975  		if len(tchan) != 1 {
   976  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
   977  		}
   978  		subChans := tchan[0].SubChans()
   979  		if len(subChans) != 1 {
   980  			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
   981  		}
   982  
   983  		for scID = range subChans {
   984  			break
   985  		}
   986  		sc := channelz.GetSubChannel(scID)
   987  		if sc == nil {
   988  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
   989  		}
   990  		skts := sc.Sockets()
   991  		if len(skts) != 1 {
   992  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
   993  		}
   994  		for skID = range skts {
   995  			break
   996  		}
   997  		skt := channelz.GetSocket(skID)
   998  		sktData := &skt.SocketMetrics
   999  		if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 1 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
  1000  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1001  		}
  1002  		return true, nil
  1003  	}); err != nil {
  1004  		t.Fatal(err)
  1005  	}
  1006  
  1007  	doServerSideFailedUnaryCall(tc, t)
  1008  	if err := verifyResultWithDelay(func() (bool, error) {
  1009  		skt := channelz.GetSocket(skID)
  1010  		sktData := &skt.SocketMetrics
  1011  		if sktData.StreamsStarted.Load() != 2 || sktData.StreamsSucceeded.Load() != 2 || sktData.MessagesSent.Load() != 2 || sktData.MessagesReceived.Load() != 1 {
  1012  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1013  		}
  1014  		return true, nil
  1015  	}); err != nil {
  1016  		t.Fatal(err)
  1017  	}
  1018  
  1019  	doClientSideInitiatedFailedStream(tc, t)
  1020  	if err := verifyResultWithDelay(func() (bool, error) {
  1021  		skt := channelz.GetSocket(skID)
  1022  		sktData := &skt.SocketMetrics
  1023  		if sktData.StreamsStarted.Load() != 3 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 1 || sktData.MessagesSent.Load() != 3 || sktData.MessagesReceived.Load() != 2 {
  1024  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1025  		}
  1026  		return true, nil
  1027  	}); err != nil {
  1028  		t.Fatal(err)
  1029  	}
  1030  
  1031  	doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
  1032  	if err := verifyResultWithDelay(func() (bool, error) {
  1033  		skt := channelz.GetSocket(skID)
  1034  		sktData := &skt.SocketMetrics
  1035  		if sktData.StreamsStarted.Load() != 4 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 2 || sktData.MessagesSent.Load() != 4 || sktData.MessagesReceived.Load() != 3 {
  1036  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1037  		}
  1038  		return true, nil
  1039  	}); err != nil {
  1040  		t.Fatal(err)
  1041  	}
  1042  
  1043  	doServerSideInitiatedFailedStreamWithGoAway(ctx, tc, t, rcw)
  1044  	if err := verifyResultWithDelay(func() (bool, error) {
  1045  		skt := channelz.GetSocket(skID)
  1046  		sktData := &skt.SocketMetrics
  1047  		if sktData.StreamsStarted.Load() != 6 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 3 || sktData.MessagesSent.Load() != 6 || sktData.MessagesReceived.Load() != 5 {
  1048  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1049  		}
  1050  		return true, nil
  1051  	}); err != nil {
  1052  		t.Fatal(err)
  1053  	}
  1054  }
  1055  
  1056  // This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
  1057  // TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
  1058  // server sending RST_STREAM to client due to client side flow control violation.
  1059  // It is separated from other cases due to setup incompatibly, i.e. max receive
  1060  // size violation will mask flow control violation.
  1061  func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
  1062  	e := tcpClearRREnv
  1063  	te := newTest(t, e)
  1064  	te.serverInitialWindowSize = 65536
  1065  	// Avoid overflowing connection level flow control window, which will lead to
  1066  	// transport being closed.
  1067  	te.serverInitialConnWindowSize = 65536 * 2
  1068  	ts := &stubserver.StubServer{FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  1069  		stream.Send(&testpb.StreamingOutputCallResponse{})
  1070  		<-stream.Context().Done()
  1071  		return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
  1072  	}}
  1073  	te.startServer(ts)
  1074  	defer te.tearDown()
  1075  	cc, dw := te.clientConnWithConnControl()
  1076  	tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
  1077  
  1078  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1079  	stream, err := tc.FullDuplexCall(ctx)
  1080  	if err != nil {
  1081  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  1082  	}
  1083  	if _, err := stream.Recv(); err != nil {
  1084  		t.Fatalf("stream.Recv() = %v, want nil", err)
  1085  	}
  1086  	go func() {
  1087  		payload := make([]byte, 16384)
  1088  		for i := 0; i < 6; i++ {
  1089  			dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload)
  1090  		}
  1091  	}()
  1092  	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
  1093  		t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted)
  1094  	}
  1095  	cancel()
  1096  
  1097  	if err := verifyResultWithDelay(func() (bool, error) {
  1098  		tchan, _ := channelz.GetTopChannels(0, 0)
  1099  		if len(tchan) != 1 {
  1100  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1101  		}
  1102  		subChans := tchan[0].SubChans()
  1103  		if len(subChans) != 1 {
  1104  			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
  1105  		}
  1106  		var id int64
  1107  		for id = range subChans {
  1108  			break
  1109  		}
  1110  		sc := channelz.GetSubChannel(id)
  1111  		if sc == nil {
  1112  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1113  		}
  1114  		skts := sc.Sockets()
  1115  		if len(skts) != 1 {
  1116  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
  1117  		}
  1118  		for id = range skts {
  1119  			break
  1120  		}
  1121  		skt := channelz.GetSocket(id)
  1122  		sktData := &skt.SocketMetrics
  1123  		if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 0 || sktData.StreamsFailed.Load() != 1 {
  1124  			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load()) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load())
  1125  		}
  1126  		ss, _ := channelz.GetServers(0, 0)
  1127  		if len(ss) != 1 {
  1128  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1129  		}
  1130  
  1131  		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  1132  		if len(ns) != 1 {
  1133  			return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
  1134  		}
  1135  		sktData = &ns[0].SocketMetrics
  1136  		if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 0 || sktData.StreamsFailed.Load() != 1 {
  1137  			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load()) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load())
  1138  		}
  1139  		return true, nil
  1140  	}); err != nil {
  1141  		t.Fatal(err)
  1142  	}
  1143  }
  1144  
  1145  func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
  1146  	e := tcpClearRREnv
  1147  	te := newTest(t, e)
  1148  	// disable BDP
  1149  	te.serverInitialWindowSize = 65536
  1150  	te.serverInitialConnWindowSize = 65536
  1151  	te.clientInitialWindowSize = 65536
  1152  	te.clientInitialConnWindowSize = 65536
  1153  	te.startServer(&testServer{security: e.security})
  1154  	defer te.tearDown()
  1155  	cc := te.clientConn()
  1156  	tc := testgrpc.NewTestServiceClient(cc)
  1157  
  1158  	for i := 0; i < 10; i++ {
  1159  		doSuccessfulUnaryCall(tc, t)
  1160  	}
  1161  
  1162  	var cliSktID, svrSktID int64
  1163  	if err := verifyResultWithDelay(func() (bool, error) {
  1164  		tchan, _ := channelz.GetTopChannels(0, 0)
  1165  		if len(tchan) != 1 {
  1166  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1167  		}
  1168  		subChans := tchan[0].SubChans()
  1169  		if len(subChans) != 1 {
  1170  			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
  1171  		}
  1172  		var id int64
  1173  		for id = range subChans {
  1174  			break
  1175  		}
  1176  		sc := channelz.GetSubChannel(id)
  1177  		if sc == nil {
  1178  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1179  		}
  1180  		skts := sc.Sockets()
  1181  		if len(skts) != 1 {
  1182  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
  1183  		}
  1184  		for id = range skts {
  1185  			break
  1186  		}
  1187  		skt := channelz.GetSocket(id)
  1188  		sktData := skt.EphemeralMetrics()
  1189  		// 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  1190  		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  1191  			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1192  		}
  1193  		ss, _ := channelz.GetServers(0, 0)
  1194  		if len(ss) != 1 {
  1195  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1196  		}
  1197  		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  1198  		sktData = ns[0].EphemeralMetrics()
  1199  		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  1200  			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1201  		}
  1202  		cliSktID, svrSktID = id, ss[0].ID
  1203  		return true, nil
  1204  	}); err != nil {
  1205  		t.Fatal(err)
  1206  	}
  1207  
  1208  	doStreamingInputCallWithLargePayload(tc, t)
  1209  
  1210  	if err := verifyResultWithDelay(func() (bool, error) {
  1211  		skt := channelz.GetSocket(cliSktID)
  1212  		sktData := skt.EphemeralMetrics()
  1213  		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  1214  		// Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
  1215  		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
  1216  			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1217  		}
  1218  		ss, _ := channelz.GetServers(0, 0)
  1219  		if len(ss) != 1 {
  1220  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1221  		}
  1222  		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
  1223  		sktData = ns[0].EphemeralMetrics()
  1224  		if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
  1225  			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1226  		}
  1227  		return true, nil
  1228  	}); err != nil {
  1229  		t.Fatal(err)
  1230  	}
  1231  
  1232  	// triggers transport flow control window update on server side, since unacked
  1233  	// bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
  1234  	doStreamingInputCallWithLargePayload(tc, t)
  1235  	if err := verifyResultWithDelay(func() (bool, error) {
  1236  		skt := channelz.GetSocket(cliSktID)
  1237  		sktData := skt.EphemeralMetrics()
  1238  		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  1239  		// Remote: 65536
  1240  		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
  1241  			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1242  		}
  1243  		ss, _ := channelz.GetServers(0, 0)
  1244  		if len(ss) != 1 {
  1245  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1246  		}
  1247  		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
  1248  		sktData = ns[0].EphemeralMetrics()
  1249  		if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
  1250  			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1251  		}
  1252  		return true, nil
  1253  	}); err != nil {
  1254  		t.Fatal(err)
  1255  	}
  1256  }
  1257  
  1258  func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
  1259  	const keepaliveRate = 50 * time.Millisecond
  1260  	defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
  1261  	internal.KeepaliveMinPingTime = keepaliveRate
  1262  	e := tcpClearRREnv
  1263  	te := newTest(t, e)
  1264  	te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
  1265  		keepalive.ClientParameters{
  1266  			Time:                keepaliveRate,
  1267  			Timeout:             500 * time.Millisecond,
  1268  			PermitWithoutStream: true,
  1269  		}))
  1270  	te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
  1271  		keepalive.EnforcementPolicy{
  1272  			MinTime:             keepaliveRate,
  1273  			PermitWithoutStream: true,
  1274  		}))
  1275  	te.startServer(&testServer{security: e.security})
  1276  	cc := te.clientConn() // Dial the server
  1277  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1278  	defer cancel()
  1279  	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
  1280  	start := time.Now()
  1281  	// Wait for at least two keepalives to be able to occur.
  1282  	time.Sleep(2 * keepaliveRate)
  1283  	defer te.tearDown()
  1284  	if err := verifyResultWithDelay(func() (bool, error) {
  1285  		tchan, _ := channelz.GetTopChannels(0, 0)
  1286  		if len(tchan) != 1 {
  1287  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1288  		}
  1289  		subChans := tchan[0].SubChans()
  1290  		if len(subChans) != 1 {
  1291  			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
  1292  		}
  1293  		var id int64
  1294  		for id = range subChans {
  1295  			break
  1296  		}
  1297  		sc := channelz.GetSubChannel(id)
  1298  		if sc == nil {
  1299  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1300  		}
  1301  		skts := sc.Sockets()
  1302  		if len(skts) != 1 {
  1303  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
  1304  		}
  1305  		for id = range skts {
  1306  			break
  1307  		}
  1308  		skt := channelz.GetSocket(id)
  1309  		want := int64(time.Since(start) / keepaliveRate)
  1310  		if got := skt.SocketMetrics.KeepAlivesSent.Load(); got != want {
  1311  			return false, fmt.Errorf("there should be %v KeepAlives sent, not %d", want, got)
  1312  		}
  1313  		return true, nil
  1314  	}); err != nil {
  1315  		t.Fatal(err)
  1316  	}
  1317  }
  1318  
  1319  func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
  1320  	e := tcpClearRREnv
  1321  	te := newTest(t, e)
  1322  	te.maxServerReceiveMsgSize = newInt(20)
  1323  	te.maxClientReceiveMsgSize = newInt(20)
  1324  	te.startServer(&testServer{security: e.security})
  1325  	defer te.tearDown()
  1326  	cc, _ := te.clientConnWithConnControl()
  1327  	tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
  1328  
  1329  	var svrID int64
  1330  	if err := verifyResultWithDelay(func() (bool, error) {
  1331  		ss, _ := channelz.GetServers(0, 0)
  1332  		if len(ss) != 1 {
  1333  			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1334  		}
  1335  		svrID = ss[0].ID
  1336  		return true, nil
  1337  	}); err != nil {
  1338  		t.Fatal(err)
  1339  	}
  1340  
  1341  	doSuccessfulUnaryCall(tc, t)
  1342  	if err := verifyResultWithDelay(func() (bool, error) {
  1343  		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1344  		sktData := &ns[0].SocketMetrics
  1345  		if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 1 || sktData.StreamsFailed.Load() != 0 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
  1346  			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1347  		}
  1348  		return true, nil
  1349  	}); err != nil {
  1350  		t.Fatal(err)
  1351  	}
  1352  
  1353  	doServerSideFailedUnaryCall(tc, t)
  1354  	if err := verifyResultWithDelay(func() (bool, error) {
  1355  		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1356  		sktData := &ns[0].SocketMetrics
  1357  		if sktData.StreamsStarted.Load() != 2 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 0 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
  1358  			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1359  		}
  1360  		return true, nil
  1361  	}); err != nil {
  1362  		t.Fatal(err)
  1363  	}
  1364  
  1365  	doClientSideInitiatedFailedStream(tc, t)
  1366  	if err := verifyResultWithDelay(func() (bool, error) {
  1367  		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1368  		sktData := &ns[0].SocketMetrics
  1369  		if sktData.StreamsStarted.Load() != 3 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 1 || sktData.MessagesSent.Load() != 2 || sktData.MessagesReceived.Load() != 2 {
  1370  			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
  1371  		}
  1372  		return true, nil
  1373  	}); err != nil {
  1374  		t.Fatal(err)
  1375  	}
  1376  }
  1377  
  1378  func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
  1379  	defer func(t time.Duration) { internal.KeepaliveMinServerPingTime = t }(internal.KeepaliveMinServerPingTime)
  1380  	internal.KeepaliveMinServerPingTime = 50 * time.Millisecond
  1381  
  1382  	e := tcpClearRREnv
  1383  	te := newTest(t, e)
  1384  	// We setup the server keepalive parameters to send one keepalive every
  1385  	// 50ms, and verify that the actual number of keepalives is very close to
  1386  	// Time/50ms.  We had a bug wherein the server was sending one keepalive
  1387  	// every [Time+Timeout] instead of every [Time] period, and since Timeout
  1388  	// is configured to a high value here, we should be able to verify that the
  1389  	// fix works with the above mentioned logic.
  1390  	kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{
  1391  		Time:    50 * time.Millisecond,
  1392  		Timeout: 5 * time.Second,
  1393  	})
  1394  	te.customServerOptions = append(te.customServerOptions, kpOption)
  1395  	te.startServer(&testServer{security: e.security})
  1396  	defer te.tearDown()
  1397  	cc := te.clientConn()
  1398  
  1399  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1400  	defer cancel()
  1401  	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
  1402  
  1403  	// Allow about 5 pings to happen (250ms/50ms).
  1404  	time.Sleep(255 * time.Millisecond)
  1405  
  1406  	ss, _ := channelz.GetServers(0, 0)
  1407  	if len(ss) != 1 {
  1408  		t.Fatalf("there should be one server, not %d", len(ss))
  1409  	}
  1410  	ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  1411  	if len(ns) != 1 {
  1412  		t.Fatalf("there should be one server normal socket, not %d", len(ns))
  1413  	}
  1414  	const wantMin, wantMax = 3, 7
  1415  	if got := ns[0].SocketMetrics.KeepAlivesSent.Load(); got < wantMin || got > wantMax {
  1416  		t.Fatalf("got keepalivesCount: %v, want keepalivesCount: [%v,%v]", got, wantMin, wantMax)
  1417  	}
  1418  }
  1419  
// cipherSuites is a list of TLS cipher suite names, written in their standard
// "TLS_..." form.
// NOTE(review): presumably the set of names the TLS security test in this file
// accepts for a negotiated suite; confirm against its use below.
var cipherSuites = []string{
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
	"TLS_FALLBACK_SCSV",
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
  1448  
  1449  func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
  1450  	e := tcpTLSRREnv
  1451  	te := newTest(t, e)
  1452  	te.startServer(&testServer{security: e.security})
  1453  	defer te.tearDown()
  1454  	te.clientConn()
  1455  	if err := verifyResultWithDelay(func() (bool, error) {
  1456  		tchan, _ := channelz.GetTopChannels(0, 0)
  1457  		if len(tchan) != 1 {
  1458  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1459  		}
  1460  		subChans := tchan[0].SubChans()
  1461  		if len(subChans) != 1 {
  1462  			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
  1463  		}
  1464  		var id int64
  1465  		for id = range subChans {
  1466  			break
  1467  		}
  1468  		sc := channelz.GetSubChannel(id)
  1469  		if sc == nil {
  1470  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1471  		}
  1472  		skts := sc.Sockets()
  1473  		if len(skts) != 1 {
  1474  			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
  1475  		}
  1476  		for id = range skts {
  1477  			break
  1478  		}
  1479  		skt := channelz.GetSocket(id)
  1480  		cert, _ := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
  1481  		securityVal, ok := skt.Security.(*credentials.TLSChannelzSecurityValue)
  1482  		if !ok {
  1483  			return false, fmt.Errorf("the Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.Security)
  1484  		}
  1485  		if !cmp.Equal(securityVal.RemoteCertificate, cert.Certificate[0]) {
  1486  			return false, fmt.Errorf("Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
  1487  		}
  1488  		for _, v := range cipherSuites {
  1489  			if v == securityVal.StandardName {
  1490  				return true, nil
  1491  			}
  1492  		}
  1493  		return false, fmt.Errorf("Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites)
  1494  	}); err != nil {
  1495  		t.Fatal(err)
  1496  	}
  1497  }
  1498  
  1499  func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
  1500  	e := tcpClearRREnv
  1501  	// avoid calling API to set balancer type, which will void service config's change of balancer.
  1502  	e.balancer = ""
  1503  	te := newTest(t, e)
  1504  	r := manual.NewBuilderWithScheme("whatever")
  1505  	te.resolverScheme = r.Scheme()
  1506  	te.clientConn(grpc.WithResolvers(r))
  1507  	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
  1508  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
  1509  	r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
  1510  	defer te.tearDown()
  1511  
  1512  	var nestedConn int64
  1513  	if err := verifyResultWithDelay(func() (bool, error) {
  1514  		tcs, _ := channelz.GetTopChannels(0, 0)
  1515  		if len(tcs) != 1 {
  1516  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1517  		}
  1518  		nestedChans := tcs[0].NestedChans()
  1519  		if len(nestedChans) != 1 {
  1520  			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
  1521  		}
  1522  		for k := range nestedChans {
  1523  			nestedConn = k
  1524  		}
  1525  		trace := tcs[0].Trace()
  1526  		for _, e := range trace.Events {
  1527  			if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
  1528  				return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
  1529  			}
  1530  		}
  1531  		ncm := channelz.GetChannel(nestedConn)
  1532  		ncmTrace := ncm.Trace()
  1533  		if ncmTrace == nil {
  1534  			return false, fmt.Errorf("trace for nested channel should not be empty")
  1535  		}
  1536  		if len(ncmTrace.Events) == 0 {
  1537  			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1538  		}
  1539  		pattern := `Channel created`
  1540  		if ok, _ := regexp.MatchString(pattern, ncmTrace.Events[0].Desc); !ok {
  1541  			return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncmTrace.Events[0].Desc)
  1542  		}
  1543  		return true, nil
  1544  	}); err != nil {
  1545  		t.Fatal(err)
  1546  	}
  1547  
  1548  	r.UpdateState(resolver.State{
  1549  		Addresses:     []resolver.Address{{Addr: "127.0.0.1:0"}},
  1550  		ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
  1551  	})
  1552  
  1553  	// wait for the shutdown of grpclb balancer
  1554  	if err := verifyResultWithDelay(func() (bool, error) {
  1555  		tcs, _ := channelz.GetTopChannels(0, 0)
  1556  		if len(tcs) != 1 {
  1557  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1558  		}
  1559  		nestedChans := tcs[0].NestedChans()
  1560  		if len(nestedChans) != 0 {
  1561  			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
  1562  		}
  1563  		ncm := channelz.GetChannel(nestedConn)
  1564  		if ncm == nil {
  1565  			return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
  1566  		}
  1567  		trace := ncm.Trace()
  1568  		if trace == nil {
  1569  			return false, fmt.Errorf("trace for nested channel should not be empty")
  1570  		}
  1571  		if len(trace.Events) == 0 {
  1572  			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1573  		}
  1574  		pattern := `Channel created`
  1575  		if ok, _ := regexp.MatchString(pattern, trace.Events[0].Desc); !ok {
  1576  			return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, trace.Events[0].Desc)
  1577  		}
  1578  		return true, nil
  1579  	}); err != nil {
  1580  		t.Fatal(err)
  1581  	}
  1582  }
  1583  
  1584  func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
  1585  	e := tcpClearRREnv
  1586  	te := newTest(t, e)
  1587  	te.startServer(&testServer{security: e.security})
  1588  	r := manual.NewBuilderWithScheme("whatever")
  1589  	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1590  	te.resolverScheme = r.Scheme()
  1591  	te.clientConn(grpc.WithResolvers(r))
  1592  	defer te.tearDown()
  1593  	var subConn int64
  1594  	// Here, we just wait for all sockets to be up. In the future, if we implement
  1595  	// IDLE, we may need to make several rpc calls to create the sockets.
  1596  	if err := verifyResultWithDelay(func() (bool, error) {
  1597  		tcs, _ := channelz.GetTopChannels(0, 0)
  1598  		if len(tcs) != 1 {
  1599  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1600  		}
  1601  		subChans := tcs[0].SubChans()
  1602  		if len(subChans) != 1 {
  1603  			return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
  1604  		}
  1605  		for k := range subChans {
  1606  			subConn = k
  1607  		}
  1608  		trace := tcs[0].Trace()
  1609  		for _, e := range trace.Events {
  1610  			if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
  1611  				return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
  1612  			}
  1613  		}
  1614  		scm := channelz.GetSubChannel(subConn)
  1615  		if scm == nil {
  1616  			return false, fmt.Errorf("subChannel does not exist")
  1617  		}
  1618  		scTrace := scm.Trace()
  1619  		if scTrace == nil {
  1620  			return false, fmt.Errorf("trace for subChannel should not be empty")
  1621  		}
  1622  		if len(scTrace.Events) == 0 {
  1623  			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1624  		}
  1625  		pattern := `Subchannel created`
  1626  		if ok, _ := regexp.MatchString(pattern, scTrace.Events[0].Desc); !ok {
  1627  			return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, scTrace.Events[0].Desc)
  1628  		}
  1629  		return true, nil
  1630  	}); err != nil {
  1631  		t.Fatal(err)
  1632  	}
  1633  
  1634  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1635  	defer cancel()
  1636  	testutils.AwaitState(ctx, t, te.cc, connectivity.Ready)
  1637  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
  1638  	testutils.AwaitNotState(ctx, t, te.cc, connectivity.Ready)
  1639  
  1640  	if err := verifyResultWithDelay(func() (bool, error) {
  1641  		tcs, _ := channelz.GetTopChannels(0, 0)
  1642  		if len(tcs) != 1 {
  1643  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1644  		}
  1645  		subChans := tcs[0].SubChans()
  1646  		if len(subChans) != 1 {
  1647  			return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
  1648  		}
  1649  		scm := channelz.GetSubChannel(subConn)
  1650  		if scm == nil {
  1651  			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
  1652  		}
  1653  		trace := scm.Trace()
  1654  		if trace == nil {
  1655  			return false, fmt.Errorf("trace for SubChannel should not be empty")
  1656  		}
  1657  		if len(trace.Events) == 0 {
  1658  			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1659  		}
  1660  
  1661  		pattern := `Subchannel deleted`
  1662  		desc := trace.Events[len(trace.Events)-1].Desc
  1663  		if ok, _ := regexp.MatchString(pattern, desc); !ok {
  1664  			return false, fmt.Errorf("the last trace event should be %q, not %q", pattern, desc)
  1665  		}
  1666  		return true, nil
  1667  	}); err != nil {
  1668  		t.Fatal(err)
  1669  	}
  1670  }
  1671  
  1672  func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
  1673  	e := tcpClearRREnv
  1674  	e.balancer = ""
  1675  	te := newTest(t, e)
  1676  	te.startServer(&testServer{security: e.security})
  1677  	r := manual.NewBuilderWithScheme("whatever")
  1678  	addrs := []resolver.Address{{Addr: te.srvAddr}}
  1679  	r.InitialState(resolver.State{Addresses: addrs})
  1680  	te.resolverScheme = r.Scheme()
  1681  	te.clientConn(grpc.WithResolvers(r))
  1682  	defer te.tearDown()
  1683  	var cid int64
  1684  	// Here, we just wait for all sockets to be up. In the future, if we implement
  1685  	// IDLE, we may need to make several rpc calls to create the sockets.
  1686  	if err := verifyResultWithDelay(func() (bool, error) {
  1687  		tcs, _ := channelz.GetTopChannels(0, 0)
  1688  		if len(tcs) != 1 {
  1689  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1690  		}
  1691  		cid = tcs[0].ID
  1692  		trace := tcs[0].Trace()
  1693  		for i := len(trace.Events) - 1; i >= 0; i-- {
  1694  			if strings.Contains(trace.Events[i].Desc, "resolver returned new addresses") {
  1695  				break
  1696  			}
  1697  			if i == 0 {
  1698  				return false, fmt.Errorf("events do not contain expected address resolution from empty address state.  Got: %+v", trace.Events)
  1699  			}
  1700  		}
  1701  		return true, nil
  1702  	}); err != nil {
  1703  		t.Fatal(err)
  1704  	}
  1705  	r.UpdateState(resolver.State{
  1706  		Addresses:     addrs,
  1707  		ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
  1708  	})
  1709  
  1710  	if err := verifyResultWithDelay(func() (bool, error) {
  1711  		cm := channelz.GetChannel(cid)
  1712  		trace := cm.Trace()
  1713  		for i := len(trace.Events) - 1; i >= 0; i-- {
  1714  			if strings.Contains(trace.Events[i].Desc, fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name)) {
  1715  				break
  1716  			}
  1717  			if i == 0 {
  1718  				return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
  1719  			}
  1720  		}
  1721  		return true, nil
  1722  	}); err != nil {
  1723  		t.Fatal(err)
  1724  	}
  1725  
  1726  	newSC := parseServiceConfig(t, r, `{
  1727      "methodConfig": [
  1728          {
  1729              "name": [
  1730                  {
  1731                      "service": "grpc.testing.TestService",
  1732                      "method": "EmptyCall"
  1733                  }
  1734              ],
  1735              "waitForReady": false,
  1736              "timeout": ".001s"
  1737          }
  1738      ]
  1739  }`)
  1740  	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
  1741  
  1742  	if err := verifyResultWithDelay(func() (bool, error) {
  1743  		cm := channelz.GetChannel(cid)
  1744  
  1745  		var es []string
  1746  		trace := cm.Trace()
  1747  		for i := len(trace.Events) - 1; i >= 0; i-- {
  1748  			if strings.Contains(trace.Events[i].Desc, "service config updated") {
  1749  				break
  1750  			}
  1751  			es = append(es, trace.Events[i].Desc)
  1752  			if i == 0 {
  1753  				return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
  1754  			}
  1755  		}
  1756  		return true, nil
  1757  	}); err != nil {
  1758  		t.Fatal(err)
  1759  	}
  1760  
  1761  	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
  1762  
  1763  	if err := verifyResultWithDelay(func() (bool, error) {
  1764  		cm := channelz.GetChannel(cid)
  1765  		trace := cm.Trace()
  1766  		for i := len(trace.Events) - 1; i >= 0; i-- {
  1767  			if strings.Contains(trace.Events[i].Desc, "resolver returned an empty address list") {
  1768  				break
  1769  			}
  1770  			if i == 0 {
  1771  				return false, fmt.Errorf("events do not contain expected address resolution of empty address")
  1772  			}
  1773  		}
  1774  		return true, nil
  1775  	}); err != nil {
  1776  		t.Fatal(err)
  1777  	}
  1778  }
  1779  
  1780  func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
  1781  	e := tcpClearRREnv
  1782  	e.balancer = ""
  1783  	te := newTest(t, e)
  1784  	te.startServers(&testServer{security: e.security}, 3)
  1785  	r := manual.NewBuilderWithScheme("whatever")
  1786  	var svrAddrs []resolver.Address
  1787  	for _, a := range te.srvAddrs {
  1788  		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  1789  	}
  1790  	r.InitialState(resolver.State{Addresses: svrAddrs})
  1791  	te.resolverScheme = r.Scheme()
  1792  	cc := te.clientConn(grpc.WithResolvers(r))
  1793  	defer te.tearDown()
  1794  	tc := testgrpc.NewTestServiceClient(cc)
  1795  	// make sure the connection is up
  1796  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1797  	defer cancel()
  1798  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1799  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1800  	}
  1801  	te.srvs[0].Stop()
  1802  	te.srvs[1].Stop()
  1803  	// Here, we just wait for all sockets to be up. Make several rpc calls to
  1804  	// create the sockets since we do not automatically reconnect.
  1805  	done := make(chan struct{})
  1806  	defer close(done)
  1807  	go func() {
  1808  		for {
  1809  			tc.EmptyCall(ctx, &testpb.Empty{})
  1810  			select {
  1811  			case <-time.After(10 * time.Millisecond):
  1812  			case <-done:
  1813  				return
  1814  			}
  1815  		}
  1816  	}()
  1817  	if err := verifyResultWithDelay(func() (bool, error) {
  1818  		tcs, _ := channelz.GetTopChannels(0, 0)
  1819  		if len(tcs) != 1 {
  1820  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1821  		}
  1822  		subChans := tcs[0].SubChans()
  1823  		if len(subChans) != 1 {
  1824  			return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
  1825  		}
  1826  		var subConn int64
  1827  		for k := range subChans {
  1828  			subConn = k
  1829  		}
  1830  		scm := channelz.GetSubChannel(subConn)
  1831  		trace := scm.Trace()
  1832  		if trace == nil {
  1833  			return false, fmt.Errorf("trace for SubChannel should not be empty")
  1834  		}
  1835  		if len(trace.Events) == 0 {
  1836  			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1837  		}
  1838  		for i := len(trace.Events) - 1; i >= 0; i-- {
  1839  			if strings.Contains(trace.Events[i].Desc, fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])) {
  1840  				break
  1841  			}
  1842  			if i == 0 {
  1843  				return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
  1844  			}
  1845  		}
  1846  		return true, nil
  1847  	}); err != nil {
  1848  		t.Fatal(err)
  1849  	}
  1850  }
  1851  
// TestCZSubChannelConnectivityState checks the connectivity-state events
// recorded in a subchannel's trace. After one successful RPC the server is
// stopped; once the subchannel has logged TRANSIENT_FAILURE the resolver is
// pointed at "fake address", and the trace is then expected to contain exactly
// one READY event, at least one CONNECTING event, at least one
// TRANSIENT_FAILURE event and exactly one SHUTDOWN event.
func (s) TestCZSubChannelConnectivityState(t *testing.T) {
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	tc := testgrpc.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srv.Stop()

	// subConn is declared outside the closure so the id captured on one
	// invocation is reused on later ones.
	var subConn int64
	if err := verifyResultWithDelay(func() (bool, error) {
		// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
		// to effect of the r.UpdateState call with "fake address" below)
		if subConn == 0 {
			tcs, _ := channelz.GetTopChannels(0, 0)
			if len(tcs) != 1 {
				return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
			}
			subChans := tcs[0].SubChans()
			if len(subChans) != 1 {
				return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
			}
			for k := range subChans {
				// get the SubChannel id for further trace inquiry.
				subConn = k
				t.Logf("SubChannel Id is %d", subConn)
			}
		}
		scm := channelz.GetSubChannel(subConn)
		if scm == nil {
			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
		}
		trace := scm.Trace()
		if trace == nil {
			return false, fmt.Errorf("trace for SubChannel should not be empty")
		}
		if len(trace.Events) == 0 {
			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
		}
		var ready, connecting, transient, shutdown int
		// First pass: only look for TRANSIENT_FAILURE.
		t.Log("SubChannel trace events seen so far...")
		for _, e := range trace.Events {
			t.Log(e.Desc)
			if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
				transient++
			}
		}
		// Make sure the SubChannel has already seen transient failure before shutting it down through
		// the r.UpdateState call with "fake address" below.
		if transient == 0 {
			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
		}
		// Reset the counter; the second pass below recounts every state.
		transient = 0
		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
		// NOTE(review): trace was fetched before the UpdateState call above and
		// is not refreshed, so this pass scans the same snapshot. Events caused
		// by the update are presumably only seen on a later invocation of this
		// closure — confirm verifyResultWithDelay retries on failure.
		t.Log("SubChannel trace events seen so far...")
		for _, e := range trace.Events {
			t.Log(e.Desc)
			if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)) {
				ready++
			}
			if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting)) {
				connecting++
			}
			if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
				transient++
			}
			if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown)) {
				shutdown++
			}
		}
		// example:
		// Subchannel Created
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to READY
		// Subchannel's connectivity state changed to TRANSIENT_FAILURE
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to SHUTDOWN
		// Subchannel Deleted
		if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
		}

		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}
  1950  
  1951  func (s) TestCZChannelConnectivityState(t *testing.T) {
  1952  	e := tcpClearRREnv
  1953  	te := newTest(t, e)
  1954  	te.startServer(&testServer{security: e.security})
  1955  	r := manual.NewBuilderWithScheme("whatever")
  1956  	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1957  	te.resolverScheme = r.Scheme()
  1958  	cc := te.clientConn(grpc.WithResolvers(r))
  1959  	defer te.tearDown()
  1960  	tc := testgrpc.NewTestServiceClient(cc)
  1961  	// make sure the connection is up
  1962  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1963  	defer cancel()
  1964  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1965  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1966  	}
  1967  	te.srv.Stop()
  1968  
  1969  	if err := verifyResultWithDelay(func() (bool, error) {
  1970  		tcs, _ := channelz.GetTopChannels(0, 0)
  1971  		if len(tcs) != 1 {
  1972  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1973  		}
  1974  
  1975  		var ready, connecting, transient int
  1976  		t.Log("Channel trace events seen so far...")
  1977  		for _, e := range tcs[0].Trace().Events {
  1978  			t.Log(e.Desc)
  1979  			if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)) {
  1980  				ready++
  1981  			}
  1982  			if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)) {
  1983  				connecting++
  1984  			}
  1985  			if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)) {
  1986  				transient++
  1987  			}
  1988  		}
  1989  
  1990  		// example:
  1991  		// Channel Created
  1992  		// Adressses resolved (from empty address state): "localhost:40467"
  1993  		// SubChannel (id: 4[]) Created
  1994  		// Channel's connectivity state changed to CONNECTING
  1995  		// Channel's connectivity state changed to READY
  1996  		// Channel's connectivity state changed to TRANSIENT_FAILURE
  1997  		// Channel's connectivity state changed to CONNECTING
  1998  		// Channel's connectivity state changed to TRANSIENT_FAILURE
  1999  		if ready != 1 || connecting < 1 || transient < 1 {
  2000  			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
  2001  		}
  2002  		return true, nil
  2003  	}); err != nil {
  2004  		t.Fatal(err)
  2005  	}
  2006  }
  2007  
  2008  func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
  2009  	e := tcpClearRREnv
  2010  	e.balancer = ""
  2011  	te := newTest(t, e)
  2012  	channelz.SetMaxTraceEntry(1)
  2013  	defer channelz.ResetMaxTraceEntryToDefault()
  2014  	r := manual.NewBuilderWithScheme("whatever")
  2015  	te.resolverScheme = r.Scheme()
  2016  	te.clientConn(grpc.WithResolvers(r))
  2017  	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
  2018  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
  2019  	r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
  2020  	defer te.tearDown()
  2021  	var nestedConn int64
  2022  	if err := verifyResultWithDelay(func() (bool, error) {
  2023  		tcs, _ := channelz.GetTopChannels(0, 0)
  2024  		if len(tcs) != 1 {
  2025  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  2026  		}
  2027  		nestedChans := tcs[0].NestedChans()
  2028  		if len(nestedChans) != 1 {
  2029  			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
  2030  		}
  2031  		for k := range nestedChans {
  2032  			nestedConn = k
  2033  		}
  2034  		return true, nil
  2035  	}); err != nil {
  2036  		t.Fatal(err)
  2037  	}
  2038  
  2039  	r.UpdateState(resolver.State{
  2040  		Addresses:     []resolver.Address{{Addr: "127.0.0.1:0"}},
  2041  		ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
  2042  	})
  2043  
  2044  	// wait for the shutdown of grpclb balancer
  2045  	if err := verifyResultWithDelay(func() (bool, error) {
  2046  		tcs, _ := channelz.GetTopChannels(0, 0)
  2047  		if len(tcs) != 1 {
  2048  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  2049  		}
  2050  
  2051  		if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 0 {
  2052  			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
  2053  		}
  2054  		return true, nil
  2055  	}); err != nil {
  2056  		t.Fatal(err)
  2057  	}
  2058  
  2059  	// If nested channel deletion is last trace event before the next validation, it will fail, as the top channel will hold a reference to it.
  2060  	// This line forces a trace event on the top channel in that case.
  2061  	r.UpdateState(resolver.State{
  2062  		Addresses:     []resolver.Address{{Addr: "127.0.0.1:0"}},
  2063  		ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
  2064  	})
  2065  
  2066  	// verify that the nested channel no longer exist due to trace referencing it got overwritten.
  2067  	if err := verifyResultWithDelay(func() (bool, error) {
  2068  		cm := channelz.GetChannel(nestedConn)
  2069  		if cm != nil {
  2070  			return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
  2071  		}
  2072  		return true, nil
  2073  	}); err != nil {
  2074  		t.Fatal(err)
  2075  	}
  2076  }
  2077  
  2078  func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
  2079  	e := tcpClearRREnv
  2080  	te := newTest(t, e)
  2081  	channelz.SetMaxTraceEntry(1)
  2082  	defer channelz.ResetMaxTraceEntryToDefault()
  2083  	te.startServer(&testServer{security: e.security})
  2084  	r := manual.NewBuilderWithScheme("whatever")
  2085  	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  2086  	te.resolverScheme = r.Scheme()
  2087  	te.clientConn(grpc.WithResolvers(r))
  2088  	defer te.tearDown()
  2089  	var subConn int64
  2090  	// Here, we just wait for all sockets to be up. In the future, if we implement
  2091  	// IDLE, we may need to make several rpc calls to create the sockets.
  2092  	if err := verifyResultWithDelay(func() (bool, error) {
  2093  		tcs, _ := channelz.GetTopChannels(0, 0)
  2094  		if len(tcs) != 1 {
  2095  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  2096  		}
  2097  		subChans := tcs[0].SubChans()
  2098  		if len(subChans) != 1 {
  2099  			return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
  2100  		}
  2101  		for k := range subChans {
  2102  			subConn = k
  2103  		}
  2104  		return true, nil
  2105  	}); err != nil {
  2106  		t.Fatal(err)
  2107  	}
  2108  
  2109  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2110  	defer cancel()
  2111  	testutils.AwaitState(ctx, t, te.cc, connectivity.Ready)
  2112  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
  2113  	testutils.AwaitNotState(ctx, t, te.cc, connectivity.Ready)
  2114  
  2115  	// verify that the subchannel no longer exist due to trace referencing it got overwritten.
  2116  	if err := verifyResultWithDelay(func() (bool, error) {
  2117  		cm := channelz.GetChannel(subConn)
  2118  		if cm != nil {
  2119  			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  2120  		}
  2121  		return true, nil
  2122  	}); err != nil {
  2123  		t.Fatal(err)
  2124  	}
  2125  }
  2126  
  2127  func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
  2128  	e := tcpClearRREnv
  2129  	te := newTest(t, e)
  2130  	te.startServer(&testServer{security: e.security})
  2131  	r := manual.NewBuilderWithScheme("whatever")
  2132  	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  2133  	te.resolverScheme = r.Scheme()
  2134  	te.clientConn(grpc.WithResolvers(r))
  2135  	var subConn int64
  2136  	// Here, we just wait for all sockets to be up. In the future, if we implement
  2137  	// IDLE, we may need to make several rpc calls to create the sockets.
  2138  	if err := verifyResultWithDelay(func() (bool, error) {
  2139  		tcs, _ := channelz.GetTopChannels(0, 0)
  2140  		if len(tcs) != 1 {
  2141  			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  2142  		}
  2143  		subChans := tcs[0].SubChans()
  2144  		if len(subChans) != 1 {
  2145  			return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
  2146  		}
  2147  		for k := range subChans {
  2148  			subConn = k
  2149  		}
  2150  		return true, nil
  2151  	}); err != nil {
  2152  		t.Fatal(err)
  2153  	}
  2154  	te.tearDown()
  2155  	// verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
  2156  	if err := verifyResultWithDelay(func() (bool, error) {
  2157  		cm := channelz.GetChannel(subConn)
  2158  		if cm != nil {
  2159  			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  2160  		}
  2161  		return true, nil
  2162  	}); err != nil {
  2163  		t.Fatal(err)
  2164  	}
  2165  }
  2166  

View as plain text