/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package kubelet import ( "context" "encoding/json" "fmt" "net" goruntime "runtime" "sort" "strconv" "strings" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/component-base/version" kubeletapis "k8s.io/kubelet/pkg/apis" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager" taintutil "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/volume/util" netutils "k8s.io/utils/net" ) const ( maxImageTagsForTest = 20 ) // generateTestingImageLists generate randomly generated image list and corresponding 
expectedImageList. func generateTestingImageLists(count int, maxImages int) ([]kubecontainer.Image, []v1.ContainerImage) { // imageList is randomly generated image list var imageList []kubecontainer.Image for ; count > 0; count-- { imageItem := kubecontainer.Image{ ID: string(uuid.NewUUID()), RepoTags: generateImageTags(), Size: rand.Int63nRange(minImgSize, maxImgSize+1), } imageList = append(imageList, imageItem) } expectedImageList := makeExpectedImageList(imageList, maxImages) return imageList, expectedImageList } func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1.ContainerImage { // expectedImageList is generated by imageList according to size and maxImages // 1. sort the imageList by size sort.Sort(sliceutils.ByImageSize(imageList)) // 2. convert sorted imageList to v1.ContainerImage list var expectedImageList []v1.ContainerImage for _, kubeImage := range imageList { apiImage := v1.ContainerImage{ Names: kubeImage.RepoTags[0:nodestatus.MaxNamesPerImageInNodeStatus], SizeBytes: kubeImage.Size, } expectedImageList = append(expectedImageList, apiImage) } // 3. only returns the top maxImages images in expectedImageList if maxImages == -1 { // -1 means no limit return expectedImageList } return expectedImageList[0:maxImages] } func generateImageTags() []string { var tagList []string // Generate > MaxNamesPerImageInNodeStatus tags so that the test can verify // that kubelet report up to MaxNamesPerImageInNodeStatus tags. 
count := rand.IntnRange(nodestatus.MaxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1) for ; count > 0; count-- { tagList = append(tagList, "registry.k8s.io:v"+strconv.Itoa(count)) } return tagList } func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) { original, err := json.Marshal(originalNode) if err != nil { return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err) } updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{}) if err != nil { return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v", patch, originalNode, err) } updatedNode := &v1.Node{} if err := json.Unmarshal(updated, updatedNode); err != nil { return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", updated, err) } return updatedNode, nil } func notImplemented(action core.Action) (bool, runtime.Object, error) { return true, nil, fmt.Errorf("no reaction implemented for %s", action) } func addNotImplatedReaction(kubeClient *fake.Clientset) { if kubeClient == nil { return } kubeClient.AddReactor("*", "*", notImplemented) } type localCM struct { cm.ContainerManager allocatableReservation v1.ResourceList capacity v1.ResourceList } func (lcm *localCM) GetNodeAllocatableReservation() v1.ResourceList { return lcm.allocatableReservation } func (lcm *localCM) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceList { if !localStorageCapacityIsolation { delete(lcm.capacity, v1.ResourceEphemeralStorage) } return lcm.capacity } func TestUpdateNewNodeStatus(t *testing.T) { cases := []struct { desc string nodeStatusMaxImages int32 }{ { desc: "5 image limit", nodeStatusMaxImages: 5, }, { desc: "no image limit", nodeStatusMaxImages: -1, }, } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { ctx := context.Background() // generate one more in inputImageList than we configure the Kubelet to report, // or 5 images if unlimited numTestImages := int(tc.nodeStatusMaxImages) + 1 if 
tc.nodeStatusMaxImages == -1 { numTestImages = 5 } inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages)) testKubelet := newTestKubeletWithImageList( t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/, true /* localStorageCapacityIsolation */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100e6, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 10e9, // 10G } kubelet.setCachedMachineInfo(machineInfo) expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: "kubelet has sufficient memory available", LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasNoDiskPressure", Message: "kubelet has no disk pressure", LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: "kubelet has sufficient PID available", LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: "kubelet is posting ready status", LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOSVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: 
*resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(9900e6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, Images: expectedImageList, }, } kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() require.Len(t, actions, 2) require.True(t, actions[1].Matches("patch", "nodes")) require.Equal(t, actions[1].GetSubresource(), "status") updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) assert.NoError(t, err) for i, cond := range updatedNode.Status.Conditions { assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type) assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, "NotReady should be last") assert.Len(t, updatedNode.Status.Images, len(expectedImageList)) assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", cmp.Diff(expectedNode, updatedNode)) }) } } func TestUpdateExistingNodeStatus(t *testing.T) { ctx := context.Background() testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100e6, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(19900e6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, }, } kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain 
machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 20e9, } kubelet.setCachedMachineInfo(machineInfo) expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: metav1.Time{}, // placeholder LastTransitionTime: metav1.Time{}, // placeholder }, }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOSVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: 
*resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(19900e6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, // images will be sorted from max to min in node status. Images: []v1.ContainerImage{ { Names: []string{"registry.k8s.io:v1", "registry.k8s.io:v2"}, SizeBytes: 123, }, { Names: []string{"registry.k8s.io:v3", "registry.k8s.io:v4"}, SizeBytes: 456, }, }, }, } kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() assert.Len(t, actions, 2) assert.IsType(t, core.PatchActionImpl{}, actions[1]) patchAction := actions[1].(core.PatchActionImpl) updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch()) require.NoError(t, err) for i, cond := range updatedNode.Status.Conditions { old := metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time // Expect LastHearbeat to be updated to Now, while LastTransitionTime to be the same. assert.NotEqual(t, old, cond.LastHeartbeatTime.Rfc3339Copy().UTC(), "LastHeartbeatTime for condition %v", cond.Type) assert.EqualValues(t, old, cond.LastTransitionTime.Rfc3339Copy().UTC(), "LastTransitionTime for condition %v", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, "NodeReady should be the last condition") assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", cmp.Diff(expectedNode, updatedNode)) } func TestUpdateExistingNodeStatusTimeout(t *testing.T) { ctx := context.Background() if testing.Short() { t.Skip("skipping test in short mode.") } attempts := int64(0) failureCallbacks := int64(0) // set up a listener that hangs connections ln, err := net.Listen("tcp", "127.0.0.1:0") assert.NoError(t, err) defer ln.Close() go func() { // accept connections and just let them hang for { _, err := ln.Accept() if err != nil { t.Log(err) return } t.Log("accepted connection") atomic.AddInt64(&attempts, 1) } }() config := &rest.Config{ Host: "http://" + ln.Addr().String(), QPS: -1, Timeout: time.Second, } assert.NoError(t, err) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.heartbeatClient, err = clientset.NewForConfig(config) require.NoError(t, err) kubelet.onRepeatedHeartbeatFailure = func() { atomic.AddInt64(&failureCallbacks, 1) } kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100e6, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), }, } // should return an error, but not hang assert.Error(t, kubelet.updateNodeStatus(ctx)) // should have attempted multiple times if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry { 
t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts) } // should have gotten multiple failure callbacks if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) { t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks) } } func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { ctx := context.Background() testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100e6, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(10e9, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(20e9, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() clock := testKubelet.fakeClock kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 10e9, } kubelet.setCachedMachineInfo(machineInfo) expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasNoDiskPressure", Message: fmt.Sprintf("kubelet has no disk pressure"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, {}, //placeholder }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOSVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourcePods: 
*resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(20e9, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(9900e6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(10e9, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, Images: []v1.ContainerImage{ { Names: []string{"registry.k8s.io:v1", "registry.k8s.io:v2"}, SizeBytes: 123, }, { Names: []string{"registry.k8s.io:v3", "registry.k8s.io:v4"}, SizeBytes: 456, }, }, }, } checkNodeStatus := func(status v1.ConditionStatus, reason string) { kubeClient.ClearActions() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() require.Len(t, actions, 2) require.True(t, actions[1].Matches("patch", "nodes")) require.Equal(t, actions[1].GetSubresource(), "status") updatedNode, err := kubeClient.CoreV1().Nodes().Get(ctx, testKubeletHostname, metav1.GetOptions{}) require.NoError(t, err, "can't apply node status patch") for i, cond := range updatedNode.Status.Conditions { assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type) assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 lastIndex := len(updatedNode.Status.Conditions) - 1 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[lastIndex].Type, "NodeReady should be the last condition") assert.NotEmpty(t, updatedNode.Status.Conditions[lastIndex].Message) updatedNode.Status.Conditions[lastIndex].Message = "" expectedNode.Status.Conditions[lastIndex] = v1.NodeCondition{ Type: v1.NodeReady, Status: status, Reason: reason, LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, } assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", cmp.Diff(expectedNode, updatedNode)) } // TODO(random-liu): Refactor the unit test to be table driven test. // Should report kubelet not ready if the runtime check is out of date clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report kubelet ready if the runtime check is updated clock.SetTime(time.Now()) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionTrue, "KubeletReady") // Should report kubelet not ready if the runtime check is out of date clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report kubelet not ready if the runtime check failed fakeRuntime := testKubelet.fakeRuntime // Inject error into fake runtime status check, node should be NotReady fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") clock.SetTime(time.Now()) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") fakeRuntime.StatusErr = nil // Should report node not ready if runtime status is nil. fakeRuntime.RuntimeStatus = nil kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node not ready if runtime status is empty. 
fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{} kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node not ready if RuntimeReady is false. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: false}, {Type: kubecontainer.NetworkReady, Status: true}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node ready if RuntimeReady is true. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: true}, {Type: kubecontainer.NetworkReady, Status: true}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionTrue, "KubeletReady") // Should report node not ready if NetworkReady is false. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: true}, {Type: kubecontainer.NetworkReady, Status: false}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") } func TestUpdateNodeStatusError(t *testing.T) { ctx := context.Background() testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.kubeClient = nil // ensure only the heartbeat client is used // No matching node for the kubelet testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain assert.Error(t, kubelet.updateNodeStatus(ctx)) assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry) } func TestUpdateNodeStatusWithLease(t *testing.T) { ctx := context.Background() testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() clock := testKubelet.fakeClock kubelet := testKubelet.kubelet 
kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100e6, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubelet.nodeStatusReportFrequency = time.Minute kubeClient := testKubelet.fakeKubeClient existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 20e9, } kubelet.setCachedMachineInfo(machineInfo) now := metav1.NewTime(clock.Now()).Rfc3339Copy() expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: now, LastTransitionTime: now, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasNoDiskPressure", Message: fmt.Sprintf("kubelet has no disk pressure"), 
LastHeartbeatTime: now, LastTransitionTime: now, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: now, LastTransitionTime: now, }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: now, LastTransitionTime: now, }, }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOSVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(19900e6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, // images will be sorted from max to min in node status. Images: []v1.ContainerImage{ { Names: []string{"registry.k8s.io:v1", "registry.k8s.io:v2"}, SizeBytes: 123, }, { Names: []string{"registry.k8s.io:v3", "registry.k8s.io:v4"}, SizeBytes: 456, }, }, }, } // Update node status when node status is created. // Report node status. 
kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() assert.Len(t, actions, 2) assert.IsType(t, core.GetActionImpl{}, actions[0]) assert.IsType(t, core.PatchActionImpl{}, actions[1]) patchAction := actions[1].(core.PatchActionImpl) updatedNode, err := applyNodeStatusPatch(existingNode, patchAction.GetPatch()) require.NoError(t, err) for _, cond := range updatedNode.Status.Conditions { cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy() cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy() } assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", cmp.Diff(expectedNode, updatedNode)) // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, "NodeReady should be the last condition") // Update node status again when nothing is changed (except heartbeat time). // Report node status if it has exceeded the duration of nodeStatusReportFrequency. clock.Step(time.Minute) assert.NoError(t, kubelet.updateNodeStatus(ctx)) // 2 more action (There were 2 actions before). actions = kubeClient.Actions() assert.Len(t, actions, 4) assert.IsType(t, core.GetActionImpl{}, actions[2]) assert.IsType(t, core.PatchActionImpl{}, actions[3]) patchAction = actions[3].(core.PatchActionImpl) updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch()) require.NoError(t, err) for _, cond := range updatedNode.Status.Conditions { cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy() cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy() } // Expect LastHeartbeat updated, other things unchanged. 
for i, cond := range expectedNode.Status.Conditions { expectedNode.Status.Conditions[i].LastHeartbeatTime = metav1.NewTime(cond.LastHeartbeatTime.Time.Add(time.Minute)).Rfc3339Copy() } assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", cmp.Diff(expectedNode, updatedNode)) // Update node status again when nothing is changed (except heartbeat time). // Do not report node status if it is within the duration of nodeStatusReportFrequency. clock.Step(10 * time.Second) assert.NoError(t, kubelet.updateNodeStatus(ctx)) // Only 1 more action (There were 4 actions before). actions = kubeClient.Actions() assert.Len(t, actions, 5) assert.IsType(t, core.GetActionImpl{}, actions[4]) // Update node status again when something is changed. // Report node status even if it is still within the duration of nodeStatusReportFrequency. clock.Step(10 * time.Second) var newMemoryCapacity int64 = 40e9 oldMachineInfo, err := kubelet.GetCachedMachineInfo() if err != nil { t.Fatal(err) } newMachineInfo := oldMachineInfo.Clone() newMachineInfo.MemoryCapacity = uint64(newMemoryCapacity) kubelet.setCachedMachineInfo(newMachineInfo) assert.NoError(t, kubelet.updateNodeStatus(ctx)) // 2 more action (There were 5 actions before). actions = kubeClient.Actions() assert.Len(t, actions, 7) assert.IsType(t, core.GetActionImpl{}, actions[5]) assert.IsType(t, core.PatchActionImpl{}, actions[6]) patchAction = actions[6].(core.PatchActionImpl) updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch()) require.NoError(t, err) memCapacity := updatedNode.Status.Capacity[v1.ResourceMemory] updatedMemoryCapacity, _ := (&memCapacity).AsInt64() assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity") now = metav1.NewTime(clock.Now()).Rfc3339Copy() for _, cond := range updatedNode.Status.Conditions { // Expect LastHearbeat updated, while LastTransitionTime unchanged. 
assert.Equal(t, now, cond.LastHeartbeatTime.Rfc3339Copy(), "LastHeartbeatTime for condition %v", cond.Type) assert.Equal(t, now, metav1.NewTime(cond.LastTransitionTime.Time.Add(time.Minute+20*time.Second)).Rfc3339Copy(), "LastTransitionTime for condition %v", cond.Type) } // Update node status when changing pod CIDR. // Report node status if it is still within the duration of nodeStatusReportFrequency. clock.Step(10 * time.Second) assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty") podCIDRs := []string{"10.0.0.0/24", "2000::/10"} updatedNode.Spec.PodCIDR = podCIDRs[0] updatedNode.Spec.PodCIDRs = podCIDRs kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain assert.NoError(t, kubelet.updateNodeStatus(ctx)) assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now") // 2 more action (There were 7 actions before). actions = kubeClient.Actions() assert.Len(t, actions, 9) assert.IsType(t, core.GetActionImpl{}, actions[7]) assert.IsType(t, core.PatchActionImpl{}, actions[8]) // Update node status when keeping the pod CIDR. // Do not report node status if it is within the duration of nodeStatusReportFrequency. clock.Step(10 * time.Second) assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated") assert.NoError(t, kubelet.updateNodeStatus(ctx)) // Only 1 more action (There were 9 actions before). 
actions = kubeClient.Actions() assert.Len(t, actions, 10) assert.IsType(t, core.GetActionImpl{}, actions[9]) }

