1
16
17 package config
18
19 import (
20 "bytes"
21 "fmt"
22 "net/http"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/util/wait"
27 api "k8s.io/kubernetes/pkg/apis/core"
28 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
29
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/klog/v2"
32 utilio "k8s.io/utils/io"
33 )
34
35 type sourceURL struct {
36 url string
37 header http.Header
38 nodeName types.NodeName
39 updates chan<- interface{}
40 data []byte
41 failureLogs int
42 client *http.Client
43 }
44
45
46 func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
47 config := &sourceURL{
48 url: url,
49 header: header,
50 nodeName: nodeName,
51 updates: updates,
52 data: nil,
53
54
55 client: &http.Client{Timeout: 10 * time.Second},
56 }
57 klog.V(1).InfoS("Watching URL", "URL", url)
58 go wait.Until(config.run, period, wait.NeverStop)
59 }
60
61 func (s *sourceURL) run() {
62 if err := s.extractFromURL(); err != nil {
63
64
65 if s.failureLogs < 3 {
66 klog.InfoS("Failed to read pods from URL", "err", err)
67 } else if s.failureLogs == 3 {
68 klog.InfoS("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err)
69 } else {
70 klog.V(4).InfoS("Failed to read pods from URL", "err", err)
71 }
72 s.failureLogs++
73 } else {
74 if s.failureLogs > 0 {
75 klog.InfoS("Successfully read pods from URL")
76 s.failureLogs = 0
77 }
78 }
79 }
80
81 func (s *sourceURL) applyDefaults(pod *api.Pod) error {
82 return applyDefaults(pod, s.url, false, s.nodeName)
83 }
84
85 func (s *sourceURL) extractFromURL() error {
86 req, err := http.NewRequest("GET", s.url, nil)
87 if err != nil {
88 return err
89 }
90 req.Header = s.header
91 resp, err := s.client.Do(req)
92 if err != nil {
93 return err
94 }
95 defer resp.Body.Close()
96 data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
97 if err != nil {
98 return err
99 }
100 if resp.StatusCode != http.StatusOK {
101 return fmt.Errorf("%v: %v", s.url, resp.Status)
102 }
103 if len(data) == 0 {
104
105 s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
106 return fmt.Errorf("zero-length data received from %v", s.url)
107 }
108
109 if bytes.Equal(data, s.data) {
110 return nil
111 }
112 s.data = data
113
114
115 parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
116 if parsed {
117 if singlePodErr != nil {
118
119 return singlePodErr
120 }
121 s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
122 return nil
123 }
124
125
126 parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
127 if parsed {
128 if multiPodErr != nil {
129
130 return multiPodErr
131 }
132 pods := make([]*v1.Pod, 0, len(podList.Items))
133 for i := range podList.Items {
134 pods = append(pods, &podList.Items[i])
135 }
136 s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
137 return nil
138 }
139
140 return fmt.Errorf("%v: received '%v', but couldn't parse as "+
141 "single (%v) or multiple pods (%v)",
142 s.url, string(data), singlePodErr, multiPodErr)
143 }
144
View as plain text