...

Source file src/k8s.io/kubernetes/test/e2e_node/dra_test.go

Documentation: k8s.io/kubernetes/test/e2e_node

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  /*
    18  E2E Node test for DRA (Dynamic Resource Allocation)
    19  This test covers node-specific aspects of DRA
    20  The test can be run locally on Linux this way:
    21    make test-e2e-node FOCUS='\[NodeAlphaFeature:DynamicResourceAllocation\]' SKIP='\[Flaky\]' PARALLELISM=1 \
    22         TEST_ARGS='--feature-gates="DynamicResourceAllocation=true" --service-feature-gates="DynamicResourceAllocation=true" --runtime-config=api/all=true'
    23  */
    24  
    25  package e2enode
    26  
    27  import (
    28  	"context"
    29  	"os"
    30  	"path"
    31  	"path/filepath"
    32  	"time"
    33  
    34  	"github.com/onsi/ginkgo/v2"
    35  	"github.com/onsi/gomega"
    36  
    37  	v1 "k8s.io/api/core/v1"
    38  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/client-go/kubernetes"
    41  	"k8s.io/klog/v2"
    42  	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
    43  	admissionapi "k8s.io/pod-security-admission/api"
    44  
    45  	"k8s.io/kubernetes/test/e2e/feature"
    46  	"k8s.io/kubernetes/test/e2e/framework"
    47  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    48  
    49  	"k8s.io/dynamic-resource-allocation/kubeletplugin"
    50  	testdriver "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
    51  )
    52  
    53  const (
    54  	driverName                = "test-driver.cdi.k8s.io"
    55  	cdiDir                    = "/var/run/cdi"
    56  	endpoint                  = "/var/lib/kubelet/plugins/test-driver/dra.sock"
    57  	pluginRegistrationPath    = "/var/lib/kubelet/plugins_registry"
    58  	draAddress                = "/var/lib/kubelet/plugins/test-driver/dra.sock"
    59  	pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered
    60  	podInPendingStateTimeout  = time.Second * 60 // how long to wait for a pod to stay in pending state
    61  )
    62  