// TestUpdateNodeStatusAndVolumesInUseWithNodeLease runs updateNodeStatus with
// only the VolumesInUse status setter installed and checks, per case:
//   - which API actions were issued (a GET alone, or a GET followed by a PATCH),
//   - that applying the PATCH to existingNode yields expectedNode, and
//   - which volumes the fake volume manager was told are reported in use.
//
// Each case sets lastStatusReportTime to "now" and nodeStatusReportFrequency
// to one hour; the cases with a nil expectedNode assert that no PATCH is sent
// under those settings.
func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
	// osArchLabels returns a fresh copy of the OS/arch labels so that no two
	// node objects in the table share a map.
	osArchLabels := func() map[string]string {
		return map[string]string{
			v1.LabelOSStable:   goruntime.GOOS,
			v1.LabelArchStable: goruntime.GOARCH,
		}
	}

	cases := []struct {
		desc                  string
		existingVolumes       []v1.UniqueVolumeName // volumes to initially populate volumeManager
		existingNode          *v1.Node              // existing node object
		expectedNode          *v1.Node              // new node object after patch; nil means no patch is expected
		expectedReportedInUse []v1.UniqueVolumeName // expected volumes reported in use in volumeManager
	}{
		{
			desc:         "no volumes and no update",
			existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: osArchLabels()}},
		},
		{
			desc:            "volumes inuse on node and volumeManager",
			existingVolumes: []v1.UniqueVolumeName{"vol1"},
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: osArchLabels()},
				Status: v1.NodeStatus{
					VolumesInUse: []v1.UniqueVolumeName{"vol1"},
				},
			},
			expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
		},
		{
			desc: "volumes inuse on node but not in volumeManager",
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
				Status: v1.NodeStatus{
					VolumesInUse: []v1.UniqueVolumeName{"vol1"},
				},
			},
			expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: osArchLabels()}},
		},
		{
			desc:            "volumes inuse in volumeManager but not on node",
			existingVolumes: []v1.UniqueVolumeName{"vol1"},
			existingNode:    &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
			expectedNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: osArchLabels()},
				Status: v1.NodeStatus{
					VolumesInUse: []v1.UniqueVolumeName{"vol1"},
				},
			},
			expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			ctx := context.Background()

			// Setup
			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
			defer testKubelet.Cleanup()

			kubelet := testKubelet.kubelet
			kubelet.kubeClient = nil // ensure only the heartbeat client is used
			kubelet.containerManager = &localCM{ContainerManager: cm.NewStubContainerManager()}
			kubelet.lastStatusReportTime = kubelet.clock.Now()
			kubelet.nodeStatusReportFrequency = time.Hour
			kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})

			// override test volumeManager
			fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
			kubelet.volumeManager = fakeVolumeManager

			// Only test VolumesInUse setter
			kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
				nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced,
					kubelet.volumeManager.GetVolumesInUse),
			}

			kubeClient := testKubelet.fakeKubeClient
			kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain

			// Execute
			assert.NoError(t, kubelet.updateNodeStatus(ctx))

			// Validate
			actions := kubeClient.Actions()
			if tc.expectedNode != nil {
				// require rather than assert: the indexing and the type
				// assertion below would panic on an unexpected action list
				// instead of reporting an ordinary test failure.
				require.Len(t, actions, 2)
				assert.IsType(t, core.GetActionImpl{}, actions[0])
				require.IsType(t, core.PatchActionImpl{}, actions[1])
				patchAction := actions[1].(core.PatchActionImpl)

				updatedNode, err := applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
				require.NoError(t, err)
				assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedNode, updatedNode), "%s", cmp.Diff(tc.expectedNode, updatedNode))
			} else {
				require.Len(t, actions, 1)
				assert.IsType(t, core.GetActionImpl{}, actions[0])
			}

			reportedInUse := fakeVolumeManager.GetVolumesReportedInUse()
			assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedReportedInUse, reportedInUse), "%s", cmp.Diff(tc.expectedReportedInUse, reportedInUse))
		})
	}
}

