/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
25 package e2enode
26
27 import (
28 "context"
29 "os"
30 "path"
31 "path/filepath"
32 "time"
33
34 "github.com/onsi/ginkgo/v2"
35 "github.com/onsi/gomega"
36
37 v1 "k8s.io/api/core/v1"
38 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
39 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40 "k8s.io/client-go/kubernetes"
41 "k8s.io/klog/v2"
42 dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
43 admissionapi "k8s.io/pod-security-admission/api"
44
45 "k8s.io/kubernetes/test/e2e/feature"
46 "k8s.io/kubernetes/test/e2e/framework"
47 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
48
49 "k8s.io/dynamic-resource-allocation/kubeletplugin"
50 testdriver "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
51 )
52
53 const (
54 driverName = "test-driver.cdi.k8s.io"
55 cdiDir = "/var/run/cdi"
56 endpoint = "/var/lib/kubelet/plugins/test-driver/dra.sock"
57 pluginRegistrationPath = "/var/lib/kubelet/plugins_registry"
58 draAddress = "/var/lib/kubelet/plugins/test-driver/dra.sock"
59 pluginRegistrationTimeout = time.Second * 60
60 podInPendingStateTimeout = time.Second * 60
61 )
62
// DRA node e2e specs: exercise the kubelet <-> DRA kubelet-plugin
// interaction directly on a node (plugin registration across kubelet and
// plugin restarts, and pod behavior when NodePrepareResources hangs).
var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, "[NodeAlphaFeature:DynamicResourceAllocation]", func() {
	f := framework.NewDefaultFramework("dra-node")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	var kubeletPlugin *testdriver.ExamplePlugin

	// Serial: these specs restart the kubelet and/or the plugin, which
	// would disturb anything else running on the same node.
	f.Context("Resource Kubelet Plugin", f.WithSerial(), func() {
		ginkgo.BeforeEach(func(ctx context.Context) {
			// Fresh plugin per spec; newKubeletPlugin waits for
			// registration and registers cleanup via DeferCleanup.
			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))
		})

		ginkgo.It("must register after Kubelet restart", func(ctx context.Context) {
			// Snapshot the gRPC calls seen so far so the assertion below
			// only matches calls made after the kubelet restart.
			oldCalls := kubeletPlugin.GetGRPCCalls()
			getNewCalls := func() []testdriver.GRPCCall {
				calls := kubeletPlugin.GetGRPCCalls()
				return calls[len(oldCalls):]
			}

			ginkgo.By("restarting Kubelet")
			restartKubelet(true)

			ginkgo.By("wait for Kubelet plugin re-registration")
			gomega.Eventually(getNewCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
		})

		ginkgo.It("must register after plugin restart", func(ctx context.Context) {
			ginkgo.By("restart Kubelet Plugin")
			kubeletPlugin.Stop()
			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))

			ginkgo.By("wait for Kubelet plugin re-registration")
			// The new plugin instance starts with an empty call log, so no
			// snapshot is needed here.
			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
		})

		ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
			// Create the pod while the kubelet is down: it must sit in
			// Pending, then run to successful completion once the kubelet
			// comes back and prepares the claimed resource.
			startKubelet := stopKubelet()
			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")

			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
				return pod.Status.Phase == v1.PodPending, nil
			})
			framework.ExpectNoError(err)

			startKubelet()

			err = e2epod.WaitForPodSuccessInNamespaceTimeout(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartShortTimeout)
			framework.ExpectNoError(err)
		})

		ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
			ginkgo.By("set delay for the NodePrepareResources call")
			// Block() makes the plugin hang inside NodePrepareResources so
			// the kubelet-side gRPC call runs into its client timeout.
			kubeletPlugin.Block()
			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")

			ginkgo.By("wait for pod to be in Pending state")
			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
				return pod.Status.Phase == v1.PodPending, nil
			})
			framework.ExpectNoError(err)

			ginkgo.By("wait for NodePrepareResources call")
			// Twice the client timeout gives the kubelet enough time to
			// issue the call even under load.
			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesCalled)

			ginkgo.By("check that pod is consistently in Pending state")
			gomega.Consistently(ctx, e2epod.Get(f.ClientSet, pod)).WithTimeout(podInPendingStateTimeout).Should(e2epod.BeInPhase(v1.PodPending),
				"Pod should be in Pending state as resource preparation time outed")
		})
	})
})
135
136
137 func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin {
138 ginkgo.By("start Kubelet plugin")
139 logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName)
140 ctx = klog.NewContext(ctx, logger)
141
142
143
144
145 err := os.MkdirAll(cdiDir, os.FileMode(0750))
146 framework.ExpectNoError(err, "create CDI directory")
147 err = os.MkdirAll(filepath.Dir(endpoint), 0750)
148 framework.ExpectNoError(err, "create socket directory")
149
150 plugin, err := testdriver.StartPlugin(
151 ctx,
152 cdiDir,
153 driverName,
154 "",
155 testdriver.FileOperations{},
156 kubeletplugin.PluginSocketPath(endpoint),
157 kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, driverName+"-reg.sock")),
158 kubeletplugin.KubeletPluginSocketPath(draAddress),
159 )
160 framework.ExpectNoError(err)
161
162 gomega.Eventually(plugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
163
164 ginkgo.DeferCleanup(plugin.Stop)
165
166 return plugin
167 }
168
169
170
171
172
173 func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string) *v1.Pod {
174
175 class := &resourcev1alpha2.ResourceClass{
176 ObjectMeta: metav1.ObjectMeta{
177 Name: className,
178 },
179 DriverName: driverName,
180 }
181 _, err := clientSet.ResourceV1alpha2().ResourceClasses().Create(ctx, class, metav1.CreateOptions{})
182 framework.ExpectNoError(err)
183
184 ginkgo.DeferCleanup(clientSet.ResourceV1alpha2().ResourceClasses().Delete, className, metav1.DeleteOptions{})
185
186
187 podClaimName := "resource-claim"
188 claim := &resourcev1alpha2.ResourceClaim{
189 ObjectMeta: metav1.ObjectMeta{
190 Name: claimName,
191 },
192 Spec: resourcev1alpha2.ResourceClaimSpec{
193 ResourceClassName: className,
194 },
195 }
196 createdClaim, err := clientSet.ResourceV1alpha2().ResourceClaims(namespace).Create(ctx, claim, metav1.CreateOptions{})
197 framework.ExpectNoError(err)
198
199 ginkgo.DeferCleanup(clientSet.ResourceV1alpha2().ResourceClaims(namespace).Delete, claimName, metav1.DeleteOptions{})
200
201
202 containerName := "testcontainer"
203 pod := &v1.Pod{
204 ObjectMeta: metav1.ObjectMeta{
205 Name: podName,
206 Namespace: namespace,
207 },
208 Spec: v1.PodSpec{
209 NodeName: nodename,
210 ResourceClaims: []v1.PodResourceClaim{
211 {
212 Name: podClaimName,
213 Source: v1.ClaimSource{
214 ResourceClaimName: &claimName,
215 },
216 },
217 },
218 Containers: []v1.Container{
219 {
220 Name: containerName,
221 Image: e2epod.GetDefaultTestImage(),
222 Resources: v1.ResourceRequirements{
223 Claims: []v1.ResourceClaim{{Name: podClaimName}},
224 },
225 Command: []string{"/bin/sh", "-c", "env | grep DRA_PARAM1=PARAM1_VALUE"},
226 },
227 },
228 RestartPolicy: v1.RestartPolicyNever,
229 },
230 }
231 createdPod, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
232 framework.ExpectNoError(err)
233
234 ginkgo.DeferCleanup(clientSet.CoreV1().Pods(namespace).Delete, podName, metav1.DeleteOptions{})
235
236
237
238 createdClaim.Status = resourcev1alpha2.ResourceClaimStatus{
239 DriverName: driverName,
240 ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{
241 {Resource: "pods", Name: podName, UID: createdPod.UID},
242 },
243 Allocation: &resourcev1alpha2.AllocationResult{
244 ResourceHandles: []resourcev1alpha2.ResourceHandle{
245 {
246 DriverName: driverName,
247 Data: "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}",
248 },
249 },
250 },
251 }
252 _, err = clientSet.ResourceV1alpha2().ResourceClaims(namespace).UpdateStatus(ctx, createdClaim, metav1.UpdateOptions{})
253 framework.ExpectNoError(err)
254
255 return pod
256 }
257