...
1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25
26 "github.com/google/go-cmp/cmp"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/credentials/insecure"
29 "google.golang.org/grpc/internal/grpcsync"
30 "google.golang.org/grpc/internal/stubserver"
31 "google.golang.org/grpc/internal/testutils"
32 "google.golang.org/grpc/internal/testutils/xds/e2e"
33
34 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
35 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
36 testgrpc "google.golang.org/grpc/interop/grpc_testing"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 )
39
40
41
// wantResources is the number of distinct resource type URLs for which a
// version must have been recorded on a stream before that stream is considered
// fully ACKed. NOTE(review): presumably the four resource types of the regular
// client-side xDS flow (LDS, RDS, CDS, EDS) — confirm.
const wantResources = 4

// seenAllACKs reports whether acksVersions holds an entry for exactly
// wantResources resource types. When wantNonEmpty is true, every recorded
// version must additionally be a non-empty string; otherwise empty versions
// are accepted as well.
func seenAllACKs(acksVersions map[string]string, wantNonEmpty bool) bool {
	switch {
	case len(acksVersions) != wantResources:
		// Too few (or too many) resource types recorded so far.
		return false
	case !wantNonEmpty:
		// Presence alone is sufficient; the version values are irrelevant.
		return true
	}
	for _, version := range acksVersions {
		if version == "" {
			return false
		}
	}
	return true
}
58
59
60
61
62
63 func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
64
65 l, err := testutils.LocalTCPListener()
66 if err != nil {
67 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
68 }
69 lis := testutils.NewRestartableListener(l)
70
71
72
73 const (
74 idBeforeRestart = 1
75 idAfterRestart = 2
76 )
77
78
79
80 acksReceivedBeforeRestart := grpcsync.NewEvent()
81 streamRestarted := grpcsync.NewEvent()
82 acksReceivedAfterRestart := grpcsync.NewEvent()
83
84
85 ackVersionsMap := make(map[int64]map[string]string)
86 managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
87 Listener: lis,
88 OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
89
90
91
92
93
94
95 if acksReceivedAfterRestart.HasFired() || len(req.GetResourceNames()) == 0 {
96 return nil
97 }
98
99
100 if ackVersionsMap[id] == nil {
101 ackVersionsMap[id] = make(map[string]string)
102 }
103 ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
104
105
106
107
108
109 if seenAllACKs(ackVersionsMap[idBeforeRestart], true) {
110 acksReceivedBeforeRestart.Fire()
111 }
112
113
114
115
116 if seenAllACKs(ackVersionsMap[idAfterRestart], false) {
117 acksReceivedAfterRestart.Fire()
118 }
119 return nil
120 },
121 OnStreamClosed: func(int64, *v3corepb.Node) {
122 streamRestarted.Fire()
123 },
124 })
125 defer cleanup1()
126
127 server := stubserver.StartTestService(t, nil)
128 defer server.Stop()
129
130 const serviceName = "my-service-client-side-xds"
131 resources := e2e.DefaultClientResources(e2e.ResourceParams{
132 DialTarget: serviceName,
133 NodeID: nodeID,
134 Host: "localhost",
135 Port: testutils.ParsePort(t, server.Address),
136 SecLevel: e2e.SecurityLevelNone,
137 })
138 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
139 defer cancel()
140 if err := managementServer.Update(ctx, resources); err != nil {
141 t.Fatal(err)
142 }
143
144
145 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
146 if err != nil {
147 t.Fatalf("failed to dial local test server: %v", err)
148 }
149 defer cc.Close()
150
151 client := testgrpc.NewTestServiceClient(cc)
152 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
153 t.Fatalf("rpc EmptyCall() failed: %v", err)
154 }
155
156
157
158
159
160 select {
161 case <-ctx.Done():
162 t.Fatal("Timeout when waiting for all resources to be ACKed prior to stream restart")
163 case <-acksReceivedBeforeRestart.Done():
164 }
165
166
167
168 lis.Stop()
169
170
171 <-streamRestarted.Done()
172
173
174
175 lis.Restart()
176
177
178 select {
179 case <-ctx.Done():
180 t.Fatal("Timeout when waiting for all resources to be ACKed post stream restart")
181 case <-acksReceivedAfterRestart.Done():
182 }
183
184 if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" {
185 t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff)
186 }
187 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
188 t.Fatalf("rpc EmptyCall() failed: %v", err)
189 }
190 }
191
View as plain text