1
16
17 package staticpod
18
19 import (
20 "bytes"
21 "crypto/md5"
22 "fmt"
23 "hash"
24 "io"
25 "math"
26 "net/url"
27 "os"
28 "sort"
29 "strings"
30 "sync"
31
32 "github.com/pkg/errors"
33 "github.com/pmezard/go-difflib/difflib"
34
35 v1 "k8s.io/api/core/v1"
36 "k8s.io/apimachinery/pkg/api/resource"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/util/dump"
39 "k8s.io/apimachinery/pkg/util/intstr"
40
41 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
42 kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
43 kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
44 "k8s.io/kubernetes/cmd/kubeadm/app/util/patches"
45 "k8s.io/kubernetes/cmd/kubeadm/app/util/users"
46 )
47
48 const (
49
50 kubeControllerManagerBindAddressArg = "bind-address"
51
52
53 kubeSchedulerBindAddressArg = "bind-address"
54 )
55
56 var (
57 usersAndGroups *users.UsersAndGroups
58 usersAndGroupsOnce sync.Once
59 )
60
61
62 func ComponentPod(container v1.Container, volumes map[string]v1.Volume, annotations map[string]string) v1.Pod {
63
64 priority := int32(2000001000)
65 return v1.Pod{
66 TypeMeta: metav1.TypeMeta{
67 APIVersion: "v1",
68 Kind: "Pod",
69 },
70 ObjectMeta: metav1.ObjectMeta{
71 Name: container.Name,
72 Namespace: metav1.NamespaceSystem,
73
74
75 Labels: map[string]string{"component": container.Name, "tier": kubeadmconstants.ControlPlaneTier},
76 Annotations: annotations,
77 },
78 Spec: v1.PodSpec{
79 Containers: []v1.Container{container},
80 Priority: &priority,
81 PriorityClassName: "system-node-critical",
82 HostNetwork: true,
83 Volumes: VolumeMapToSlice(volumes),
84 SecurityContext: &v1.PodSecurityContext{
85 SeccompProfile: &v1.SeccompProfile{
86 Type: v1.SeccompProfileTypeRuntimeDefault,
87 },
88 },
89 },
90 }
91 }
92
93
94 func ComponentResources(cpu string) v1.ResourceRequirements {
95 return v1.ResourceRequirements{
96 Requests: v1.ResourceList{
97 v1.ResourceCPU: resource.MustParse(cpu),
98 },
99 }
100 }
101
102
103 func NewVolume(name, path string, pathType *v1.HostPathType) v1.Volume {
104 return v1.Volume{
105 Name: name,
106 VolumeSource: v1.VolumeSource{
107 HostPath: &v1.HostPathVolumeSource{
108 Path: path,
109 Type: pathType,
110 },
111 },
112 }
113 }
114
115
116 func NewVolumeMount(name, path string, readOnly bool) v1.VolumeMount {
117 return v1.VolumeMount{
118 Name: name,
119 MountPath: path,
120 ReadOnly: readOnly,
121 }
122 }
123
124
125 func VolumeMapToSlice(volumes map[string]v1.Volume) []v1.Volume {
126 v := make([]v1.Volume, 0, len(volumes))
127
128 for _, vol := range volumes {
129 v = append(v, vol)
130 }
131
132 sort.Slice(v, func(i, j int) bool {
133 return strings.Compare(v[i].Name, v[j].Name) == -1
134 })
135
136 return v
137 }
138
139
140 func VolumeMountMapToSlice(volumeMounts map[string]v1.VolumeMount) []v1.VolumeMount {
141 v := make([]v1.VolumeMount, 0, len(volumeMounts))
142
143 for _, volMount := range volumeMounts {
144 v = append(v, volMount)
145 }
146
147 sort.Slice(v, func(i, j int) bool {
148 return strings.Compare(v[i].Name, v[j].Name) == -1
149 })
150
151 return v
152 }
153
154
155 func GetExtraParameters(overrides map[string]string, defaults map[string]string) []string {
156 var command []string
157 for k, v := range overrides {
158 if len(v) > 0 {
159 command = append(command, fmt.Sprintf("--%s=%s", k, v))
160 }
161 }
162 for k, v := range defaults {
163 if _, overrideExists := overrides[k]; !overrideExists {
164 command = append(command, fmt.Sprintf("--%s=%s", k, v))
165 }
166 }
167 return command
168 }
169
170
171 func PatchStaticPod(pod *v1.Pod, patchesDir string, output io.Writer) (*v1.Pod, error) {
172
173 podYAML, err := kubeadmutil.MarshalToYaml(pod, v1.SchemeGroupVersion)
174 if err != nil {
175 return pod, errors.Wrapf(err, "failed to marshal Pod manifest to YAML")
176 }
177
178 patchManager, err := patches.GetPatchManagerForPath(patchesDir, patches.KnownTargets(), output)
179 if err != nil {
180 return pod, err
181 }
182
183 patchTarget := &patches.PatchTarget{
184 Name: pod.Name,
185 StrategicMergePatchObject: v1.Pod{},
186 Data: podYAML,
187 }
188 if err := patchManager.ApplyPatchesToTarget(patchTarget); err != nil {
189 return pod, err
190 }
191
192 obj, err := kubeadmutil.UniversalUnmarshal(patchTarget.Data)
193 if err != nil {
194 return pod, errors.Wrap(err, "failed to unmarshal patched manifest")
195 }
196
197 pod2, ok := obj.(*v1.Pod)
198 if !ok {
199 return pod, errors.Wrap(err, "patched manifest is not a valid Pod object")
200 }
201
202 return pod2, nil
203 }
204
205
206 func WriteStaticPodToDisk(componentName, manifestDir string, pod v1.Pod) error {
207
208
209 if err := os.MkdirAll(manifestDir, 0700); err != nil {
210 return errors.Wrapf(err, "failed to create directory %q", manifestDir)
211 }
212
213
214 serialized, err := kubeadmutil.MarshalToYaml(&pod, v1.SchemeGroupVersion)
215 if err != nil {
216 return errors.Wrapf(err, "failed to marshal manifest for %q to YAML", componentName)
217 }
218
219 filename := kubeadmconstants.GetStaticPodFilepath(componentName, manifestDir)
220
221 if err := os.WriteFile(filename, serialized, 0600); err != nil {
222 return errors.Wrapf(err, "failed to write static pod manifest file for %q (%q)", componentName, filename)
223 }
224
225 return nil
226 }
227
228
229 func ReadStaticPodFromDisk(manifestPath string) (*v1.Pod, error) {
230 buf, err := os.ReadFile(manifestPath)
231 if err != nil {
232 return &v1.Pod{}, errors.Wrapf(err, "failed to read manifest for %q", manifestPath)
233 }
234
235 obj, err := kubeadmutil.UniversalUnmarshal(buf)
236 if err != nil {
237 return &v1.Pod{}, errors.Errorf("failed to unmarshal manifest for %q: %v", manifestPath, err)
238 }
239
240 pod, ok := obj.(*v1.Pod)
241 if !ok {
242 return &v1.Pod{}, errors.Errorf("failed to parse Pod object defined in %q", manifestPath)
243 }
244
245 return pod, nil
246 }
247
248
249 func LivenessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe {
250
251 return createHTTPProbe(host, path, port, scheme, 10, 15, 8, 10)
252 }
253
254
255 func ReadinessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe {
256
257
258 return createHTTPProbe(host, path, port, scheme, 0, 15, 3, 1)
259 }
260
261
262 func StartupProbe(host, path string, port int32, scheme v1.URIScheme, timeoutForControlPlane *metav1.Duration) *v1.Probe {
263 periodSeconds, timeoutForControlPlaneSeconds := int32(10), kubeadmconstants.ControlPlaneComponentHealthCheckTimeout.Seconds()
264 if timeoutForControlPlane != nil {
265 timeoutForControlPlaneSeconds = timeoutForControlPlane.Seconds()
266 }
267
268
269 failureThreshold := int32(math.Ceil(timeoutForControlPlaneSeconds / float64(periodSeconds)))
270
271 return createHTTPProbe(host, path, port, scheme, periodSeconds, 15, failureThreshold, periodSeconds)
272 }
273
274 func createHTTPProbe(host, path string, port int32, scheme v1.URIScheme, initialDelaySeconds, timeoutSeconds, failureThreshold, periodSeconds int32) *v1.Probe {
275 return &v1.Probe{
276 ProbeHandler: v1.ProbeHandler{
277 HTTPGet: &v1.HTTPGetAction{
278 Host: host,
279 Path: path,
280 Port: intstr.FromInt32(port),
281 Scheme: scheme,
282 },
283 },
284 InitialDelaySeconds: initialDelaySeconds,
285 TimeoutSeconds: timeoutSeconds,
286 FailureThreshold: failureThreshold,
287 PeriodSeconds: periodSeconds,
288 }
289 }
290
291
292 func GetAPIServerProbeAddress(endpoint *kubeadmapi.APIEndpoint) string {
293 if endpoint != nil && endpoint.AdvertiseAddress != "" {
294 return getProbeAddress(endpoint.AdvertiseAddress)
295 }
296
297 return "127.0.0.1"
298 }
299
300
301 func GetControllerManagerProbeAddress(cfg *kubeadmapi.ClusterConfiguration) string {
302 if addr, idx := kubeadmapi.GetArgValue(cfg.ControllerManager.ExtraArgs, kubeControllerManagerBindAddressArg, -1); idx > -1 {
303 return getProbeAddress(addr)
304 }
305 return "127.0.0.1"
306 }
307
308
309 func GetSchedulerProbeAddress(cfg *kubeadmapi.ClusterConfiguration) string {
310 if addr, idx := kubeadmapi.GetArgValue(cfg.Scheduler.ExtraArgs, kubeSchedulerBindAddressArg, -1); idx > -1 {
311 return getProbeAddress(addr)
312 }
313 return "127.0.0.1"
314 }
315
316
317
318
319 func GetEtcdProbeEndpoint(cfg *kubeadmapi.Etcd, isIPv6 bool) (string, int32, v1.URIScheme) {
320 localhost := "127.0.0.1"
321 if isIPv6 {
322 localhost = "::1"
323 }
324 if cfg.Local == nil || cfg.Local.ExtraArgs == nil {
325 return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
326 }
327 if arg, idx := kubeadmapi.GetArgValue(cfg.Local.ExtraArgs, "listen-metrics-urls", -1); idx > -1 {
328
329 arg = strings.Split(arg, ",")[0]
330 parsedURL, err := url.Parse(arg)
331 if err != nil {
332 return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
333 }
334
335 scheme := v1.URISchemeHTTP
336 if parsedURL.Scheme == "https" {
337 scheme = v1.URISchemeHTTPS
338 }
339
340 hostname := parsedURL.Hostname()
341 if len(hostname) == 0 {
342 hostname = localhost
343 }
344
345 port := kubeadmconstants.EtcdMetricsPort
346 portStr := parsedURL.Port()
347 if len(portStr) != 0 {
348 p, err := kubeadmutil.ParsePort(portStr)
349 if err == nil {
350 port = p
351 }
352 }
353 return hostname, int32(port), scheme
354 }
355 return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
356 }
357
358
359 func ManifestFilesAreEqual(path1, path2 string) (bool, string, error) {
360 pod1, err := ReadStaticPodFromDisk(path1)
361 if err != nil {
362 return false, "", err
363 }
364 pod2, err := ReadStaticPodFromDisk(path2)
365 if err != nil {
366 return false, "", err
367 }
368
369 hasher := md5.New()
370 DeepHashObject(hasher, pod1)
371 hash1 := hasher.Sum(nil)[0:]
372 DeepHashObject(hasher, pod2)
373 hash2 := hasher.Sum(nil)[0:]
374 if bytes.Equal(hash1, hash2) {
375 return true, "", nil
376 }
377
378 manifest1, err := kubeadmutil.MarshalToYaml(pod1, v1.SchemeGroupVersion)
379 if err != nil {
380 return false, "", errors.Wrapf(err, "failed to marshal Pod manifest for %q to YAML", path1)
381 }
382
383 manifest2, err := kubeadmutil.MarshalToYaml(pod2, v1.SchemeGroupVersion)
384 if err != nil {
385 return false, "", errors.Wrapf(err, "failed to marshal Pod manifest for %q to YAML", path2)
386 }
387
388 diff := difflib.UnifiedDiff{
389 A: difflib.SplitLines(string(manifest1)),
390 B: difflib.SplitLines(string(manifest2)),
391 }
392
393 diffStr, err := difflib.GetUnifiedDiffString(diff)
394 if err != nil {
395 return false, "", errors.Wrapf(err, "failed to generate the differences between manifest %q and manifest %q", path1, path2)
396 }
397
398 return false, diffStr, nil
399 }
400
401
402
403
404
405
406
407 func getProbeAddress(addr string) string {
408 if addr == "0.0.0.0" || addr == "::" {
409 return ""
410 }
411 return addr
412 }
413
414
415
416 func GetUsersAndGroups() (*users.UsersAndGroups, error) {
417 var err error
418 usersAndGroupsOnce.Do(func() {
419 usersAndGroups, err = users.AddUsersAndGroups()
420 })
421 return usersAndGroups, err
422 }
423
424
425
426
427
428 func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
429 hasher.Reset()
430 fmt.Fprintf(hasher, "%v", dump.ForHash(objectToWrite))
431 }
432
View as plain text