// These tests cover the node-local side of DRA: registration of a DRA
// kubelet plugin (including across kubelet and plugin restarts) and the
// handling of pods whose resource claims the plugin prepares. They run
// serially because they restart the kubelet and share node-global state.
var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, "[NodeAlphaFeature:DynamicResourceAllocation]", func() {
	f := framework.NewDefaultFramework("dra-node")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	// The plugin instance used by the current test; a fresh one is
	// started in BeforeEach and stopped via DeferCleanup.
	var kubeletPlugin *testdriver.ExamplePlugin

	f.Context("Resource Kubelet Plugin", f.WithSerial(), func() {
		ginkgo.BeforeEach(func(ctx context.Context) {
			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))
		})

		ginkgo.It("must register after Kubelet restart", func(ctx context.Context) {
			// Snapshot the calls seen so far so that only calls made
			// after the restart are considered below.
			oldCalls := kubeletPlugin.GetGRPCCalls()
			getNewCalls := func() []testdriver.GRPCCall {
				calls := kubeletPlugin.GetGRPCCalls()
				return calls[len(oldCalls):]
			}

			ginkgo.By("restarting Kubelet")
			restartKubelet(true)

			ginkgo.By("wait for Kubelet plugin re-registration")
			gomega.Eventually(getNewCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
		})

		ginkgo.It("must register after plugin restart", func(ctx context.Context) {
			ginkgo.By("restart Kubelet Plugin")
			kubeletPlugin.Stop()
			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))

			ginkgo.By("wait for Kubelet plugin re-registration")
			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
		})

		ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
			// Stop Kubelet
			startKubelet := stopKubelet()
			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
			// Pod must be in pending state
			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
				return pod.Status.Phase == v1.PodPending, nil
			})
			framework.ExpectNoError(err)
			// Start Kubelet
			startKubelet()
			// Pod should succeed
			err = e2epod.WaitForPodSuccessInNamespaceTimeout(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartShortTimeout)
			framework.ExpectNoError(err)
		})

		ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
			ginkgo.By("set delay for the NodePrepareResources call")
			// Block() makes the plugin hang in NodePrepareResources,
			// simulating a driver that never finishes preparation.
			kubeletPlugin.Block()
			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")

			ginkgo.By("wait for pod to be in Pending state")
			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
				return pod.Status.Phase == v1.PodPending, nil
			})
			framework.ExpectNoError(err)

			ginkgo.By("wait for NodePrepareResources call")
			// Allow twice the client timeout so the kubelet has a chance
			// to make (and time out) at least one call.
			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesCalled)

			// TODO: Check condition or event when implemented
			// see https://github.com/kubernetes/kubernetes/issues/118468 for details
			ginkgo.By("check that pod is consistently in Pending state")
			gomega.Consistently(ctx, e2epod.Get(f.ClientSet, pod)).WithTimeout(podInPendingStateTimeout).Should(e2epod.BeInPhase(v1.PodPending),
				"Pod should be in Pending state as resource preparation time outed")
		})
	})
})
   135  
   136  // Run Kubelet plugin and wait until it's registered
   137  func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin {
   138  	ginkgo.By("start Kubelet plugin")
   139  	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName)
   140  	ctx = klog.NewContext(ctx, logger)
   141  
   142  	// Ensure that directories exist, creating them if necessary. We want
   143  	// to know early if there is a setup problem that would prevent
   144  	// creating those directories.
   145  	err := os.MkdirAll(cdiDir, os.FileMode(0750))
   146  	framework.ExpectNoError(err, "create CDI directory")
   147  	err = os.MkdirAll(filepath.Dir(endpoint), 0750)
   148  	framework.ExpectNoError(err, "create socket directory")
   149  
   150  	plugin, err := testdriver.StartPlugin(
   151  		ctx,
   152  		cdiDir,
   153  		driverName,
   154  		"",
   155  		testdriver.FileOperations{},
   156  		kubeletplugin.PluginSocketPath(endpoint),
   157  		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, driverName+"-reg.sock")),
   158  		kubeletplugin.KubeletPluginSocketPath(draAddress),
   159  	)
   160  	framework.ExpectNoError(err)
   161  
   162  	gomega.Eventually(plugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
   163  
   164  	ginkgo.DeferCleanup(plugin.Stop)
   165  
   166  	return plugin
   167  }
   168  
   169  // createTestObjects creates objects required by the test
   170  // NOTE: as scheduler and controller manager are not running by the Node e2e,
   171  // the objects must contain all required data to be processed correctly by the API server
   172  // and placed on the node without involving the scheduler and the DRA controller
   173  func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string) *v1.Pod {
   174  	// ResourceClass
   175  	class := &resourcev1alpha2.ResourceClass{
   176  		ObjectMeta: metav1.ObjectMeta{
   177  			Name: className,
   178  		},
   179  		DriverName: driverName,
   180  	}
   181  	_, err := clientSet.ResourceV1alpha2().ResourceClasses().Create(ctx, class, metav1.CreateOptions{})
   182  	framework.ExpectNoError(err)
   183  
   184  	ginkgo.DeferCleanup(clientSet.ResourceV1alpha2().ResourceClasses().Delete, className, metav1.DeleteOptions{})
   185  
   186  	// ResourceClaim
   187  	podClaimName := "resource-claim"
   188  	claim := &resourcev1alpha2.ResourceClaim{
   189  		ObjectMeta: metav1.ObjectMeta{
   190  			Name: claimName,
   191  		},
   192  		Spec: resourcev1alpha2.ResourceClaimSpec{
   193  			ResourceClassName: className,
   194  		},
   195  	}
   196  	createdClaim, err := clientSet.ResourceV1alpha2().ResourceClaims(namespace).Create(ctx, claim, metav1.CreateOptions{})
   197  	framework.ExpectNoError(err)
   198  
   199  	ginkgo.DeferCleanup(clientSet.ResourceV1alpha2().ResourceClaims(namespace).Delete, claimName, metav1.DeleteOptions{})
   200  
   201  	// Pod
   202  	containerName := "testcontainer"
   203  	pod := &v1.Pod{
   204  		ObjectMeta: metav1.ObjectMeta{
   205  			Name:      podName,
   206  			Namespace: namespace,
   207  		},
   208  		Spec: v1.PodSpec{
   209  			NodeName: nodename, // Assign the node as the scheduler is not running
   210  			ResourceClaims: []v1.PodResourceClaim{
   211  				{
   212  					Name: podClaimName,
   213  					Source: v1.ClaimSource{
   214  						ResourceClaimName: &claimName,
   215  					},
   216  				},
   217  			},
   218  			Containers: []v1.Container{
   219  				{
   220  					Name:  containerName,
   221  					Image: e2epod.GetDefaultTestImage(),
   222  					Resources: v1.ResourceRequirements{
   223  						Claims: []v1.ResourceClaim{{Name: podClaimName}},
   224  					},
   225  					Command: []string{"/bin/sh", "-c", "env | grep DRA_PARAM1=PARAM1_VALUE"},
   226  				},
   227  			},
   228  			RestartPolicy: v1.RestartPolicyNever,
   229  		},
   230  	}
   231  	createdPod, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
   232  	framework.ExpectNoError(err)
   233  
   234  	ginkgo.DeferCleanup(clientSet.CoreV1().Pods(namespace).Delete, podName, metav1.DeleteOptions{})
   235  
   236  	// Update claim status: set ReservedFor and AllocationResult
   237  	// NOTE: This is usually done by the DRA controller
   238  	createdClaim.Status = resourcev1alpha2.ResourceClaimStatus{
   239  		DriverName: driverName,
   240  		ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{
   241  			{Resource: "pods", Name: podName, UID: createdPod.UID},
   242  		},
   243  		Allocation: &resourcev1alpha2.AllocationResult{
   244  			ResourceHandles: []resourcev1alpha2.ResourceHandle{
   245  				{
   246  					DriverName: driverName,
   247  					Data:       "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}",
   248  				},
   249  			},
   250  		},
   251  	}
   252  	_, err = clientSet.ResourceV1alpha2().ResourceClaims(namespace).UpdateStatus(ctx, createdClaim, metav1.UpdateOptions{})
   253  	framework.ExpectNoError(err)
   254  
   255  	return pod
   256  }
   257  

View as plain text