func TestFastStatusUpdateOnce(t *testing.T) { tests
:= []struct { name string beforeMarkReady int beforeNextReady int beforeTimeout int wantCalls int patchFailures int wantPatches int }{ { name: "timeout after third loop", beforeMarkReady: 9, beforeNextReady: 9, beforeTimeout: 2, wantCalls: 3, }, { name: "already ready on third loop", beforeMarkReady: 9, beforeNextReady: 1, beforeTimeout: 9, wantCalls: 2, }, { name: "turns ready on third loop", beforeMarkReady: 2, beforeNextReady: 9, beforeTimeout: 9, wantCalls: 3, wantPatches: 1, }, { name: "turns ready on second loop then first patch fails", beforeMarkReady: 1, beforeNextReady: 9, beforeTimeout: 9, wantCalls: 3, patchFailures: 1, wantPatches: 2, }, { name: "turns ready on second loop then all patches fail", beforeMarkReady: 1, beforeNextReady: 9, beforeTimeout: 9, wantCalls: nodeStatusUpdateRetry + 2, patchFailures: nodeStatusUpdateRetry + 2, wantPatches: nodeStatusUpdateRetry + 1, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet // Ensure we capture actions on the heartbeat client only. // We don't set it to nil or GetNode() doesn't read from nodeLister. kubelet.kubeClient = &fake.Clientset{} kubeClient := testKubelet.fakeKubeClient node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: string(kubelet.nodeName), }, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeReady, Status: v1.ConditionFalse, Reason: "NotReady", Message: "Node not ready", }, }, }, } nodeLister := testNodeLister{[]*v1.Node{node.DeepCopy()}} kubelet.nodeLister = nodeLister callCount := 0 // The original node status functions turn the node ready. 
nodeStatusFuncs := kubelet.setNodeStatusFuncs kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{func(ctx context.Context, node *v1.Node) error { assert.False(t, kubelet.containerRuntimeReadyExpected) callCount++ var lastErr error if callCount > tc.beforeMarkReady { for _, f := range nodeStatusFuncs { if err := f(ctx, node); err != nil { lastErr = err } } } if callCount > tc.beforeNextReady { nodeLister.nodes[0].Status.Conditions[0].Status = v1.ConditionTrue } if callCount > tc.beforeTimeout { testKubelet.fakeClock.Step(nodeReadyGracePeriod) } return lastErr }} patchCount := 0 kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) { assert.False(t, kubelet.containerRuntimeReadyExpected) patchCount++ if patchCount > tc.patchFailures { return false, nil, nil } return true, nil, fmt.Errorf("try again") }) kubelet.fastStatusUpdateOnce() assert.True(t, kubelet.containerRuntimeReadyExpected) assert.Equal(t, tc.wantCalls, callCount) assert.Equal(t, tc.wantPatches, patchCount) actions := kubeClient.Actions() if tc.wantPatches == 0 { require.Len(t, actions, 0) return } // patch, get, patch, get, patch, ... 
up to initial patch + nodeStatusUpdateRetry patches require.Len(t, actions, 2*tc.wantPatches-1) for i, action := range actions { if i%2 == 1 { require.IsType(t, core.GetActionImpl{}, action) continue } require.IsType(t, core.PatchActionImpl{}, action) patchAction := action.(core.PatchActionImpl) updatedNode, err := applyNodeStatusPatch(node, patchAction.GetPatch()) require.NoError(t, err) seenNodeReady := false for _, c := range updatedNode.Status.Conditions { if c.Type == v1.NodeReady { assert.Equal(t, v1.ConditionTrue, c.Status) seenNodeReady = true } } assert.True(t, seenNodeReady) } }) } } func TestRegisterWithApiServer(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { // Return an error on create. return true, &v1.Node{}, &apierrors.StatusError{ ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists}, } }) kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { // Return an existing (matching) node on get. 
return true, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: testKubeletHostname, Labels: map[string]string{ v1.LabelHostname: testKubeletHostname, v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH, kubeletapis.LabelOS: goruntime.GOOS, kubeletapis.LabelArch: goruntime.GOARCH, }, }, }, nil }) kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) { if action.GetSubresource() == "status" { return true, nil, nil } return notImplemented(action) }) addNotImplatedReaction(kubeClient) machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 1024, } kubelet.setCachedMachineInfo(machineInfo) done := make(chan struct{}) go func() { kubelet.registerWithAPIServer() done <- struct{}{} }() select { case <-time.After(wait.ForeverTestTimeout): assert.Fail(t, "timed out waiting for registration") case <-done: return } }

