...

Source file src/k8s.io/kubernetes/test/e2e/dra/deploy.go

Documentation: k8s.io/kubernetes/test/e2e/dra

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dra
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"net"
    25  	"path"
    26  	"sort"
    27  	"strings"
    28  	"sync"
    29  	"time"
    30  
    31  	"github.com/onsi/ginkgo/v2"
    32  	"github.com/onsi/gomega"
    33  	"google.golang.org/grpc"
    34  
    35  	appsv1 "k8s.io/api/apps/v1"
    36  	v1 "k8s.io/api/core/v1"
    37  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    38  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/labels"
    41  	"k8s.io/apimachinery/pkg/selection"
    42  	"k8s.io/dynamic-resource-allocation/kubeletplugin"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
    45  	"k8s.io/kubernetes/test/e2e/framework"
    46  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    47  	e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
    48  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    49  	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
    50  	"k8s.io/kubernetes/test/e2e/storage/utils"
    51  )
    52  
// Full gRPC method names for the v1alpha2 and v1alpha3 Node services.
//
// The interceptors in this file key their bookkeeping on the FullMethod
// reported by gRPC, so these constants can be used as the FullMethod of a
// MethodInstance that is passed to Driver.Fail or Driver.CallCount.
const (
	NodePrepareResourceMethod       = "/v1alpha2.Node/NodePrepareResource"
	NodePrepareResourcesMethod      = "/v1alpha3.Node/NodePrepareResources"
	NodeUnprepareResourceMethod     = "/v1alpha2.Node/NodeUnprepareResource"
	NodeUnprepareResourcesMethod    = "/v1alpha3.Node/NodeUnprepareResources"
	NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
)
    60  
// Nodes holds the names of the cluster nodes that a test runs on.
type Nodes struct {
	// NodeNames is filled in by the BeforeEach callback which NewNodes
	// registers. It is replaced each time that callback runs.
	NodeNames []string
}
    64  
    65  // NewNodes selects nodes to run the test on.
    66  func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
    67  	nodes := &Nodes{}
    68  	ginkgo.BeforeEach(func(ctx context.Context) {
    69  		ginkgo.By("selecting nodes")
    70  		// The kubelet plugin is harder. We deploy the builtin manifest
    71  		// after patching in the driver name and all nodes on which we
    72  		// want the plugin to run.
    73  		//
    74  		// Only a subset of the nodes are picked to avoid causing
    75  		// unnecessary load on a big cluster.
    76  		nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
    77  		framework.ExpectNoError(err, "get nodes")
    78  		numNodes := int32(len(nodeList.Items))
    79  		if int(numNodes) < minNodes {
    80  			e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
    81  		}
    82  		nodes.NodeNames = nil
    83  		for _, node := range nodeList.Items {
    84  			nodes.NodeNames = append(nodes.NodeNames, node.Name)
    85  		}
    86  		framework.Logf("testing on nodes %v", nodes.NodeNames)
    87  	})
    88  	return nodes
    89  }
    90  
    91  // NewDriver sets up controller (as client of the cluster) and
    92  // kubelet plugin (via proxy) before the test runs. It cleans
    93  // up after the test.
    94  func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources) *Driver {
    95  	d := &Driver{
    96  		f:            f,
    97  		fail:         map[MethodInstance]bool{},
    98  		callCounts:   map[MethodInstance]int64{},
    99  		NodeV1alpha2: true,
   100  		NodeV1alpha3: true,
   101  	}
   102  
   103  	ginkgo.BeforeEach(func() {
   104  		resources := configureResources()
   105  		if len(resources.Nodes) == 0 {
   106  			// This always has to be set because the driver might
   107  			// not run on all nodes.
   108  			resources.Nodes = nodes.NodeNames
   109  		}
   110  		ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
   111  		d.SetUp(nodes, resources)
   112  		ginkgo.DeferCleanup(d.TearDown)
   113  	})
   114  	return d
   115  }
   116  
// MethodInstance identifies one gRPC method of the plugin instance on one
// particular node. It is the key under which the Driver tracks call counts
// and injected failures.
//
// The interceptors construct it with a positional literal, so the field
// order must not be changed.
type MethodInstance struct {
	// Nodename is the name of the node whose plugin instance is meant.
	Nodename string
	// FullMethod is the method name as reported by gRPC in FullMethod.
	FullMethod string
}
   121  
