1
16
17 package apimachinery
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "net/http"
25 "net/http/httptest"
26 "net/http/httputil"
27 "net/url"
28 "strings"
29 "testing"
30 "time"
31
32 "golang.org/x/net/websocket"
33
34 corev1 "k8s.io/api/core/v1"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/watch"
38 "k8s.io/client-go/kubernetes"
39 restclient "k8s.io/client-go/rest"
40 "k8s.io/client-go/tools/cache"
41 kubectlproxy "k8s.io/kubectl/pkg/proxy"
42 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
43 "k8s.io/kubernetes/test/integration/framework"
44 )
45
46 type extractRT struct {
47 http.Header
48 }
49
50 func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) {
51 rt.Header = req.Header
52 return &http.Response{}, nil
53 }
54
55
56
57 func headersForConfig(c *restclient.Config, url *url.URL) (http.Header, error) {
58 extract := &extractRT{}
59 rt, err := restclient.HTTPWrappersForConfig(c, extract)
60 if err != nil {
61 return nil, err
62 }
63 request, err := http.NewRequest("GET", url.String(), nil)
64 if err != nil {
65 return nil, err
66 }
67 if _, err := rt.RoundTrip(request); err != nil {
68 return nil, err
69 }
70 return extract.Header, nil
71 }
72
73
74
75 func websocketConfig(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Config, error) {
76 tlsConfig, err := restclient.TLSConfigFor(config)
77 if err != nil {
78 return nil, fmt.Errorf("Failed to create tls config: %v", err)
79 }
80 if url.Scheme == "https" {
81 url.Scheme = "wss"
82 } else {
83 url.Scheme = "ws"
84 }
85 headers, err := headersForConfig(config, url)
86 if err != nil {
87 return nil, fmt.Errorf("Failed to load http headers: %v", err)
88 }
89 cfg, err := websocket.NewConfig(url.String(), "http://localhost")
90 if err != nil {
91 return nil, fmt.Errorf("Failed to create websocket config: %v", err)
92 }
93 cfg.Header = headers
94 cfg.TlsConfig = tlsConfig
95 cfg.Protocol = protocols
96 return cfg, err
97 }
98
99 func TestWebsocketWatchClientTimeout(t *testing.T) {
100
101 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
102 defer server.TearDownFn()
103
104
105 service := &corev1.Service{
106 ObjectMeta: metav1.ObjectMeta{Name: "test"},
107 Spec: corev1.ServiceSpec{
108 Ports: []corev1.ServicePort{{Name: "http", Port: 80}},
109 },
110 }
111 configmap := &corev1.ConfigMap{
112 ObjectMeta: metav1.ObjectMeta{Name: "test"},
113 }
114 clientset, err := kubernetes.NewForConfig(server.ClientConfig)
115 if err != nil {
116 t.Fatal(err)
117 }
118 if _, err := clientset.CoreV1().Services("default").Create(context.TODO(), service, metav1.CreateOptions{}); err != nil {
119 t.Fatal(err)
120 }
121 if _, err := clientset.CoreV1().ConfigMaps("default").Create(context.TODO(), configmap, metav1.CreateOptions{}); err != nil {
122 t.Fatal(err)
123 }
124
125 testcases := []struct {
126 name string
127 path string
128 timeout time.Duration
129 expectResult string
130 }{
131 {
132 name: "configmaps",
133 path: "/api/v1/configmaps?watch=true&timeoutSeconds=5",
134 timeout: 10 * time.Second,
135 expectResult: `"name":"test"`,
136 },
137 {
138 name: "services",
139 path: "/api/v1/services?watch=true&timeoutSeconds=5",
140 timeout: 10 * time.Second,
141 expectResult: `"name":"test"`,
142 },
143 }
144
145 for _, tc := range testcases {
146 t.Run(tc.name, func(t *testing.T) {
147 url, err := url.Parse(server.ClientConfig.Host + tc.path)
148 if err != nil {
149 t.Fatal(err)
150 }
151 wsc, err := websocketConfig(url, server.ClientConfig, nil)
152 if err != nil {
153 t.Fatal(err)
154 }
155
156 wsConn, err := websocket.DialConfig(wsc)
157 if err != nil {
158 t.Fatal(err)
159 }
160 defer wsConn.Close()
161
162 resultCh := make(chan string)
163 go func() {
164 defer close(resultCh)
165 buf := &bytes.Buffer{}
166 for {
167 var msg []byte
168 if err := websocket.Message.Receive(wsConn, &msg); err != nil {
169 if err == io.EOF {
170 resultCh <- buf.String()
171 return
172 }
173 if !t.Failed() {
174
175 t.Errorf("Failed to read completely from websocket %v", err)
176 }
177 return
178 }
179 if len(msg) == 0 {
180 t.Logf("zero-length message")
181 continue
182 }
183 t.Logf("Read %v %v", len(msg), string(msg))
184 buf.Write(msg)
185 }
186 }()
187
188 select {
189 case resultString := <-resultCh:
190 if !strings.Contains(resultString, tc.expectResult) {
191 t.Fatalf("Unexpected result:\n%s", resultString)
192 }
193 case <-time.After(tc.timeout):
194 t.Fatalf("hit timeout before connection closed")
195 }
196 })
197 }
198 }
199
200 func TestWatchClientTimeoutXXX(t *testing.T) {
201
202 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
203 defer server.TearDownFn()
204
205 t.Run("direct", func(t *testing.T) {
206 t.Logf("client at %s", server.ClientConfig.Host)
207 testWatchClientTimeouts(t, restclient.CopyConfig(server.ClientConfig))
208 })
209
210 t.Run("reverse proxy", func(t *testing.T) {
211 u, _ := url.Parse(server.ClientConfig.Host)
212 proxy := httputil.NewSingleHostReverseProxy(u)
213 proxy.FlushInterval = -1
214
215 transport, err := restclient.TransportFor(server.ClientConfig)
216 if err != nil {
217 t.Fatal(err)
218 }
219 proxy.Transport = transport
220
221 proxyServer := httptest.NewServer(proxy)
222 defer proxyServer.Close()
223
224 t.Logf("client to %s, backend at %s", proxyServer.URL, server.ClientConfig.Host)
225 testWatchClientTimeouts(t, &restclient.Config{Host: proxyServer.URL})
226 })
227
228 t.Run("kubectl proxy", func(t *testing.T) {
229 kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, server.ClientConfig, 0, false)
230 if err != nil {
231 t.Fatal(err)
232 }
233 kubectlProxyListener, err := kubectlProxyServer.Listen("", 0)
234 if err != nil {
235 t.Fatal(err)
236 }
237 defer kubectlProxyListener.Close()
238 go kubectlProxyServer.ServeOnListener(kubectlProxyListener)
239
240 t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), server.ClientConfig.Host)
241 testWatchClientTimeouts(t, &restclient.Config{Host: "http://" + kubectlProxyListener.Addr().String()})
242 })
243 }
244
245 func testWatchClientTimeouts(t *testing.T, config *restclient.Config) {
246 t.Run("timeout", func(t *testing.T) {
247 testWatchClientTimeout(t, config, time.Second, 0)
248 })
249 t.Run("timeoutSeconds", func(t *testing.T) {
250 testWatchClientTimeout(t, config, 0, time.Second)
251 })
252 t.Run("timeout+timeoutSeconds", func(t *testing.T) {
253 testWatchClientTimeout(t, config, time.Second, time.Second)
254 })
255 }
256
257 func testWatchClientTimeout(t *testing.T, config *restclient.Config, timeout, timeoutSeconds time.Duration) {
258 config.Timeout = timeout
259 client, err := kubernetes.NewForConfig(config)
260 if err != nil {
261 t.Fatal(err)
262 }
263
264 listCount := 0
265 watchCount := 0
266 stopCh := make(chan struct{})
267 listWatch := &cache.ListWatch{
268 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
269 t.Logf("listing (version=%s continue=%s)", options.ResourceVersion, options.Continue)
270 listCount++
271 if listCount > 1 {
272 t.Errorf("listed more than once")
273 close(stopCh)
274 }
275 return client.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), options)
276 },
277 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
278 t.Logf("watching (version=%s)", options.ResourceVersion)
279 if timeoutSeconds != 0 {
280 timeout := int64(timeoutSeconds / time.Second)
281 options.TimeoutSeconds = &timeout
282 }
283 watchCount++
284 if watchCount > 1 {
285
286 close(stopCh)
287 }
288 return client.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), options)
289 },
290 }
291 _, informer := cache.NewIndexerInformer(listWatch, &corev1.ConfigMap{}, 30*time.Minute, cache.ResourceEventHandlerFuncs{}, cache.Indexers{})
292 informer.Run(stopCh)
293 select {
294 case <-stopCh:
295 case <-time.After(time.Minute):
296 t.Fatal("timeout")
297 }
298 }
299
View as plain text