// TestTryRegisterWithApiServer drives tryRegisterWithAPIServer against a fake
// client whose create/get/patch/delete reactions on nodes are scripted per
// case. For every case it checks the boolean result and the number of recorded
// client actions; cases with testSavedNode set additionally reconstruct the
// node written by the action at savedNodeIndex and compare its
// controller-managed-attach (CMAD) annotation with savedNodeCMAD.
//
// Each case runs as its own subtest so that its kubelet is cleaned up as soon
// as the case finishes and a fatal assertion only aborts that case.
func TestTryRegisterWithApiServer(t *testing.T) {
	alreadyExists := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists},
	}

	conflict := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonConflict},
	}

	// newNode builds a node carrying the hostname/OS/arch labels; when cmad is
	// true it also carries the controller-managed-attach annotation.
	newNode := func(cmad bool) *v1.Node {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					v1.LabelHostname:      testKubeletHostname,
					v1.LabelOSStable:      goruntime.GOOS,
					v1.LabelArchStable:    goruntime.GOARCH,
					kubeletapis.LabelOS:   goruntime.GOOS,
					kubeletapis.LabelArch: goruntime.GOARCH,
				},
			},
		}

		if cmad {
			node.Annotations = make(map[string]string)
			node.Annotations[util.ControllerManagedAttachAnnotation] = "true"
		}

		return node
	}

	cases := []struct {
		name            string
		newNode         *v1.Node // node passed to tryRegisterWithAPIServer
		existingNode    *v1.Node // node returned by the scripted "get" reaction
		createError     error
		getError        error
		patchError      error
		deleteError     error
		expectedResult  bool
		expectedActions int  // total number of client actions expected
		testSavedNode   bool // whether to inspect the node written by an action
		savedNodeIndex  int  // index of the action whose node is inspected
		savedNodeCMAD   bool // expected CMAD annotation value on that node
	}{
		{
			name:            "success case - new node",
			newNode:         &v1.Node{},
			expectedResult:  true,
			expectedActions: 1,
		},
		{
			name:            "success case - existing node - no change in CMAD",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 2,
		},
		{
			name:            "success case - existing node - CMAD disabled",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   false,
		},
		{
			name:            "success case - existing node - CMAD enabled",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(false),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   true,
		},
		{
			name:            "create failed",
			newNode:         newNode(false),
			createError:     conflict,
			expectedResult:  false,
			expectedActions: 1,
		},
		{
			name:            "get existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			getError:        conflict,
			expectedResult:  false,
			expectedActions: 2,
		},
		{
			name:            "update existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			patchError:      conflict,
			expectedResult:  false,
			expectedActions: 3,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */)
			// Deferred inside the subtest so it fires at the end of this case
			// rather than piling up until the whole test returns.
			defer testKubelet.Cleanup()
			kubelet := testKubelet.kubelet
			kubeClient := testKubelet.fakeKubeClient

			kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.createError
			})
			kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				// Return an existing (matching) node on get.
				return true, tc.existingNode, tc.getError
			})
			kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				if action.GetSubresource() == "status" {
					return true, nil, tc.patchError
				}
				return notImplemented(action)
			})
			kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.deleteError
			})
			addNotImplatedReaction(kubeClient)

			result := kubelet.tryRegisterWithAPIServer(tc.newNode)
			require.Equal(t, tc.expectedResult, result)

			actions := kubeClient.Actions()
			// require: actions is indexed below when testSavedNode is set.
			require.Len(t, actions, tc.expectedActions)

			if !tc.testSavedNode {
				return
			}

			t.Logf("actions: %v: %+v", len(actions), actions)

			var savedNode *v1.Node
			action := actions[tc.savedNodeIndex]
			switch verb := action.GetVerb(); verb {
			case "create":
				createAction := action.(core.CreateAction)
				obj := createAction.GetObject()
				require.IsType(t, &v1.Node{}, obj)
				savedNode = obj.(*v1.Node)
			case "patch":
				patchAction := action.(core.PatchActionImpl)
				var err error
				savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
				require.NoError(t, err)
			default:
				// Without this, savedNode would stay nil and be dereferenced below.
				t.Fatalf("action %d has unexpected verb %q; want create or patch", tc.savedNodeIndex, verb)
			}

			// The parse error is deliberately ignored: a missing annotation
			// yields "", which fails to parse and leaves actualCMAD false.
			actualCMAD, _ := strconv.ParseBool(savedNode.Annotations[util.ControllerManagedAttachAnnotation])
			assert.Equal(t, tc.savedNodeCMAD, actualCMAD)
		})
	}
}

