1
16
17 package dra
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "net"
25 "path"
26 "sort"
27 "strings"
28 "sync"
29 "time"
30
31 "github.com/onsi/ginkgo/v2"
32 "github.com/onsi/gomega"
33 "google.golang.org/grpc"
34
35 appsv1 "k8s.io/api/apps/v1"
36 v1 "k8s.io/api/core/v1"
37 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
38 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
39 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40 "k8s.io/apimachinery/pkg/labels"
41 "k8s.io/apimachinery/pkg/selection"
42 "k8s.io/dynamic-resource-allocation/kubeletplugin"
43 "k8s.io/klog/v2"
44 "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
45 "k8s.io/kubernetes/test/e2e/framework"
46 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
47 e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
48 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
49 "k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
50 "k8s.io/kubernetes/test/e2e/storage/utils"
51 )
52
// Full gRPC method names of the kubelet plugin node services. They have the
// same format as the FullMethod values which the interceptors in this file
// store in a MethodInstance.
const (
	// Methods of the v1alpha2 Node service (one claim per call).
	NodePrepareResourceMethod   = "/v1alpha2.Node/NodePrepareResource"
	NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"

	// Methods of the v1alpha3 Node service.
	NodePrepareResourcesMethod      = "/v1alpha3.Node/NodePrepareResources"
	NodeUnprepareResourcesMethod    = "/v1alpha3.Node/NodeUnprepareResources"
	NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
)
60
// Nodes holds the names of the cluster nodes that a test uses.
type Nodes struct {
	// NodeNames is filled in by the BeforeEach callback which NewNodes
	// registers; it is reset and repopulated each time that callback runs.
	NodeNames []string
}
64
65
66 func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
67 nodes := &Nodes{}
68 ginkgo.BeforeEach(func(ctx context.Context) {
69 ginkgo.By("selecting nodes")
70
71
72
73
74
75
76 nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
77 framework.ExpectNoError(err, "get nodes")
78 numNodes := int32(len(nodeList.Items))
79 if int(numNodes) < minNodes {
80 e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
81 }
82 nodes.NodeNames = nil
83 for _, node := range nodeList.Items {
84 nodes.NodeNames = append(nodes.NodeNames, node.Name)
85 }
86 framework.Logf("testing on nodes %v", nodes.NodeNames)
87 })
88 return nodes
89 }
90
91
92
93
94 func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources) *Driver {
95 d := &Driver{
96 f: f,
97 fail: map[MethodInstance]bool{},
98 callCounts: map[MethodInstance]int64{},
99 NodeV1alpha2: true,
100 NodeV1alpha3: true,
101 }
102
103 ginkgo.BeforeEach(func() {
104 resources := configureResources()
105 if len(resources.Nodes) == 0 {
106
107
108 resources.Nodes = nodes.NodeNames
109 }
110 ginkgo.DeferCleanup(d.IsGone)
111 d.SetUp(nodes, resources)
112 ginkgo.DeferCleanup(d.TearDown)
113 })
114 return d
115 }
116
// MethodInstance identifies a gRPC method on a specific node. It is the key
// for the error injection and call counting maps of Driver.
//
// The field order matters: this file constructs values with unkeyed
// composite literals.
type MethodInstance struct {
	// Nodename is the name of the node whose plugin receives the call.
	Nodename string
	// FullMethod is the gRPC full method name as reported by the
	// grpc.UnaryServerInfo or grpc.StreamServerInfo of the call.
	FullMethod string
}
121
122 type Driver struct {
123 f *framework.Framework
124 ctx context.Context
125 cleanup []func()
126 wg sync.WaitGroup
127
128 NameSuffix string
129 Controller *app.ExampleController
130 Name string
131 Nodes map[string]*app.ExamplePlugin
132
133 parameterMode parameterMode
134 parameterAPIGroup string
135 parameterAPIVersion string
136 claimParameterAPIKind string
137 classParameterAPIKind string
138
139 NodeV1alpha2, NodeV1alpha3 bool
140
141 mutex sync.Mutex
142 fail map[MethodInstance]bool
143 callCounts map[MethodInstance]int64
144 }
145
// parameterMode determines how SetUp configures the parameter API fields of
// a Driver and whether it starts a controller.
type parameterMode string

