    17  package apimachinery
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"net/http/httptest"
    26  	"net/http/httputil"
    27  	"net/url"
    28  	"strings"
    29  	"testing"
    30  	"time"
    32  	"golang.org/x/net/websocket"
    34  	corev1 "k8s.io/api/core/v1"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/runtime"
    37  	"k8s.io/apimachinery/pkg/watch"
    38  	"k8s.io/client-go/kubernetes"
    39  	restclient "k8s.io/client-go/rest"
    40  	"k8s.io/client-go/tools/cache"
    41  	kubectlproxy "k8s.io/kubectl/pkg/proxy"
    42  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    43  	"k8s.io/kubernetes/test/integration/framework"
    44  )
    46  type extractRT struct {
    47  	http.Header
    48  }
    50  func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) {
    51  	rt.Header = req.Header
    52  	return &http.Response{}, nil
    53  }
    55  // headersForConfig extracts any http client logic necessary for the provided
    56  // config.
    57  func headersForConfig(c *restclient.Config, url *url.URL) (http.Header, error) {
    58  	extract := &extractRT{}
    59  	rt, err := restclient.HTTPWrappersForConfig(c, extract)
    60  	if err != nil {
    61  		return nil, err
    62  	}
    63  	request, err := http.NewRequest("GET", url.String(), nil)
    64  	if err != nil {
    65  		return nil, err
    66  	}
    67  	if _, err := rt.RoundTrip(request); err != nil {
    68  		return nil, err
    69  	}
    70  	return extract.Header, nil
    71  }
    73  // websocketConfig constructs a websocket config to the provided URL, using the client
    74  // config, with the specified protocols.
    75  func websocketConfig(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Config, error) {
    76  	tlsConfig, err := restclient.TLSConfigFor(config)
    77  	if err != nil {
    78  		return nil, fmt.Errorf("Failed to create tls config: %v", err)
    79  	}
    80  	if url.Scheme == "https" {
    81  		url.Scheme = "wss"
    82  	} else {
    83  		url.Scheme = "ws"
    84  	}
    85  	headers, err := headersForConfig(config, url)
    86  	if err != nil {
    87  		return nil, fmt.Errorf("Failed to load http headers: %v", err)
    88  	}
    89  	cfg, err := websocket.NewConfig(url.String(), "http://localhost")
    90  	if err != nil {
    91  		return nil, fmt.Errorf("Failed to create websocket config: %v", err)
    92  	}
    93  	cfg.Header = headers
    94  	cfg.TlsConfig = tlsConfig
    95  	cfg.Protocol = protocols
    96  	return cfg, err
    97  }
    99  func TestWebsocketWatchClientTimeout(t *testing.T) {
   100  	// server setup
   101  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   102  	defer server.TearDownFn()
   104  	// object setup
   105  	service := &corev1.Service{
   106  		ObjectMeta: metav1.ObjectMeta{Name: "test"},
   107  		Spec: corev1.ServiceSpec{
   108  			Ports: []corev1.ServicePort{{Name: "http", Port: 80}},
   109  		},
   110  	}
   111  	configmap := &corev1.ConfigMap{
   112  		ObjectMeta: metav1.ObjectMeta{Name: "test"},
   113  	}
   114  	clientset, err := kubernetes.NewForConfig(server.ClientConfig)
   115  	if err != nil {
   116  		t.Fatal(err)
   117  	}
   118  	if _, err := clientset.CoreV1().Services("default").Create(context.TODO(), service, metav1.CreateOptions{}); err != nil {
   119  		t.Fatal(err)
   120  	}
   121  	if _, err := clientset.CoreV1().ConfigMaps("default").Create(context.TODO(), configmap, metav1.CreateOptions{}); err != nil {
   122  		t.Fatal(err)
   123  	}
   125  	testcases := []struct {
   126  		name         string
   127  		path         string
   128  		timeout      time.Duration
   129  		expectResult string
   130  	}{
   131  		{
   132  			name:         "configmaps",
   133  			path:         "/api/v1/configmaps?watch=true&timeoutSeconds=5",
   134  			timeout:      10 * time.Second,
   135  			expectResult: `"name":"test"`,
   136  		},
   137  		{
   138  			name:         "services",
   139  			path:         "/api/v1/services?watch=true&timeoutSeconds=5",
   140  			timeout:      10 * time.Second,
   141  			expectResult: `"name":"test"`,
   142  		},
   143  	}
   145  	for _, tc := range testcases {
   146  		t.Run(tc.name, func(t *testing.T) {
   147  			url, err := url.Parse(server.ClientConfig.Host + tc.path)
   148  			if err != nil {
   149  				t.Fatal(err)
   150  			}
   151  			wsc, err := websocketConfig(url, server.ClientConfig, nil)
   152  			if err != nil {
   153  				t.Fatal(err)
   154  			}
   156  			wsConn, err := websocket.DialConfig(wsc)
   157  			if err != nil {
   158  				t.Fatal(err)
   159  			}
   160  			defer wsConn.Close()
   162  			resultCh := make(chan string)
   163  			go func() {
   164  				defer close(resultCh)
   165  				buf := &bytes.Buffer{}
   166  				for {
   167  					var msg []byte
   168  					if err := websocket.Message.Receive(wsConn, &msg); err != nil {
   169  						if err == io.EOF {
   170  							resultCh <- buf.String()
   171  							return
   172  						}
   173  						if !t.Failed() {
   174  							// if we didn't already fail, treat this as an error
   175  							t.Errorf("Failed to read completely from websocket %v", err)
   176  						}
   177  						return
   178  					}
   179  					if len(msg) == 0 {
   180  						t.Logf("zero-length message")
   181  						continue
   182  					}
   183  					t.Logf("Read %v %v", len(msg), string(msg))
   184  					buf.Write(msg)
   185  				}
   186  			}()
   188  			select {
   189  			case resultString := <-resultCh:
   190  				if !strings.Contains(resultString, tc.expectResult) {
   191  					t.Fatalf("Unexpected result:\n%s", resultString)
   192  				}
   193  			case <-time.After(tc.timeout):
   194  				t.Fatalf("hit timeout before connection closed")
   195  			}
   196  		})
   197  	}
   198  }
   200  func TestWatchClientTimeoutXXX(t *testing.T) {
   201  	// server setup
   202  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   203  	defer server.TearDownFn()
   205  	t.Run("direct", func(t *testing.T) {
   206  		t.Logf("client at %s", server.ClientConfig.Host)
   207  		testWatchClientTimeouts(t, restclient.CopyConfig(server.ClientConfig))
   208  	})
   210  	t.Run("reverse proxy", func(t *testing.T) {
   211  		u, _ := url.Parse(server.ClientConfig.Host)
   212  		proxy := httputil.NewSingleHostReverseProxy(u)
   213  		proxy.FlushInterval = -1
   215  		transport, err := restclient.TransportFor(server.ClientConfig)
   216  		if err != nil {
   217  			t.Fatal(err)
   218  		}
   219  		proxy.Transport = transport
   221  		proxyServer := httptest.NewServer(proxy)
   222  		defer proxyServer.Close()
   224  		t.Logf("client to %s, backend at %s", proxyServer.URL, server.ClientConfig.Host)
   225  		testWatchClientTimeouts(t, &restclient.Config{Host: proxyServer.URL})
   226  	})
   228  	t.Run("kubectl proxy", func(t *testing.T) {
   229  		kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, server.ClientConfig, 0, false)
   230  		if err != nil {
   231  			t.Fatal(err)
   232  		}
   233  		kubectlProxyListener, err := kubectlProxyServer.Listen("", 0)
   234  		if err != nil {
   235  			t.Fatal(err)
   236  		}
   237  		defer kubectlProxyListener.Close()
   238  		go kubectlProxyServer.ServeOnListener(kubectlProxyListener)
   240  		t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), server.ClientConfig.Host)
   241  		testWatchClientTimeouts(t, &restclient.Config{Host: "http://" + kubectlProxyListener.Addr().String()})
   242  	})
   243  }
   245  func testWatchClientTimeouts(t *testing.T, config *restclient.Config) {
   246  	t.Run("timeout", func(t *testing.T) {
   247  		testWatchClientTimeout(t, config, time.Second, 0)
   248  	})
   249  	t.Run("timeoutSeconds", func(t *testing.T) {
   250  		testWatchClientTimeout(t, config, 0, time.Second)
   251  	})
   252  	t.Run("timeout+timeoutSeconds", func(t *testing.T) {
   253  		testWatchClientTimeout(t, config, time.Second, time.Second)
   254  	})
   255  }
   257  func testWatchClientTimeout(t *testing.T, config *restclient.Config, timeout, timeoutSeconds time.Duration) {
   258  	config.Timeout = timeout
   259  	client, err := kubernetes.NewForConfig(config)
   260  	if err != nil {
   261  		t.Fatal(err)
   262  	}
   264  	listCount := 0
   265  	watchCount := 0
   266  	stopCh := make(chan struct{})
   267  	listWatch := &cache.ListWatch{
   268  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   269  			t.Logf("listing (version=%s continue=%s)", options.ResourceVersion, options.Continue)
   270  			listCount++
   271  			if listCount > 1 {
   272  				t.Errorf("listed more than once")
   273  				close(stopCh)
   274  			}
   275  			return client.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), options)
   276  		},
   277  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   278  			t.Logf("watching (version=%s)", options.ResourceVersion)
   279  			if timeoutSeconds != 0 {
   280  				timeout := int64(timeoutSeconds / time.Second)
   281  				options.TimeoutSeconds = &timeout
   282  			}
   283  			watchCount++
   284  			if watchCount > 1 {
   285  				// success, restarted watch
   286  				close(stopCh)
   287  			}
   288  			return client.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), options)
   289  		},
   290  	}
   291  	_, informer := cache.NewIndexerInformer(listWatch, &corev1.ConfigMap{}, 30*time.Minute, cache.ResourceEventHandlerFuncs{}, cache.Indexers{})
   292  	informer.Run(stopCh)
   293  	select {
   294  	case <-stopCh:
   295  	case <-time.After(time.Minute):
   296  		t.Fatal("timeout")
   297  	}
   298  }