func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { ctx := context.Background() const nodeStatusMaxImages = 5 // generate one more in inputImageList than we configure the Kubelet to report inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages) testKubelet := newTestKubeletWithImageList( t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */, true) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = nodeStatusMaxImages kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager:
cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(40000, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(1000, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 10e9, // 10G } kubelet.setCachedMachineInfo(machineInfo) expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI), }, }, } kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() require.Len(t, actions, 2) require.True(t, actions[1].Matches("patch", "nodes")) 
require.Equal(t, actions[1].GetSubresource(), "status") updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) assert.NoError(t, err) assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", cmp.Diff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable)) }

// TestUpdateDefaultLabels calls kubelet.updateDefaultLabels(initialNode,
// existingNode) for a table of label combinations and checks both the returned
// needsUpdate flag and the labels left on existingNode afterwards.
//
// A single test kubelet is shared by all cases, as before; it is now cleaned
// up exactly once instead of once per case.
func TestUpdateDefaultLabels(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet := testKubelet.kubelet

	// defaultLabels returns a fresh map containing every default label. Values
	// are "<gen>-hostname", "<gen>-zone-failure-domain", ... where gen is
	// "new" or "old". A new map is built on every call because the cases
	// compare existingNode.Labels after the call, so maps must not be shared
	// between nodes or cases.
	defaultLabels := func(gen string) map[string]string {
		return map[string]string{
			v1.LabelHostname:                gen + "-hostname",
			v1.LabelTopologyZone:            gen + "-zone-failure-domain",
			v1.LabelTopologyRegion:          gen + "-zone-region",
			v1.LabelFailureDomainBetaZone:   gen + "-zone-failure-domain",
			v1.LabelFailureDomainBetaRegion: gen + "-zone-region",
			v1.LabelInstanceTypeStable:      gen + "-instance-type",
			v1.LabelInstanceType:            gen + "-instance-type",
			v1.LabelOSStable:                gen + "-os",
			v1.LabelArchStable:              gen + "-arch",
		}
	}
	// withPersist adds a non-default label that must survive the update.
	withPersist := func(labels map[string]string) map[string]string {
		labels["please-persist"] = "foo"
		return labels
	}
	// nodeWithLabels wraps labels in an otherwise empty node object.
	nodeWithLabels := func(labels map[string]string) *v1.Node {
		return &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: labels}}
	}

	cases := []struct {
		name         string
		initialNode  *v1.Node
		existingNode *v1.Node
		needsUpdate  bool
		finalLabels  map[string]string
	}{
		{
			name:         "make sure default labels exist",
			initialNode:  nodeWithLabels(defaultLabels("new")),
			existingNode: nodeWithLabels(map[string]string{}),
			needsUpdate:  true,
			finalLabels:  defaultLabels("new"),
		},
		{
			name:         "make sure default labels are up to date",
			initialNode:  nodeWithLabels(defaultLabels("new")),
			existingNode: nodeWithLabels(defaultLabels("old")),
			needsUpdate:  true,
			finalLabels:  defaultLabels("new"),
		},
		{
			name:         "make sure existing labels do not get deleted",
			initialNode:  nodeWithLabels(defaultLabels("new")),
			existingNode: nodeWithLabels(withPersist(defaultLabels("new"))),
			needsUpdate:  false,
			finalLabels:  withPersist(defaultLabels("new")),
		},
		{
			name:         "make sure existing labels do not get deleted when initial node has no opinion",
			initialNode:  nodeWithLabels(map[string]string{}),
			existingNode: nodeWithLabels(withPersist(defaultLabels("new"))),
			needsUpdate:  false,
			finalLabels:  withPersist(defaultLabels("new")),
		},
		{
			name:         "no update needed",
			initialNode:  nodeWithLabels(defaultLabels("new")),
			existingNode: nodeWithLabels(defaultLabels("new")),
			needsUpdate:  false,
			finalLabels:  defaultLabels("new"),
		},
		{
			name:        "not panic when existing node has nil labels",
			initialNode: nodeWithLabels(defaultLabels("new")),
			// Labels is left nil on purpose (not an empty map).
			existingNode: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{},
			},
			needsUpdate: true,
			finalLabels: defaultLabels("new"),
		},
		{
			name:        "backfill required for new stable labels for os/arch/zones/regions/instance-type",
			initialNode: nodeWithLabels(defaultLabels("new")),
			// Only the hostname and the beta zone/region/instance-type labels are present.
			existingNode: nodeWithLabels(map[string]string{
				v1.LabelHostname:                "new-hostname",
				v1.LabelFailureDomainBetaZone:   "new-zone-failure-domain",
				v1.LabelFailureDomainBetaRegion: "new-zone-region",
				v1.LabelInstanceType:            "new-instance-type",
			}),
			needsUpdate: true,
			finalLabels: defaultLabels("new"),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			needsUpdate := kubelet.updateDefaultLabels(tc.initialNode, tc.existingNode)
			assert.Equal(t, tc.needsUpdate, needsUpdate)
			assert.Equal(t, tc.finalLabels, tc.existingNode.Labels)
		})
	}
}

