1
16
17 package kubelet
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 goruntime "runtime"
24 "sort"
25 "strings"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 apiequality "k8s.io/apimachinery/pkg/api/equality"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 "k8s.io/apimachinery/pkg/api/resource"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/types"
34 "k8s.io/apimachinery/pkg/util/sets"
35 cloudprovider "k8s.io/cloud-provider"
36 cloudproviderapi "k8s.io/cloud-provider/api"
37 nodeutil "k8s.io/component-helpers/node/util"
38 "k8s.io/klog/v2"
39 kubeletapis "k8s.io/kubelet/pkg/apis"
40 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
41 "k8s.io/kubernetes/pkg/kubelet/events"
42 "k8s.io/kubernetes/pkg/kubelet/nodestatus"
43 "k8s.io/kubernetes/pkg/kubelet/util"
44 taintutil "k8s.io/kubernetes/pkg/util/taints"
45 volutil "k8s.io/kubernetes/pkg/volume/util"
46 )
47
48
49
50
51 func (kl *Kubelet) registerWithAPIServer() {
52 if kl.registrationCompleted {
53 return
54 }
55
56 kl.nodeStartupLatencyTracker.RecordAttemptRegisterNode()
57
58 step := 100 * time.Millisecond
59
60 for {
61 time.Sleep(step)
62 step = step * 2
63 if step >= 7*time.Second {
64 step = 7 * time.Second
65 }
66
67 node, err := kl.initialNode(context.TODO())
68 if err != nil {
69 klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
70 continue
71 }
72
73 klog.InfoS("Attempting to register node", "node", klog.KObj(node))
74 registered := kl.tryRegisterWithAPIServer(node)
75 if registered {
76 klog.InfoS("Successfully registered node", "node", klog.KObj(node))
77 kl.registrationCompleted = true
78 return
79 }
80 }
81 }
82
83
84
85
86
87
88 func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
89 _, err := kl.kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
90 if err == nil {
91 kl.nodeStartupLatencyTracker.RecordRegisteredNewNode()
92 return true
93 }
94
95 if !apierrors.IsAlreadyExists(err) {
96 klog.ErrorS(err, "Unable to register node with API server", "node", klog.KObj(node))
97 return false
98 }
99
100 existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), metav1.GetOptions{})
101 if err != nil {
102 klog.ErrorS(err, "Unable to register node with API server, error getting existing node", "node", klog.KObj(node))
103 return false
104 }
105 if existingNode == nil {
106 klog.InfoS("Unable to register node with API server, no node instance returned", "node", klog.KObj(node))
107 return false
108 }
109
110 originalNode := existingNode.DeepCopy()
111
112 klog.InfoS("Node was previously registered", "node", klog.KObj(node))
113
114
115
116
117 requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
118 requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
119 requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
120 requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
121 if requiresUpdate {
122 if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
123 klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
124 return false
125 }
126 }
127
128 return true
129 }
130
131
132 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
133 requiresUpdate := updateDefaultResources(initialNode, existingNode)
134 supportedHugePageResources := sets.String{}
135
136 for resourceName := range initialNode.Status.Capacity {
137 if !v1helper.IsHugePageResourceName(resourceName) {
138 continue
139 }
140 supportedHugePageResources.Insert(string(resourceName))
141
142 initialCapacity := initialNode.Status.Capacity[resourceName]
143 initialAllocatable := initialNode.Status.Allocatable[resourceName]
144
145 capacity, resourceIsSupported := existingNode.Status.Capacity[resourceName]
146 allocatable := existingNode.Status.Allocatable[resourceName]
147
148
149 if !resourceIsSupported || capacity.Cmp(initialCapacity) != 0 {
150 existingNode.Status.Capacity[resourceName] = initialCapacity.DeepCopy()
151 requiresUpdate = true
152 }
153
154
155 if !resourceIsSupported || allocatable.Cmp(initialAllocatable) != 0 {
156 existingNode.Status.Allocatable[resourceName] = initialAllocatable.DeepCopy()
157 requiresUpdate = true
158 }
159
160 }
161
162 for resourceName := range existingNode.Status.Capacity {
163 if !v1helper.IsHugePageResourceName(resourceName) {
164 continue
165 }
166
167
168 if !supportedHugePageResources.Has(string(resourceName)) {
169 delete(existingNode.Status.Capacity, resourceName)
170 delete(existingNode.Status.Allocatable, resourceName)
171 klog.InfoS("Removing huge page resource which is no longer supported", "resourceName", resourceName)
172 requiresUpdate = true
173 }
174 }
175 return requiresUpdate
176 }
177
178
179 func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
180 requiresUpdate := updateDefaultResources(initialNode, node)
181
182 if kl.containerManager.ShouldResetExtendedResourceCapacity() {
183 for k := range node.Status.Capacity {
184 if v1helper.IsExtendedResourceName(k) {
185 klog.InfoS("Zero out resource capacity in existing node", "resourceName", k, "node", klog.KObj(node))
186 node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
187 node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
188 requiresUpdate = true
189 }
190 }
191 }
192 return requiresUpdate
193 }
194
195
196 func updateDefaultResources(initialNode, existingNode *v1.Node) bool {
197 requiresUpdate := false
198 if existingNode.Status.Capacity == nil {
199 if initialNode.Status.Capacity != nil {
200 existingNode.Status.Capacity = initialNode.Status.Capacity.DeepCopy()
201 requiresUpdate = true
202 } else {
203 existingNode.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
204 }
205 }
206
207 if existingNode.Status.Allocatable == nil {
208 if initialNode.Status.Allocatable != nil {
209 existingNode.Status.Allocatable = initialNode.Status.Allocatable.DeepCopy()
210 requiresUpdate = true
211 } else {
212 existingNode.Status.Allocatable = make(map[v1.ResourceName]resource.Quantity)
213 }
214 }
215 return requiresUpdate
216 }
217
218
219 func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
220 defaultLabels := []string{
221 v1.LabelHostname,
222 v1.LabelTopologyZone,
223 v1.LabelTopologyRegion,
224 v1.LabelFailureDomainBetaZone,
225 v1.LabelFailureDomainBetaRegion,
226 v1.LabelInstanceTypeStable,
227 v1.LabelInstanceType,
228 v1.LabelOSStable,
229 v1.LabelArchStable,
230 v1.LabelWindowsBuild,
231 kubeletapis.LabelOS,
232 kubeletapis.LabelArch,
233 }
234
235 needsUpdate := false
236 if existingNode.Labels == nil {
237 existingNode.Labels = make(map[string]string)
238 }
239
240 for _, label := range defaultLabels {
241 if _, hasInitialValue := initialNode.Labels[label]; !hasInitialValue {
242 continue
243 }
244
245 if existingNode.Labels[label] != initialNode.Labels[label] {
246 existingNode.Labels[label] = initialNode.Labels[label]
247 needsUpdate = true
248 }
249
250 if existingNode.Labels[label] == "" {
251 delete(existingNode.Labels, label)
252 }
253 }
254
255 return needsUpdate
256 }
257
258
259
260
261 func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *v1.Node) bool {
262 var (
263 existingCMAAnnotation = existingNode.Annotations[volutil.ControllerManagedAttachAnnotation]
264 newCMAAnnotation, newSet = node.Annotations[volutil.ControllerManagedAttachAnnotation]
265 )
266
267 if newCMAAnnotation == existingCMAAnnotation {
268 return false
269 }
270
271
272
273
274 if !newSet {
275 klog.InfoS("Controller attach-detach setting changed to false; updating existing Node")
276 delete(existingNode.Annotations, volutil.ControllerManagedAttachAnnotation)
277 } else {
278 klog.InfoS("Controller attach-detach setting changed to true; updating existing Node")
279 if existingNode.Annotations == nil {
280 existingNode.Annotations = make(map[string]string)
281 }
282 existingNode.Annotations[volutil.ControllerManagedAttachAnnotation] = newCMAAnnotation
283 }
284
285 return true
286 }
287
288
289
290 func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
291 node := &v1.Node{
292 ObjectMeta: metav1.ObjectMeta{
293 Name: string(kl.nodeName),
294 Labels: map[string]string{
295 v1.LabelHostname: kl.hostname,
296 v1.LabelOSStable: goruntime.GOOS,
297 v1.LabelArchStable: goruntime.GOARCH,
298 kubeletapis.LabelOS: goruntime.GOOS,
299 kubeletapis.LabelArch: goruntime.GOARCH,
300 },
301 },
302 Spec: v1.NodeSpec{
303 Unschedulable: !kl.registerSchedulable,
304 },
305 }
306 osLabels, err := getOSSpecificLabels()
307 if err != nil {
308 return nil, err
309 }
310 for label, value := range osLabels {
311 node.Labels[label] = value
312 }
313
314 nodeTaints := make([]v1.Taint, len(kl.registerWithTaints))
315 copy(nodeTaints, kl.registerWithTaints)
316 unschedulableTaint := v1.Taint{
317 Key: v1.TaintNodeUnschedulable,
318 Effect: v1.TaintEffectNoSchedule,
319 }
320
321
322
323 if node.Spec.Unschedulable &&
324 !taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
325 nodeTaints = append(nodeTaints, unschedulableTaint)
326 }
327
328 if kl.externalCloudProvider {
329 taint := v1.Taint{
330 Key: cloudproviderapi.TaintExternalCloudProvider,
331 Value: "true",
332 Effect: v1.TaintEffectNoSchedule,
333 }
334
335 nodeTaints = append(nodeTaints, taint)
336 }
337 if len(nodeTaints) > 0 {
338 node.Spec.Taints = nodeTaints
339 }
340
341 if kl.providerRequiresNetworkingConfiguration() {
342 node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
343 Type: v1.NodeNetworkUnavailable,
344 Status: v1.ConditionTrue,
345 Reason: "NoRouteCreated",
346 Message: "Node created without a route",
347 LastTransitionTime: metav1.NewTime(kl.clock.Now()),
348 })
349 }
350
351 if kl.enableControllerAttachDetach {
352 if node.Annotations == nil {
353 node.Annotations = make(map[string]string)
354 }
355
356 klog.V(2).InfoS("Setting node annotation to enable volume controller attach/detach")
357 node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
358 } else {
359 klog.V(2).InfoS("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
360 }
361
362 if kl.keepTerminatedPodVolumes {
363 if node.Annotations == nil {
364 node.Annotations = make(map[string]string)
365 }
366 klog.V(2).InfoS("Setting node annotation to keep pod volumes of terminated pods attached to the node")
367 node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
368 }
369
370
371 for k, v := range kl.nodeLabels {
372 if cv, found := node.ObjectMeta.Labels[k]; found {
373 klog.InfoS("the node label will overwrite default setting", "labelKey", k, "labelValue", v, "default", cv)
374 }
375 node.ObjectMeta.Labels[k] = v
376 }
377
378 if kl.providerID != "" {
379 node.Spec.ProviderID = kl.providerID
380 }
381
382 if kl.cloud != nil {
383 instances, ok := kl.cloud.Instances()
384 if !ok {
385 return nil, fmt.Errorf("failed to get instances from cloud provider")
386 }
387
388
389
390
391 var err error
392 if node.Spec.ProviderID == "" {
393 node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(ctx, kl.cloud, kl.nodeName)
394 if err != nil {
395 return nil, err
396 }
397 }
398
399 instanceType, err := instances.InstanceType(ctx, kl.nodeName)
400 if err != nil {
401 return nil, err
402 }
403 if instanceType != "" {
404 klog.InfoS("Adding label from cloud provider", "labelKey", v1.LabelInstanceType, "labelValue", instanceType)
405 node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
406 klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelInstanceTypeStable, "labelValue", instanceType)
407 node.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
408 }
409
410 zones, ok := kl.cloud.Zones()
411 if ok {
412 zone, err := zones.GetZone(ctx)
413 if err != nil {
414 return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
415 }
416 if zone.FailureDomain != "" {
417 klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaZone, "labelValue", zone.FailureDomain)
418 node.ObjectMeta.Labels[v1.LabelFailureDomainBetaZone] = zone.FailureDomain
419 klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyZone, "labelValue", zone.FailureDomain)
420 node.ObjectMeta.Labels[v1.LabelTopologyZone] = zone.FailureDomain
421 }
422 if zone.Region != "" {
423 klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaRegion, "labelValue", zone.Region)
424 node.ObjectMeta.Labels[v1.LabelFailureDomainBetaRegion] = zone.Region
425 klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyRegion, "labelValue", zone.Region)
426 node.ObjectMeta.Labels[v1.LabelTopologyRegion] = zone.Region
427 }
428 }
429 }
430
431 kl.setNodeStatus(ctx, node)
432
433 return node, nil
434 }
435
436
437
438
439
440
// fastNodeStatusUpdate tries to push a node status update as soon as the node
// turns ready, and reports (via the named result completed) whether the caller
// can stop invoking it.
//
// It returns true when timeout is set, when the node obtained from GetNode is
// already Ready, or after it has attempted to patch a status whose Ready
// condition is not False. It returns false when the node cannot be read, has
// no Ready condition, has not changed, or is still not ready.
//
// The whole call runs under kl.syncNodeStatusMux. When completed is true, the
// deferred function additionally sets kl.containerRuntimeReadyExpected while
// holding kl.updateRuntimeMux.
func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (completed bool) {
	kl.syncNodeStatusMux.Lock()
	defer func() {
		kl.syncNodeStatusMux.Unlock()

		if completed {
			// Taken only after syncNodeStatusMux has been released, so the two
			// mutexes are never held together by this function.
			kl.updateRuntimeMux.Lock()
			defer kl.updateRuntimeMux.Unlock()
			kl.containerRuntimeReadyExpected = true
		}
	}()

	if timeout {
		// Give up on the fast path; returning true marks it as completed.
		klog.ErrorS(nil, "Node not becoming ready in time after startup")
		return true
	}

	originalNode, err := kl.GetNode()
	if err != nil {
		klog.ErrorS(err, "Error getting the current node from lister")
		return false
	}

	readyIdx, originalNodeReady := nodeutil.GetNodeCondition(&originalNode.Status, v1.NodeReady)
	if readyIdx == -1 {
		klog.ErrorS(nil, "Node does not have NodeReady condition", "originalNode", originalNode)
		return false
	}

	if originalNodeReady.Status == v1.ConditionTrue {
		// Already reported as ready: nothing left for the fast path to do.
		return true
	}

	// Refresh the runtime state before recomputing the node status below.
	// NOTE(review): presumably this lets the Ready condition reflect the
	// latest runtime status — confirm against updateRuntimeUp.
	kl.updateRuntimeUp()

	node, changed := kl.updateNode(ctx, originalNode)

	if !changed {
		// Nothing to send; this path does not call markVolumesFromNode.
		return false
	}

	readyIdx, nodeReady := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
	if readyIdx == -1 {
		klog.ErrorS(nil, "Node does not have NodeReady condition", "node", node)
		return false
	}

	if nodeReady.Status == v1.ConditionFalse {
		// Still not ready; try again on the next invocation.
		return false
	}

	klog.InfoS("Fast updating node status as it just became ready")
	if _, err := kl.patchNodeStatus(originalNode, node); err != nil {
		// The patch failed; fall back to the regular syncNodeStatus path.
		klog.ErrorS(err, "Error updating node status, will retry with syncNodeStatus")

		// syncNodeStatus locks syncNodeStatusMux itself, so release it around
		// the call to avoid self-deadlock.
		kl.syncNodeStatusMux.Unlock()
		kl.syncNodeStatus()
		// Re-acquire so that the deferred Unlock above stays balanced.
		kl.syncNodeStatusMux.Lock()
	}

	// Completed regardless of whether the patch itself succeeded.
	return true
}
514
515
516
517
518 func (kl *Kubelet) syncNodeStatus() {
519 kl.syncNodeStatusMux.Lock()
520 defer kl.syncNodeStatusMux.Unlock()
521 ctx := context.Background()
522
523 if kl.kubeClient == nil || kl.heartbeatClient == nil {
524 return
525 }
526 if kl.registerNode {
527
528 kl.registerWithAPIServer()
529 }
530 if err := kl.updateNodeStatus(ctx); err != nil {
531 klog.ErrorS(err, "Unable to update node status")
532 }
533 }
534
535
536
537 func (kl *Kubelet) updateNodeStatus(ctx context.Context) error {
538 klog.V(5).InfoS("Updating node status")
539 for i := 0; i < nodeStatusUpdateRetry; i++ {
540 if err := kl.tryUpdateNodeStatus(ctx, i); err != nil {
541 if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
542 kl.onRepeatedHeartbeatFailure()
543 }
544 klog.ErrorS(err, "Error updating node status, will retry")
545 } else {
546 return nil
547 }
548 }
549 return fmt.Errorf("update node status exceeds retry count")
550 }
551
552
553
554 func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error {
555
556
557
558
559
560
561 opts := metav1.GetOptions{}
562 if tryNumber == 0 {
563 util.FromApiserverCache(&opts)
564 }
565 originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
566 if err != nil {
567 return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
568 }
569 if originalNode == nil {
570 return fmt.Errorf("nil %q node object", kl.nodeName)
571 }
572
573 node, changed := kl.updateNode(ctx, originalNode)
574 shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
575
576 if !shouldPatchNodeStatus {
577 kl.markVolumesFromNode(node)
578 return nil
579 }
580
581 updatedNode, err := kl.patchNodeStatus(originalNode, node)
582 if err == nil {
583 kl.markVolumesFromNode(updatedNode)
584 }
585 return err
586 }
587
588
589
590 func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
591 node := originalNode.DeepCopy()
592
593 podCIDRChanged := false
594 if len(node.Spec.PodCIDRs) != 0 {
595
596
597
598 var err error
599 podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
600 if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
601 klog.ErrorS(err, "Error updating pod CIDR")
602 }
603 }
604
605 areRequiredLabelsNotPresent := false
606 osName, osLabelExists := node.Labels[v1.LabelOSStable]
607 if !osLabelExists || osName != goruntime.GOOS {
608 if len(node.Labels) == 0 {
609 node.Labels = make(map[string]string)
610 }
611 node.Labels[v1.LabelOSStable] = goruntime.GOOS
612 areRequiredLabelsNotPresent = true
613 }
614
615 arch, archLabelExists := node.Labels[v1.LabelArchStable]
616 if !archLabelExists || arch != goruntime.GOARCH {
617 if len(node.Labels) == 0 {
618 node.Labels = make(map[string]string)
619 }
620 node.Labels[v1.LabelArchStable] = goruntime.GOARCH
621 areRequiredLabelsNotPresent = true
622 }
623
624 kl.setNodeStatus(ctx, node)
625
626 changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
627 return node, changed
628 }
629
630
631
632 func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
633
634 updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
635 if err != nil {
636 return nil, err
637 }
638 kl.lastStatusReportTime = kl.clock.Now()
639 kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
640
641 readyIdx, readyCondition := nodeutil.GetNodeCondition(&updatedNode.Status, v1.NodeReady)
642 if readyIdx >= 0 && readyCondition.Status == v1.ConditionTrue {
643 kl.nodeStartupLatencyTracker.RecordNodeReady()
644 }
645
646 return updatedNode, nil
647 }
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672 func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
673 kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
674 }
675
676
677
678 func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
679 klog.V(2).InfoS("Recording event message for node", "node", klog.KRef("", string(kl.nodeName)), "event", event)
680 kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
681 }
682
683
684 func (kl *Kubelet) recordEvent(eventType, event, message string) {
685 kl.recorder.Eventf(kl.nodeRef, eventType, event, message)
686 }
687
688
689 func (kl *Kubelet) recordNodeSchedulableEvent(ctx context.Context, node *v1.Node) error {
690 kl.lastNodeUnschedulableLock.Lock()
691 defer kl.lastNodeUnschedulableLock.Unlock()
692 if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
693 if node.Spec.Unschedulable {
694 kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotSchedulable)
695 } else {
696 kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeSchedulable)
697 }
698 kl.lastNodeUnschedulable = node.Spec.Unschedulable
699 }
700 return nil
701 }
702
703
704
705
706
707 func (kl *Kubelet) setNodeStatus(ctx context.Context, node *v1.Node) {
708 for i, f := range kl.setNodeStatusFuncs {
709 klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
710 if err := f(ctx, node); err != nil {
711 klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
712 }
713 }
714 }
715
716 func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
717 kl.lastObservedNodeAddressesMux.Lock()
718 defer kl.lastObservedNodeAddressesMux.Unlock()
719 kl.lastObservedNodeAddresses = addresses
720 }
721 func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
722 kl.lastObservedNodeAddressesMux.RLock()
723 defer kl.lastObservedNodeAddressesMux.RUnlock()
724 return kl.lastObservedNodeAddresses
725 }
726
727
728
729 func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) error {
730
731 var nodeAddressesFunc func() ([]v1.NodeAddress, error)
732 if kl.cloud != nil {
733 nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
734 }
735 var setters []func(ctx context.Context, n *v1.Node) error
736 setters = append(setters,
737 nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
738 nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
739 kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent, kl.supportLocalStorageCapacityIsolation()),
740 nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
741 nodestatus.DaemonEndpoints(kl.daemonEndpoints),
742 nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
743 nodestatus.GoRuntime(),
744 nodestatus.RuntimeHandlers(kl.runtimeState.runtimeHandlers),
745 )
746
747 setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
748
749 setters = append(setters,
750 nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
751 nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
752 nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
753 nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors,
754 kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.supportLocalStorageCapacityIsolation()),
755 nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
756
757
758
759
760 kl.recordNodeSchedulableEvent,
761 )
762 return setters
763 }
764
765
// validateNodeIP checks that nodeIP is usable as a node address. It returns
// an error when nodeIP is not a valid IPv4/IPv6 address, is a loopback,
// multicast, link-local unicast or unspecified address, when the host's
// interface addresses cannot be listed, or when nodeIP is not assigned to any
// of the host's network interfaces. It returns nil otherwise.
func validateNodeIP(nodeIP net.IP) error {
	// Reject address classes that can never identify a node.
	switch {
	case nodeIP.To4() == nil && nodeIP.To16() == nil:
		return fmt.Errorf("nodeIP must be a valid IP address")
	case nodeIP.IsLoopback():
		return fmt.Errorf("nodeIP can't be loopback address")
	case nodeIP.IsMulticast():
		return fmt.Errorf("nodeIP can't be a multicast address")
	case nodeIP.IsLinkLocalUnicast():
		return fmt.Errorf("nodeIP can't be a link-local unicast address")
	case nodeIP.IsUnspecified():
		return fmt.Errorf("nodeIP can't be an all zeros address")
	}

	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return err
	}

	// The address must be configured on one of the local interfaces.
	for _, ifaceAddr := range ifaceAddrs {
		var candidate net.IP
		if ipNet, ok := ifaceAddr.(*net.IPNet); ok {
			candidate = ipNet.IP
		} else if ipAddr, ok := ifaceAddr.(*net.IPAddr); ok {
			candidate = ipAddr.IP
		}
		if candidate != nil && candidate.Equal(nodeIP) {
			return nil
		}
	}
	return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
}
802
803
804
805 func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
806 if originalStatus == nil && status == nil {
807 return false
808 }
809 if originalStatus == nil || status == nil {
810 return true
811 }
812
813
814 if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
815 return true
816 }
817
818
819 originalStatusCopy := originalStatus.DeepCopy()
820 statusCopy := status.DeepCopy()
821 originalStatusCopy.Conditions = nil
822 statusCopy.Conditions = nil
823 return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
824 }
825
826
827
828
829 func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
830 if len(originalConditions) != len(conditions) {
831 return true
832 }
833
834 originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
835 originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
836 conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
837 conditionsCopy = append(conditionsCopy, conditions...)
838
839 sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
840 sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })
841
842 replacedheartbeatTime := metav1.Time{}
843 for i := range conditionsCopy {
844 originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
845 conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
846 if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
847 return true
848 }
849 }
850 return false
851 }
852
View as plain text