// Driver manages one deployment of the DRA test driver: the optional
// control plane controller, the proxy pods and one kubelet plugin instance
// per node. It is created by NewDriver, deployed by SetUp and removed again
// by TearDown.
type Driver struct {
	f *framework.Framework
	// ctx is created in SetUp and canceled by the first cleanup function.
	ctx     context.Context
	cleanup []func() // executed first-in-first-out
	// wg tracks the controller goroutine started in SetUp; TearDown waits for it.
	wg sync.WaitGroup

	// NameSuffix is appended to the unique name of the framework when
	// deriving Name. SetUp also appends it to the ReplicaSet name and,
	// when non-empty, uses it in the logger name.
	NameSuffix string
	// Controller is only set by SetUp when ConfigMap parameters are used
	// (parameter mode unset or parameterModeConfigMap).
	Controller *app.ExampleController
	// Name is the driver name. It gets computed in SetUp.
	Name string
	// Nodes maps node names to the plugin instance started for that node.
	// It gets replaced in SetUp.
	Nodes map[string]*app.ExamplePlugin

	// parameterMode selects how parameters are handled. SetUp treats the
	// empty value as parameterModeConfigMap and derives the remaining
	// parameter fields from the mode.
	parameterMode         parameterMode
	parameterAPIGroup     string
	parameterAPIVersion   string
	claimParameterAPIKind string
	classParameterAPIKind string

	// NodeV1alpha2 and NodeV1alpha3 are handed to the kubelet plugin
	// options of the same name in SetUp. NewDriver sets both to true.
	NodeV1alpha2, NodeV1alpha3 bool

	// mutex protects fail and callCounts, which are accessed by the gRPC
	// interceptors as well as by Fail and CallCount.
	mutex      sync.Mutex
	fail       map[MethodInstance]bool
	callCounts map[MethodInstance]int64
}
   145  
// parameterMode selects how claim and class parameters are provided to the
// test driver. SetUp treats the empty string like parameterModeConfigMap and
// fails the test for any value not listed below.
type parameterMode string