func TestUpdateDefaultResources(t *testing.T) { cases :=
[]struct { name string initialNode *v1.Node existingNode *v1.Node expectedNode *v1.Node needsUpdate bool }{ { name: "no update needed when capacity and allocatable of the existing node are not nil", initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: false, }, { name: "no update needed when capacity and allocatable of the initial node are nil", initialNode: &v1.Node{}, existingNode: &v1.Node{ Status: 
v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: false, }, { name: "update needed when capacity and allocatable of the existing node are nil and capacity and allocatable of the initial node are not nil", initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, existingNode: &v1.Node{}, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, 
resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: true, }, { name: "update needed when capacity of the existing node is nil and capacity of the initial node is not nil", initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: true, }, { name: "update needed when allocatable of the existing node is nil and allocatable of the initial node is not nil", initialNode: &v1.Node{ Status: v1.NodeStatus{ Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), 
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: true, }, { name: "no update needed but capacity and allocatable of existing node should be initialized", initialNode: &v1.Node{}, existingNode: &v1.Node{}, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{}, }, }, needsUpdate: false, }, } for _, tc := range cases { t.Run(tc.name, func(T *testing.T) { needsUpdate := updateDefaultResources(tc.initialNode, tc.existingNode) assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name) assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name) }) } } func TestReconcileHugePageResource(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) hugePageResourceName64Ki := v1.ResourceName("hugepages-64Ki") hugePageResourceName2Mi := v1.ResourceName("hugepages-2Mi") hugePageResourceName1Gi := v1.ResourceName("hugepages-1Gi") cases := []struct { name string testKubelet *TestKubelet initialNode *v1.Node existingNode *v1.Node expectedNode *v1.Node needsUpdate bool }{ { name: "no update needed when all huge page resources are 
similar", testKubelet: testKubelet, needsUpdate: false, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), 
hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, }, }, }, { name: "update needed when new huge page resources is supported", testKubelet: testKubelet, needsUpdate: true, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), }, 
}, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, }, { name: "update needed when huge page resource quantity has changed", testKubelet: testKubelet, needsUpdate: true, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("4Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("4Gi"), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, 
resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("4Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("4Gi"), }, }, }, }, { name: "update needed when a huge page resources is no longer supported", testKubelet: testKubelet, needsUpdate: true, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, 
resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: *resource.NewQuantity(0, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName1Gi: resource.MustParse("2Gi"), }, }, }, }, { name: "not panic when capacity or allocatable of existing node is nil", testKubelet: testKubelet, needsUpdate: true, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, 
resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{}, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), hugePageResourceName2Mi: resource.MustParse("100Mi"), hugePageResourceName64Ki: *resource.NewQuantity(0, resource.BinarySI), }, }, }, }, } for _, tc := range cases { t.Run(tc.name, func(T *testing.T) { defer testKubelet.Cleanup() kubelet := testKubelet.kubelet needsUpdate := kubelet.reconcileHugePageResource(tc.initialNode, tc.existingNode) assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name) assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name) }) } } func TestReconcileExtendedResource(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used testKubelet.kubelet.containerManager = cm.NewStubContainerManagerWithExtendedResource(true /* shouldResetExtendedResourceCapacity*/) testKubeletNoReset := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubeletNoReset.Cleanup() extendedResourceName1 := v1.ResourceName("test.com/resource1") extendedResourceName2 := v1.ResourceName("test.com/resource2") cases := []struct { name string testKubelet *TestKubelet initialNode *v1.Node existingNode *v1.Node expectedNode *v1.Node needsUpdate bool }{ { name: "no update needed without extended resource", testKubelet: 
testKubelet, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, }, }, needsUpdate: false, }, { name: "extended resource capacity is zeroed", testKubelet: testKubeletNoReset, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: 
*resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, 
resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI), }, }, }, needsUpdate: true, }, { name: "not panic when allocatable of existing node is nil", testKubelet: testKubelet, initialNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, }, }, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI), }, }, }, expectedNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: 
*resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10e9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI), extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI), }, }, }, needsUpdate: true, }, } for _, tc := range cases { defer testKubelet.Cleanup() kubelet := testKubelet.kubelet needsUpdate := kubelet.reconcileExtendedResource(tc.initialNode, tc.existingNode) assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name) assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name) } } func TestValidateNodeIPParam(t *testing.T) { type test struct { nodeIP string success bool testName string } tests := []test{ { nodeIP: "", success: false, testName: "IP not set", }, { nodeIP: "127.0.0.1", success: false, testName: "IPv4 loopback address", }, { nodeIP: "::1", success: false, testName: "IPv6 loopback address", }, { nodeIP: "224.0.0.1", success: false, testName: "multicast IPv4 address", }, { nodeIP: "ff00::1", success: false, testName: "multicast IPv6 address", }, { nodeIP: "169.254.0.1", success: false, testName: "IPv4 link-local unicast address", }, { nodeIP: "fe80::0202:b3ff:fe1e:8329", success: false, testName: "IPv6 link-local unicast address", }, { nodeIP: "0.0.0.0", success: false, testName: "Unspecified IPv4 address", }, { nodeIP: "::", success: false, testName: "Unspecified IPv6 address", }, { nodeIP: "1.2.3.4", success: false, testName: "IPv4 address that doesn't belong to host", }, } addrs, err := net.InterfaceAddrs() if err != nil { assert.Error(t, err, fmt.Sprintf( "Unable to obtain a list of the node's unicast interface addresses.")) } 
for _, addr := range addrs { var ip net.IP switch v := addr.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP } if ip.IsLoopback() || ip.IsLinkLocalUnicast() { continue } successTest := test{ nodeIP: ip.String(), success: true, testName: fmt.Sprintf("Success test case for address %s", ip.String()), } tests = append(tests, successTest) } for _, test := range tests { err := validateNodeIP(netutils.ParseIPSloppy(test.nodeIP)) if test.success { assert.NoError(t, err, "test %s", test.testName) } else { assert.Error(t, err, fmt.Sprintf("test %s", test.testName)) } } } func TestRegisterWithApiServerWithTaint(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 1024, } kubelet.setCachedMachineInfo(machineInfo) var gotNode runtime.Object kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { createAction := action.(core.CreateAction) gotNode = createAction.GetObject() return true, gotNode, nil }) addNotImplatedReaction(kubeClient) // Make node to be unschedulable. kubelet.registerSchedulable = false // Reset kubelet status for each test. kubelet.registrationCompleted = false // Register node to apiserver. kubelet.registerWithAPIServer() // Check the unschedulable taint. 
got := gotNode.(*v1.Node) unschedulableTaint := &v1.Taint{ Key: v1.TaintNodeUnschedulable, Effect: v1.TaintEffectNoSchedule, } require.Equal(t, true, taintutil.TaintExists(got.Spec.Taints, unschedulableTaint), "test unschedulable taint for TaintNodesByCondition") } func TestNodeStatusHasChanged(t *testing.T) { fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) fakeFuture := metav1.Time{Time: fakeNow.Time.Add(time.Minute)} readyCondition := v1.NodeCondition{ Type: v1.NodeReady, Status: v1.ConditionTrue, LastHeartbeatTime: fakeNow, LastTransitionTime: fakeNow, } readyConditionAtDiffHearbeatTime := v1.NodeCondition{ Type: v1.NodeReady, Status: v1.ConditionTrue, LastHeartbeatTime: fakeFuture, LastTransitionTime: fakeNow, } readyConditionAtDiffTransitionTime := v1.NodeCondition{ Type: v1.NodeReady, Status: v1.ConditionTrue, LastHeartbeatTime: fakeFuture, LastTransitionTime: fakeFuture, } notReadyCondition := v1.NodeCondition{ Type: v1.NodeReady, Status: v1.ConditionFalse, LastHeartbeatTime: fakeNow, LastTransitionTime: fakeNow, } memoryPressureCondition := v1.NodeCondition{ Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, LastHeartbeatTime: fakeNow, LastTransitionTime: fakeNow, } testcases := []struct { name string originalStatus *v1.NodeStatus status *v1.NodeStatus expectChange bool }{ { name: "Node status does not change with nil status.", originalStatus: nil, status: nil, expectChange: false, }, { name: "Node status does not change with default status.", originalStatus: &v1.NodeStatus{}, status: &v1.NodeStatus{}, expectChange: false, }, { name: "Node status changes with nil and default status.", originalStatus: nil, status: &v1.NodeStatus{}, expectChange: true, }, { name: "Node status changes with nil and status.", originalStatus: nil, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, expectChange: true, }, { name: "Node status does not change with empty conditions.", originalStatus: 
&v1.NodeStatus{Conditions: []v1.NodeCondition{}}, status: &v1.NodeStatus{Conditions: []v1.NodeCondition{}}, expectChange: false, }, { name: "Node status does not change", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, expectChange: false, }, { name: "Node status does not change even if heartbeat time changes.", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyConditionAtDiffHearbeatTime, memoryPressureCondition}, }, expectChange: false, }, { name: "Node status does not change even if the orders of conditions are different.", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{memoryPressureCondition, readyConditionAtDiffHearbeatTime}, }, expectChange: false, }, { name: "Node status changes if condition status differs.", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{notReadyCondition, memoryPressureCondition}, }, expectChange: true, }, { name: "Node status changes if transition time changes.", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyConditionAtDiffTransitionTime, memoryPressureCondition}, }, expectChange: true, }, { name: "Node status changes with different number of conditions.", originalStatus: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition}, }, status: &v1.NodeStatus{ Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, }, expectChange: true, }, { name: "Node status changes with different phase.", 
originalStatus: &v1.NodeStatus{
				Phase:      v1.NodePending,
				Conditions: []v1.NodeCondition{readyCondition},
			},
			status: &v1.NodeStatus{
				Phase:      v1.NodeRunning,
				Conditions: []v1.NodeCondition{readyCondition},
			},
			expectChange: true,
		},
	}
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			originalStatusCopy := tc.originalStatus.DeepCopy()
			statusCopy := tc.status.DeepCopy()
			changed := nodeStatusHasChanged(tc.originalStatus, tc.status)
			assert.Equal(t, tc.expectChange, changed, "Expect node status change to be %t, but got %t.", tc.expectChange, changed)
			assert.True(t, apiequality.Semantic.DeepEqual(originalStatusCopy, tc.originalStatus), "%s", cmp.Diff(originalStatusCopy, tc.originalStatus))
			assert.True(t, apiequality.Semantic.DeepEqual(statusCopy, tc.status), "%s", cmp.Diff(statusCopy, tc.status))
		})
	}
}