const (
	// parameterModeConfigMap uses v1 ConfigMaps as parameters. It is the
	// only mode in which SetUp starts a controller.
	parameterModeConfigMap = parameterMode("configmap")
	// parameterModeStructured uses the resource.k8s.io/v1alpha2
	// ResourceClaimParameters and ResourceClassParameters kinds.
	parameterModeStructured = parameterMode("structured")
	// parameterModeTranslated uses v1 ConfigMaps as parameters like
	// parameterModeConfigMap, but without a controller.
	parameterModeTranslated = parameterMode("translated")
)
153
154 func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
155 ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
156 d.Nodes = map[string]*app.ExamplePlugin{}
157 d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
158 resources.DriverName = d.Name
159
160 ctx, cancel := context.WithCancel(context.Background())
161 if d.NameSuffix != "" {
162 logger := klog.FromContext(ctx)
163 logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
164 ctx = klog.NewContext(ctx, logger)
165 }
166 d.ctx = ctx
167 d.cleanup = append(d.cleanup, cancel)
168
169 switch d.parameterMode {
170 case "", parameterModeConfigMap:
171
172 d.Controller = app.NewController(d.f.ClientSet, resources)
173 d.wg.Add(1)
174 go func() {
175 defer d.wg.Done()
176 d.Controller.Run(d.ctx, 5 )
177 }()
178 }
179
180 manifests := []string{
181
182
183 "test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml",
184 }
185 if d.parameterMode == "" {
186 d.parameterMode = parameterModeConfigMap
187 }
188 var numResourceInstances = -1
189 if d.parameterMode != parameterModeConfigMap {
190 numResourceInstances = resources.MaxAllocations
191 }
192 switch d.parameterMode {
193 case parameterModeConfigMap, parameterModeTranslated:
194 d.parameterAPIGroup = ""
195 d.parameterAPIVersion = "v1"
196 d.claimParameterAPIKind = "ConfigMap"
197 d.classParameterAPIKind = "ConfigMap"
198 case parameterModeStructured:
199 d.parameterAPIGroup = "resource.k8s.io"
200 d.parameterAPIVersion = "v1alpha2"
201 d.claimParameterAPIKind = "ResourceClaimParameters"
202 d.classParameterAPIKind = "ResourceClassParameters"
203 default:
204 framework.Failf("unknown test driver parameter mode: %s", d.parameterMode)
205 }
206
207 instanceKey := "app.kubernetes.io/instance"
208 rsName := ""
209 draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
210 numNodes := int32(len(nodes.NodeNames))
211 err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
212 switch item := item.(type) {
213 case *appsv1.ReplicaSet:
214 item.Name += d.NameSuffix
215 rsName = item.Name
216 item.Spec.Replicas = &numNodes
217 item.Spec.Selector.MatchLabels[instanceKey] = d.Name
218 item.Spec.Template.Labels[instanceKey] = d.Name
219 item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
220 item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
221 RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
222 NodeSelectorTerms: []v1.NodeSelectorTerm{
223 {
224 MatchExpressions: []v1.NodeSelectorRequirement{
225 {
226 Key: "kubernetes.io/hostname",
227 Operator: v1.NodeSelectorOpIn,
228 Values: nodes.NodeNames,
229 },
230 },
231 },
232 },
233 },
234 }
235 item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins")
236 item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
237 item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock")
238 item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock")
239 case *apiextensionsv1.CustomResourceDefinition:
240 item.Name = strings.ReplaceAll(item.Name, "dra.e2e.example.com", d.parameterAPIGroup)
241 item.Spec.Group = d.parameterAPIGroup
242
243 }
244 return nil
245 }, manifests...)
246 framework.ExpectNoError(err, "deploy kubelet plugin replicaset")
247
248 rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
249 framework.ExpectNoError(err, "get replicaset")
250
251
252 if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
253 framework.ExpectNoError(err, "all kubelet plugin proxies running")
254 }
255 requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
256 framework.ExpectNoError(err, "create label selector requirement")
257 selector := labels.NewSelector().Add(*requirement)
258 pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
259 framework.ExpectNoError(err, "list proxy pods")
260 gomega.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
261
262
263 for _, pod := range pods.Items {
264
265
266 pod := pod
267 nodename := pod.Spec.NodeName
268 logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
269 loggerCtx := klog.NewContext(ctx, logger)
270 plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
271 app.FileOperations{
272 Create: func(name string, content []byte) error {
273 klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
274 return d.createFile(&pod, name, content)
275 },
276 Remove: func(name string) error {
277 klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
278 return d.removeFile(&pod, name)
279 },
280 NumResourceInstances: numResourceInstances,
281 },
282 kubeletplugin.GRPCVerbosity(0),
283 kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
284 return d.interceptor(nodename, ctx, req, info, handler)
285 }),
286 kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
287 return d.streamInterceptor(nodename, srv, ss, info, handler)
288 }),
289 kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
290 kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
291 kubeletplugin.KubeletPluginSocketPath(draAddr),
292 kubeletplugin.NodeV1alpha2(d.NodeV1alpha2),
293 kubeletplugin.NodeV1alpha3(d.NodeV1alpha3),
294 )
295 framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
296 d.cleanup = append(d.cleanup, func() {
297
298 plugin.Stop()
299 })
300 d.Nodes[nodename] = plugin
301 }
302
303
304 ginkgo.By("wait for plugin registration")
305 gomega.Eventually(func() map[string][]app.GRPCCall {
306 notRegistered := make(map[string][]app.GRPCCall)
307 for nodename, plugin := range d.Nodes {
308 calls := plugin.GetGRPCCalls()
309 if contains, err := app.BeRegistered.Match(calls); err != nil || !contains {
310 notRegistered[nodename] = calls
311 }
312 }
313 return notRegistered
314 }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
315 }
316
317 func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
318 buffer := bytes.NewBuffer(content)
319
320
321 tmpName := name + ".tmp"
322 if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil {
323 _ = d.podIO(pod).RemoveAll(tmpName)
324 return err
325 }
326 return d.podIO(pod).Rename(tmpName, name)
327 }
328
329 func (d *Driver) removeFile(pod *v1.Pod, name string) error {
330 return d.podIO(pod).RemoveAll(name)
331 }
332
333 func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
334 logger := klog.Background()
335 return proxy.PodDirIO{
336 F: d.f,
337 Namespace: pod.Namespace,
338 PodName: pod.Name,
339 ContainerName: "plugin",
340 Logger: &logger,
341 }
342 }
343
344 func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener {
345 addr := proxy.Addr{
346 Namespace: f.Namespace.Name,
347 PodName: podName,
348 ContainerName: containerName,
349 Port: port,
350 }
351 listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
352 framework.ExpectNoError(err, "listen for connections from %+v", addr)
353 return listener
354 }
355
356 func (d *Driver) TearDown() {
357 for _, c := range d.cleanup {
358 c()
359 }
360 d.cleanup = nil
361 d.wg.Wait()
362 }
363
364 func (d *Driver) IsGone(ctx context.Context) {
365 gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.ResourceSlice, error) {
366 slices, err := d.f.ClientSet.ResourceV1alpha2().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name})
367 if err != nil {
368 return nil, err
369 }
370 return slices.Items, err
371 }).Should(gomega.BeEmpty())
372 }
373
374 func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
375 d.mutex.Lock()
376 defer d.mutex.Unlock()
377
378 m := MethodInstance{nodename, info.FullMethod}
379 d.callCounts[m]++
380 if d.fail[m] {
381 return nil, errors.New("injected error")
382 }
383
384 return handler(ctx, req)
385 }
386
387 func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
388
389
390 d.mutex.Lock()
391 m := MethodInstance{nodename, info.FullMethod}
392 d.callCounts[m]++
393 fail := d.fail[m]
394 d.mutex.Unlock()
395
396 if fail {
397 return errors.New("injected error")
398 }
399
400 return handler(srv, stream)
401 }
402
// Fail enables (injectError true) or disables (injectError false) error
// injection for the given method instance. While enabled, the interceptors
// return an error for matching calls instead of invoking the handler.
func (d *Driver) Fail(m MethodInstance, injectError bool) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	d.fail[m] = injectError
}
409
410 func (d *Driver) CallCount(m MethodInstance) int64 {
411 d.mutex.Lock()
412 defer d.mutex.Unlock()
413
414 return d.callCounts[m]
415 }
416
417 func (d *Driver) Nodenames() (nodenames []string) {
418 for nodename := range d.Nodes {
419 nodenames = append(nodenames, nodename)
420 }
421 sort.Strings(nodenames)
422 return
423 }
424
View as plain text