...

Source file src/k8s.io/kubernetes/test/e2e_node/kubeletconfig/kubeletconfig.go

Documentation: k8s.io/kubernetes/test/e2e_node/kubeletconfig

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubeletconfig
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"os"
    27  	"path/filepath"
    28  	"regexp"
    29  	"strconv"
    30  	"time"
    31  
    32  	"github.com/onsi/gomega"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
    35  	"k8s.io/kubernetes/pkg/cluster/ports"
    36  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    37  	kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
    38  	kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
    39  	"k8s.io/kubernetes/test/e2e/framework"
    40  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    41  
    42  	"sigs.k8s.io/yaml"
    43  )
    44  
    45  func getKubeletConfigFilePath() (string, error) {
    46  	cwd, err := os.Getwd()
    47  	if err != nil {
    48  		return "", fmt.Errorf("failed to get current working directory: %w", err)
    49  	}
    50  
    51  	// DO NOT name this file "kubelet" - you will overwrite the kubelet binary and be very confused :)
    52  	return filepath.Join(cwd, "kubelet-config"), nil
    53  }
    54  
    55  // GetCurrentKubeletConfigFromFile returns the current kubelet configuration under the filesystem.
    56  // This method should only run together with e2e node tests, meaning the test executor and the cluster nodes is the
    57  // same machine
    58  func GetCurrentKubeletConfigFromFile() (*kubeletconfig.KubeletConfiguration, error) {
    59  	kubeletConfigFilePath, err := getKubeletConfigFilePath()
    60  	if err != nil {
    61  		return nil, err
    62  	}
    63  
    64  	data, err := os.ReadFile(kubeletConfigFilePath)
    65  	if err != nil {
    66  		return nil, fmt.Errorf("failed to get the kubelet config from the file %q: %w", kubeletConfigFilePath, err)
    67  	}
    68  
    69  	var kubeletConfigV1Beta1 kubeletconfigv1beta1.KubeletConfiguration
    70  	if err := yaml.Unmarshal(data, &kubeletConfigV1Beta1); err != nil {
    71  		return nil, fmt.Errorf("failed to unmarshal the kubelet config: %w", err)
    72  	}
    73  
    74  	scheme, _, err := kubeletconfigscheme.NewSchemeAndCodecs()
    75  	if err != nil {
    76  		return nil, err
    77  	}
    78  
    79  	kubeletConfig := kubeletconfig.KubeletConfiguration{}
    80  	err = scheme.Convert(&kubeletConfigV1Beta1, &kubeletConfig, nil)
    81  	if err != nil {
    82  		return nil, err
    83  	}
    84  
    85  	return &kubeletConfig, nil
    86  }
    87  
    88  // WriteKubeletConfigFile updates the kubelet configuration under the filesystem
    89  // This method should only run together with e2e node tests, meaning the test executor and the cluster nodes is the
    90  // same machine
    91  func WriteKubeletConfigFile(kubeletConfig *kubeletconfig.KubeletConfiguration) error {
    92  	data, err := kubeletconfigcodec.EncodeKubeletConfig(kubeletConfig, kubeletconfigv1beta1.SchemeGroupVersion)
    93  	if err != nil {
    94  		return err
    95  	}
    96  
    97  	kubeletConfigFilePath, err := getKubeletConfigFilePath()
    98  	if err != nil {
    99  		return err
   100  	}
   101  
   102  	if err := os.WriteFile(kubeletConfigFilePath, data, 0644); err != nil {
   103  		return fmt.Errorf("failed to write the kubelet file to %q: %w", kubeletConfigFilePath, err)
   104  	}
   105  
   106  	return nil
   107  }
   108  
   109  // GetCurrentKubeletConfig fetches the current Kubelet Config for the given node
   110  func GetCurrentKubeletConfig(ctx context.Context, nodeName, namespace string, useProxy bool, standaloneMode bool) (*kubeletconfig.KubeletConfiguration, error) {
   111  	resp := pollConfigz(ctx, 5*time.Minute, 5*time.Second, nodeName, namespace, useProxy, standaloneMode)
   112  	if len(resp) == 0 {
   113  		return nil, fmt.Errorf("failed to fetch /configz from %q", nodeName)
   114  	}
   115  	kubeCfg, err := decodeConfigz(resp)
   116  	if err != nil {
   117  		return nil, err
   118  	}
   119  	return kubeCfg, nil
   120  }
   121  
   122  // returns a status 200 response from the /configz endpoint or nil if fails
   123  func pollConfigz(ctx context.Context, timeout time.Duration, pollInterval time.Duration, nodeName, namespace string, useProxy bool, standaloneMode bool) []byte {
   124  	endpoint := ""
   125  	if useProxy {
   126  		// start local proxy, so we can send graceful deletion over query string, rather than body parameter
   127  		framework.Logf("Opening proxy to cluster")
   128  		tk := e2ekubectl.NewTestKubeconfig(framework.TestContext.CertDir, framework.TestContext.Host, framework.TestContext.KubeConfig, framework.TestContext.KubeContext, framework.TestContext.KubectlPath, namespace)
   129  		cmd := tk.KubectlCmd("proxy", "-p", "0")
   130  		stdout, stderr, err := framework.StartCmdAndStreamOutput(cmd)
   131  		framework.ExpectNoError(err)
   132  		defer stdout.Close()
   133  		defer stderr.Close()
   134  		defer framework.TryKill(cmd)
   135  
   136  		buf := make([]byte, 128)
   137  		var n int
   138  		n, err = stdout.Read(buf)
   139  		framework.ExpectNoError(err)
   140  		output := string(buf[:n])
   141  		proxyRegexp := regexp.MustCompile("Starting to serve on 127.0.0.1:([0-9]+)")
   142  		match := proxyRegexp.FindStringSubmatch(output)
   143  		gomega.Expect(match).To(gomega.HaveLen(2))
   144  		port, err := strconv.Atoi(match[1])
   145  		framework.ExpectNoError(err)
   146  		framework.Logf("http requesting node kubelet /configz")
   147  		endpoint = fmt.Sprintf("http://127.0.0.1:%d/api/v1/nodes/%s/proxy/configz", port, nodeName)
   148  	} else if !standaloneMode {
   149  		endpoint = fmt.Sprintf("%s/api/v1/nodes/%s/proxy/configz", framework.TestContext.Host, framework.TestContext.NodeName)
   150  	} else {
   151  		endpoint = fmt.Sprintf("https://127.0.0.1:%d/configz", ports.KubeletPort)
   152  	}
   153  	tr := &http.Transport{
   154  		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
   155  	}
   156  	client := &http.Client{Transport: tr}
   157  	req, err := http.NewRequest("GET", endpoint, nil)
   158  	framework.ExpectNoError(err)
   159  	if !useProxy {
   160  		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", framework.TestContext.BearerToken))
   161  	}
   162  	req.Header.Add("Accept", "application/json")
   163  
   164  	var respBody []byte
   165  	err = wait.PollUntilContextTimeout(ctx, pollInterval, timeout, true, func(ctx context.Context) (bool, error) {
   166  		resp, err := client.Do(req)
   167  		if err != nil {
   168  			framework.Logf("Failed to get /configz, retrying. Error: %v", err)
   169  			return false, nil
   170  		}
   171  		defer resp.Body.Close()
   172  
   173  		if resp.StatusCode != 200 {
   174  			framework.Logf("/configz response status not 200, retrying. Response was: %+v", resp)
   175  			return false, nil
   176  		}
   177  
   178  		respBody, err = io.ReadAll(resp.Body)
   179  		if err != nil {
   180  			framework.Logf("failed to read body from /configz response, retrying. Error: %v", err)
   181  			return false, nil
   182  		}
   183  
   184  		return true, nil
   185  	})
   186  	framework.ExpectNoError(err, "Failed to get successful response from /configz")
   187  
   188  	return respBody
   189  }
   190  
   191  // Decodes the http response from /configz and returns a kubeletconfig.KubeletConfiguration (internal type).
   192  func decodeConfigz(respBody []byte) (*kubeletconfig.KubeletConfiguration, error) {
   193  	// This hack because /configz reports the following structure:
   194  	// {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
   195  	type configzWrapper struct {
   196  		ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
   197  	}
   198  
   199  	configz := configzWrapper{}
   200  	kubeCfg := kubeletconfig.KubeletConfiguration{}
   201  
   202  	err := json.Unmarshal(respBody, &configz)
   203  	if err != nil {
   204  		return nil, err
   205  	}
   206  
   207  	scheme, _, err := kubeletconfigscheme.NewSchemeAndCodecs()
   208  	if err != nil {
   209  		return nil, err
   210  	}
   211  	err = scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
   212  	if err != nil {
   213  		return nil, err
   214  	}
   215  
   216  	return &kubeCfg, nil
   217  }
   218  

View as plain text