// TestUpdateNodeAddresses checks the patch that updateNodeStatus sends when
// the node's address list changes.
//
// Each case stores a node whose Status.Addresses is Before in the fake client,
// installs a single setNodeStatusFuncs entry that overwrites the addresses
// with After, and runs updateNodeStatus. The last recorded client action must
// be a patch; that patch is applied to the stored node with
// applyNodeStatusPatch and the result is compared to the expected node.
func TestUpdateNodeAddresses(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubeClient := testKubelet.fakeKubeClient

	// Reuse the reaction chain of a clientset seeded with the node so that the
	// Update and patch calls below have an existing object to act on.
	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain

	tests := []struct {
		Name   string
		Before []v1.NodeAddress
		After  []v1.NodeAddress
	}{
		{
			Name:   "nil to populated",
			Before: nil,
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name:   "empty to populated",
			Before: []v1.NodeAddress{},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name: "populated to nil",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: nil,
		},
		{
			Name: "populated to empty",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{},
		},
		{
			Name: "multiple addresses of same type, no change",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.3"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.3"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name: "1 InternalIP to 2 InternalIP",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name: "2 InternalIP to 1 InternalIP",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name: "2 InternalIP to 2 different InternalIP",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.3"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.4"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
		{
			Name: "2 InternalIP to reversed order",
			Before: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			After: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.Name, func(t *testing.T) {
			ctx := context.Background()
			oldNode := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
				Spec:       v1.NodeSpec{},
				Status: v1.NodeStatus{
					Addresses: test.Before,
				},
			}
			// The expected node carries the OS/arch labels in addition to the
			// new addresses; presumably updateNodeStatus reconciles these
			// default labels as part of the same patch (confirm in kubelet).
			expectedNode := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
				Spec:       v1.NodeSpec{},
				Status: v1.NodeStatus{
					Addresses: test.After,
				},
			}

			// Setup failures are fatal: nothing below is meaningful if the
			// "before" node could not be stored.
			_, err := kubeClient.CoreV1().Nodes().Update(ctx, oldNode, metav1.UpdateOptions{})
			require.NoError(t, err)

			kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
				func(_ context.Context, node *v1.Node) error {
					node.Status.Addresses = expectedNode.Status.Addresses
					return nil
				},
			}
			require.NoError(t, kubelet.updateNodeStatus(ctx))

			// Guard the index and the type assertion so that an unexpected
			// action list fails the subtest instead of panicking.
			actions := kubeClient.Actions()
			require.NotEmpty(t, actions)
			lastAction := actions[len(actions)-1]
			require.IsType(t, core.PatchActionImpl{}, lastAction)
			patchAction := lastAction.(core.PatchActionImpl)

			updatedNode, err := applyNodeStatusPatch(oldNode, patchAction.GetPatch())
			require.NoError(t, err)

			assert.True(t, apiequality.Semantic.DeepEqual(updatedNode, expectedNode), "%s", cmp.Diff(expectedNode, updatedNode))
		})
	}
}