...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/http.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"bytes"
    21  	"fmt"
    22  	"net/http"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	api "k8s.io/kubernetes/pkg/apis/core"
    28  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    29  
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/klog/v2"
    32  	utilio "k8s.io/utils/io"
    33  )
    34  
    35  type sourceURL struct {
    36  	url         string
    37  	header      http.Header
    38  	nodeName    types.NodeName
    39  	updates     chan<- interface{}
    40  	data        []byte
    41  	failureLogs int
    42  	client      *http.Client
    43  }
    44  
    45  // NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
    46  func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
    47  	config := &sourceURL{
    48  		url:      url,
    49  		header:   header,
    50  		nodeName: nodeName,
    51  		updates:  updates,
    52  		data:     nil,
    53  		// Timing out requests leads to retries. This client is only used to
    54  		// read the manifest URL passed to kubelet.
    55  		client: &http.Client{Timeout: 10 * time.Second},
    56  	}
    57  	klog.V(1).InfoS("Watching URL", "URL", url)
    58  	go wait.Until(config.run, period, wait.NeverStop)
    59  }
    60  
    61  func (s *sourceURL) run() {
    62  	if err := s.extractFromURL(); err != nil {
    63  		// Don't log this multiple times per minute. The first few entries should be
    64  		// enough to get the point across.
    65  		if s.failureLogs < 3 {
    66  			klog.InfoS("Failed to read pods from URL", "err", err)
    67  		} else if s.failureLogs == 3 {
    68  			klog.InfoS("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err)
    69  		} else {
    70  			klog.V(4).InfoS("Failed to read pods from URL", "err", err)
    71  		}
    72  		s.failureLogs++
    73  	} else {
    74  		if s.failureLogs > 0 {
    75  			klog.InfoS("Successfully read pods from URL")
    76  			s.failureLogs = 0
    77  		}
    78  	}
    79  }
    80  
    81  func (s *sourceURL) applyDefaults(pod *api.Pod) error {
    82  	return applyDefaults(pod, s.url, false, s.nodeName)
    83  }
    84  
    85  func (s *sourceURL) extractFromURL() error {
    86  	req, err := http.NewRequest("GET", s.url, nil)
    87  	if err != nil {
    88  		return err
    89  	}
    90  	req.Header = s.header
    91  	resp, err := s.client.Do(req)
    92  	if err != nil {
    93  		return err
    94  	}
    95  	defer resp.Body.Close()
    96  	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
    97  	if err != nil {
    98  		return err
    99  	}
   100  	if resp.StatusCode != http.StatusOK {
   101  		return fmt.Errorf("%v: %v", s.url, resp.Status)
   102  	}
   103  	if len(data) == 0 {
   104  		// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
   105  		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
   106  		return fmt.Errorf("zero-length data received from %v", s.url)
   107  	}
   108  	// Short circuit if the data has not changed since the last time it was read.
   109  	if bytes.Equal(data, s.data) {
   110  		return nil
   111  	}
   112  	s.data = data
   113  
   114  	// First try as it is a single pod.
   115  	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
   116  	if parsed {
   117  		if singlePodErr != nil {
   118  			// It parsed but could not be used.
   119  			return singlePodErr
   120  		}
   121  		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
   122  		return nil
   123  	}
   124  
   125  	// That didn't work, so try a list of pods.
   126  	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
   127  	if parsed {
   128  		if multiPodErr != nil {
   129  			// It parsed but could not be used.
   130  			return multiPodErr
   131  		}
   132  		pods := make([]*v1.Pod, 0, len(podList.Items))
   133  		for i := range podList.Items {
   134  			pods = append(pods, &podList.Items[i])
   135  		}
   136  		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
   137  		return nil
   138  	}
   139  
   140  	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
   141  		"single (%v) or multiple pods (%v)",
   142  		s.url, string(data), singlePodErr, multiPodErr)
   143  }
   144  

View as plain text