const (
	parameterModeConfigMap  parameterMode = "configmap"  // ConfigMap parameters, control plane controller.
	parameterModeStructured parameterMode = "structured" // No ConfigMaps, directly create and reference in-tree parameter objects.
	parameterModeTranslated parameterMode = "translated" // Reference ConfigMaps in claim and class, generate in-tree parameter objects.
)
   153  
   154  func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
   155  	ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
   156  	d.Nodes = map[string]*app.ExamplePlugin{}
   157  	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
   158  	resources.DriverName = d.Name
   159  
   160  	ctx, cancel := context.WithCancel(context.Background())
   161  	if d.NameSuffix != "" {
   162  		logger := klog.FromContext(ctx)
   163  		logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
   164  		ctx = klog.NewContext(ctx, logger)
   165  	}
   166  	d.ctx = ctx
   167  	d.cleanup = append(d.cleanup, cancel)
   168  
   169  	switch d.parameterMode {
   170  	case "", parameterModeConfigMap:
   171  		// The controller is easy: we simply connect to the API server.
   172  		d.Controller = app.NewController(d.f.ClientSet, resources)
   173  		d.wg.Add(1)
   174  		go func() {
   175  			defer d.wg.Done()
   176  			d.Controller.Run(d.ctx, 5 /* workers */)
   177  		}()
   178  	}
   179  
   180  	manifests := []string{
   181  		// The code below matches the content of this manifest (ports,
   182  		// container names, etc.).
   183  		"test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml",
   184  	}
   185  	if d.parameterMode == "" {
   186  		d.parameterMode = parameterModeConfigMap
   187  	}
   188  	var numResourceInstances = -1 // disabled
   189  	if d.parameterMode != parameterModeConfigMap {
   190  		numResourceInstances = resources.MaxAllocations
   191  	}
   192  	switch d.parameterMode {
   193  	case parameterModeConfigMap, parameterModeTranslated:
   194  		d.parameterAPIGroup = ""
   195  		d.parameterAPIVersion = "v1"
   196  		d.claimParameterAPIKind = "ConfigMap"
   197  		d.classParameterAPIKind = "ConfigMap"
   198  	case parameterModeStructured:
   199  		d.parameterAPIGroup = "resource.k8s.io"
   200  		d.parameterAPIVersion = "v1alpha2"
   201  		d.claimParameterAPIKind = "ResourceClaimParameters"
   202  		d.classParameterAPIKind = "ResourceClassParameters"
   203  	default:
   204  		framework.Failf("unknown test driver parameter mode: %s", d.parameterMode)
   205  	}
   206  
   207  	instanceKey := "app.kubernetes.io/instance"
   208  	rsName := ""
   209  	draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
   210  	numNodes := int32(len(nodes.NodeNames))
   211  	err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
   212  		switch item := item.(type) {
   213  		case *appsv1.ReplicaSet:
   214  			item.Name += d.NameSuffix
   215  			rsName = item.Name
   216  			item.Spec.Replicas = &numNodes
   217  			item.Spec.Selector.MatchLabels[instanceKey] = d.Name
   218  			item.Spec.Template.Labels[instanceKey] = d.Name
   219  			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
   220  			item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
   221  				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
   222  					NodeSelectorTerms: []v1.NodeSelectorTerm{
   223  						{
   224  							MatchExpressions: []v1.NodeSelectorRequirement{
   225  								{
   226  									Key:      "kubernetes.io/hostname",
   227  									Operator: v1.NodeSelectorOpIn,
   228  									Values:   nodes.NodeNames,
   229  								},
   230  							},
   231  						},
   232  					},
   233  				},
   234  			}
   235  			item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins")
   236  			item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
   237  			item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock")
   238  			item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock")
   239  		case *apiextensionsv1.CustomResourceDefinition:
   240  			item.Name = strings.ReplaceAll(item.Name, "dra.e2e.example.com", d.parameterAPIGroup)
   241  			item.Spec.Group = d.parameterAPIGroup
   242  
   243  		}
   244  		return nil
   245  	}, manifests...)
   246  	framework.ExpectNoError(err, "deploy kubelet plugin replicaset")
   247  
   248  	rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
   249  	framework.ExpectNoError(err, "get replicaset")
   250  
   251  	// Wait for all pods to be running.
   252  	if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
   253  		framework.ExpectNoError(err, "all kubelet plugin proxies running")
   254  	}
   255  	requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
   256  	framework.ExpectNoError(err, "create label selector requirement")
   257  	selector := labels.NewSelector().Add(*requirement)
   258  	pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
   259  	framework.ExpectNoError(err, "list proxy pods")
   260  	gomega.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
   261  
   262  	// Run registar and plugin for each of the pods.
   263  	for _, pod := range pods.Items {
   264  		// Need a local variable, not the loop variable, for the anonymous
   265  		// callback functions below.
   266  		pod := pod
   267  		nodename := pod.Spec.NodeName
   268  		logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
   269  		loggerCtx := klog.NewContext(ctx, logger)
   270  		plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
   271  			app.FileOperations{
   272  				Create: func(name string, content []byte) error {
   273  					klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
   274  					return d.createFile(&pod, name, content)
   275  				},
   276  				Remove: func(name string) error {
   277  					klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
   278  					return d.removeFile(&pod, name)
   279  				},
   280  				NumResourceInstances: numResourceInstances,
   281  			},
   282  			kubeletplugin.GRPCVerbosity(0),
   283  			kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
   284  				return d.interceptor(nodename, ctx, req, info, handler)
   285  			}),
   286  			kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
   287  				return d.streamInterceptor(nodename, srv, ss, info, handler)
   288  			}),
   289  			kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
   290  			kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
   291  			kubeletplugin.KubeletPluginSocketPath(draAddr),
   292  			kubeletplugin.NodeV1alpha2(d.NodeV1alpha2),
   293  			kubeletplugin.NodeV1alpha3(d.NodeV1alpha3),
   294  		)
   295  		framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
   296  		d.cleanup = append(d.cleanup, func() {
   297  			// Depends on cancel being called first.
   298  			plugin.Stop()
   299  		})
   300  		d.Nodes[nodename] = plugin
   301  	}
   302  
   303  	// Wait for registration.
   304  	ginkgo.By("wait for plugin registration")
   305  	gomega.Eventually(func() map[string][]app.GRPCCall {
   306  		notRegistered := make(map[string][]app.GRPCCall)
   307  		for nodename, plugin := range d.Nodes {
   308  			calls := plugin.GetGRPCCalls()
   309  			if contains, err := app.BeRegistered.Match(calls); err != nil || !contains {
   310  				notRegistered[nodename] = calls
   311  			}
   312  		}
   313  		return notRegistered
   314  	}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
   315  }
   316  
   317  func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
   318  	buffer := bytes.NewBuffer(content)
   319  	// Writing the content can be slow. Better create a temporary file and
   320  	// move it to the final destination once it is complete.
   321  	tmpName := name + ".tmp"
   322  	if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil {
   323  		_ = d.podIO(pod).RemoveAll(tmpName)
   324  		return err
   325  	}
   326  	return d.podIO(pod).Rename(tmpName, name)
   327  }
   328  
   329  func (d *Driver) removeFile(pod *v1.Pod, name string) error {
   330  	return d.podIO(pod).RemoveAll(name)
   331  }
   332  
   333  func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
   334  	logger := klog.Background()
   335  	return proxy.PodDirIO{
   336  		F:             d.f,
   337  		Namespace:     pod.Namespace,
   338  		PodName:       pod.Name,
   339  		ContainerName: "plugin",
   340  		Logger:        &logger,
   341  	}
   342  }
   343  
   344  func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener {
   345  	addr := proxy.Addr{
   346  		Namespace:     f.Namespace.Name,
   347  		PodName:       podName,
   348  		ContainerName: containerName,
   349  		Port:          port,
   350  	}
   351  	listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
   352  	framework.ExpectNoError(err, "listen for connections from %+v", addr)
   353  	return listener
   354  }
   355  
   356  func (d *Driver) TearDown() {
   357  	for _, c := range d.cleanup {
   358  		c()
   359  	}
   360  	d.cleanup = nil
   361  	d.wg.Wait()
   362  }
   363  
   364  func (d *Driver) IsGone(ctx context.Context) {
   365  	gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.ResourceSlice, error) {
   366  		slices, err := d.f.ClientSet.ResourceV1alpha2().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name})
   367  		if err != nil {
   368  			return nil, err
   369  		}
   370  		return slices.Items, err
   371  	}).Should(gomega.BeEmpty())
   372  }
   373  
   374  func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
   375  	d.mutex.Lock()
   376  	defer d.mutex.Unlock()
   377  
   378  	m := MethodInstance{nodename, info.FullMethod}
   379  	d.callCounts[m]++
   380  	if d.fail[m] {
   381  		return nil, errors.New("injected error")
   382  	}
   383  
   384  	return handler(ctx, req)
   385  }
   386  
   387  func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   388  	// Stream calls block for a long time. We must not hold the lock while
   389  	// they are running.
   390  	d.mutex.Lock()
   391  	m := MethodInstance{nodename, info.FullMethod}
   392  	d.callCounts[m]++
   393  	fail := d.fail[m]
   394  	d.mutex.Unlock()
   395  
   396  	if fail {
   397  		return errors.New("injected error")
   398  	}
   399  
   400  	return handler(srv, stream)
   401  }
   402  
   403  func (d *Driver) Fail(m MethodInstance, injectError bool) {
   404  	d.mutex.Lock()
   405  	defer d.mutex.Unlock()
   406  
   407  	d.fail[m] = injectError
   408  }
   409  
   410  func (d *Driver) CallCount(m MethodInstance) int64 {
   411  	d.mutex.Lock()
   412  	defer d.mutex.Unlock()
   413  
   414  	return d.callCounts[m]
   415  }
   416  
   417  func (d *Driver) Nodenames() (nodenames []string) {
   418  	for nodename := range d.Nodes {
   419  		nodenames = append(nodenames, nodename)
   420  	}
   421  	sort.Strings(nodenames)
   422  	return
   423  }
   424  

View as plain text