...

Source file src/k8s.io/kubernetes/test/utils/runners.go

Documentation: k8s.io/kubernetes/test/utils

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package utils
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"os"
    24  	"strings"
    25  	"sync"
    26  	"time"
    27  
    28  	apps "k8s.io/api/apps/v1"
    29  	batch "k8s.io/api/batch/v1"
    30  	v1 "k8s.io/api/core/v1"
    31  	storagev1 "k8s.io/api/storage/v1"
    32  	storagev1beta1 "k8s.io/api/storage/v1beta1"
    33  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    34  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    35  	"k8s.io/apimachinery/pkg/api/resource"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/fields"
    38  	"k8s.io/apimachinery/pkg/labels"
    39  	"k8s.io/apimachinery/pkg/runtime/schema"
    40  	"k8s.io/apimachinery/pkg/types"
    41  	"k8s.io/apimachinery/pkg/util/json"
    42  	"k8s.io/apimachinery/pkg/util/sets"
    43  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    44  	"k8s.io/apimachinery/pkg/util/uuid"
    45  	"k8s.io/apimachinery/pkg/util/wait"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	scaleclient "k8s.io/client-go/scale"
    48  	"k8s.io/client-go/util/workqueue"
    49  	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
    50  	api "k8s.io/kubernetes/pkg/apis/core"
    51  	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
    52  	"k8s.io/utils/pointer"
    53  
    54  	"k8s.io/klog/v2"
    55  )
    56  
    57  const (
    58  	// String used to mark pod deletion
    59  	nonExist = "NonExist"
    60  )
    61  
    62  func removePtr(replicas *int32) int32 {
    63  	if replicas == nil {
    64  		return 0
    65  	}
    66  	return *replicas
    67  }
    68  
    69  func WaitUntilPodIsScheduled(ctx context.Context, c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
    70  	// Wait until it's scheduled
    71  	p, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{ResourceVersion: "0"})
    72  	if err == nil && p.Spec.NodeName != "" {
    73  		return p, nil
    74  	}
    75  	pollingPeriod := 200 * time.Millisecond
    76  	startTime := time.Now()
    77  	for startTime.Add(timeout).After(time.Now()) && ctx.Err() == nil {
    78  		time.Sleep(pollingPeriod)
    79  		p, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{ResourceVersion: "0"})
    80  		if err == nil && p.Spec.NodeName != "" {
    81  			return p, nil
    82  		}
    83  	}
    84  	return nil, fmt.Errorf("timed out after %v when waiting for pod %v/%v to start", timeout, namespace, name)
    85  }
    86  
    87  func RunPodAndGetNodeName(ctx context.Context, c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
    88  	name := pod.Name
    89  	namespace := pod.Namespace
    90  	if err := CreatePodWithRetries(c, namespace, pod); err != nil {
    91  		return "", err
    92  	}
    93  	p, err := WaitUntilPodIsScheduled(ctx, c, name, namespace, timeout)
    94  	if err != nil {
    95  		return "", err
    96  	}
    97  	return p.Spec.NodeName, nil
    98  }
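
A minimal usage sketch (not part of the source): a hypothetical test helper that creates a single pause pod via RunPodAndGetNodeName and reports which node the scheduler picked. The helper name, the pod name, the "default" namespace and the two-minute timeout are illustrative assumptions.

	package runnersexample

	import (
		"context"
		"time"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		clientset "k8s.io/client-go/kubernetes"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// schedulePauseProbe creates one pause pod and returns the node it was scheduled to.
	func schedulePauseProbe(ctx context.Context, c clientset.Interface) (string, error) {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "sched-probe", // arbitrary example name
				Namespace: "default",     // assumed pre-existing namespace
			},
			Spec: testutils.MakePodSpec(),
		}
		// RunPodAndGetNodeName creates the pod with retries, then polls
		// (via WaitUntilPodIsScheduled) until spec.nodeName is set.
		return testutils.RunPodAndGetNodeName(ctx, c, pod, 2*time.Minute)
	}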
    99  
   100  type RunObjectConfig interface {
   101  	Run(ctx context.Context) error
   102  	GetName() string
   103  	GetNamespace() string
   104  	GetKind() schema.GroupKind
   105  	GetClient() clientset.Interface
   106  	GetScalesGetter() scaleclient.ScalesGetter
   107  	SetClient(clientset.Interface)
   108  	SetScalesClient(scaleclient.ScalesGetter)
   109  	GetReplicas() int
   110  	GetLabelValue(string) (string, bool)
   111  	GetGroupResource() schema.GroupResource
   112  	GetGroupVersionResource() schema.GroupVersionResource
   113  }
   114  
   115  type RCConfig struct {
   116  	Affinity                      *v1.Affinity
   117  	Client                        clientset.Interface
   118  	ScalesGetter                  scaleclient.ScalesGetter
   119  	Image                         string
   120  	Command                       []string
   121  	Name                          string
   122  	Namespace                     string
   123  	PollInterval                  time.Duration
   124  	Timeout                       time.Duration
   125  	PodStatusFile                 *os.File
   126  	Replicas                      int
   127  	CpuRequest                    int64 // millicores
   128  	CpuLimit                      int64 // millicores
   129  	MemRequest                    int64 // bytes
   130  	MemLimit                      int64 // bytes
   131  	GpuLimit                      int64 // count
   132  	ReadinessProbe                *v1.Probe
   133  	DNSPolicy                     *v1.DNSPolicy
   134  	PriorityClassName             string
   135  	TerminationGracePeriodSeconds *int64
   136  	Lifecycle                     *v1.Lifecycle
   137  	SchedulerName                 string
   138  
   139  	// Env vars, set the same for every pod.
   140  	Env map[string]string
   141  
   142  	// Extra labels and annotations added to every pod.
   143  	Labels      map[string]string
   144  	Annotations map[string]string
   145  
   146  	// Node selector for pods in the RC.
   147  	NodeSelector map[string]string
   148  
   149  	// Tolerations for pods in the RC.
   150  	Tolerations []v1.Toleration
   151  
   152  	// Ports to declare in the container (map of name to containerPort).
   153  	Ports map[string]int
   154  	// Ports to declare in the container as host and container ports.
   155  	HostPorts map[string]int
   156  
   157  	Volumes      []v1.Volume
   158  	VolumeMounts []v1.VolumeMount
   159  
   160  	// Pointer to a list of pods; if non-nil, will be set to a list of pods
   161  	// created by this RC by RunRC.
   162  	CreatedPods *[]*v1.Pod
   163  
   164  	// Maximum allowable container failures. If exceeded, RunRC returns an error.
   165  	// Defaults to max(1, replicas*0.01) if unspecified.
   166  	MaxContainerFailures *int
   167  	// Maximum allowed pod deletions count. If exceeded, RunRC returns an error.
   168  	// Defaults to 0.
   169  	MaxAllowedPodDeletions int
   170  
   171  	// If false, starting the RC will log progress; otherwise only errors will be logged.
   172  	Silent bool
   173  
   174  	// If set, this function will be used to log lines in addition to klog.
   175  	LogFunc func(fmt string, args ...interface{})
   176  	// If set, these functions will be used to gather data from Nodes - in integration tests where no
   177  	// kubelets are running, they should be left nil.
   178  	NodeDumpFunc      func(ctx context.Context, c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
   179  	ContainerDumpFunc func(ctx context.Context, c clientset.Interface, ns string, logFunc func(fmt string, args ...interface{}))
   180  
   181  	// Names of the secrets and configmaps to mount.
   182  	SecretNames    []string
   183  	ConfigMapNames []string
   184  
   185  	ServiceAccountTokenProjections int
   186  
   187  	// Additional containers to run in the pod
   188  	AdditionalContainers []v1.Container
   189  
   190  	// Security context for created pods
   191  	SecurityContext *v1.SecurityContext
   192  }
   193  
   194  func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
   195  	if rc.LogFunc != nil {
   196  		rc.LogFunc(fmt, args...)
   197  	}
   198  	klog.Infof(fmt, args...)
   199  }
   200  
   201  type DeploymentConfig struct {
   202  	RCConfig
   203  }
   204  
   205  type ReplicaSetConfig struct {
   206  	RCConfig
   207  }
   208  
   209  type JobConfig struct {
   210  	RCConfig
   211  }
   212  
   213  // podInfo contains pod information useful for debugging e2e tests.
   214  type podInfo struct {
   215  	oldHostname string
   216  	oldPhase    string
   217  	hostname    string
   218  	phase       string
   219  }
   220  
   221  // PodDiff is a map of pod name to podInfos
   222  type PodDiff map[string]*podInfo
   223  
   224  // String formats the given PodDiff and returns it as a string.
   225  func (p PodDiff) String(ignorePhases sets.String) string {
   226  	ret := ""
   227  	for name, info := range p {
   228  		if ignorePhases.Has(info.phase) {
   229  			continue
   230  		}
   231  		if info.phase == nonExist {
   232  			ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
   233  			continue
   234  		}
   235  		phaseChange, hostChange := false, false
   236  		msg := fmt.Sprintf("Pod %v ", name)
   237  		if info.oldPhase != info.phase {
   238  			phaseChange = true
   239  			if info.oldPhase == nonExist {
   240  				msg += fmt.Sprintf("in phase %v ", info.phase)
   241  			} else {
   242  				msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
   243  			}
   244  		}
   245  		if info.oldHostname != info.hostname {
   246  			hostChange = true
   247  			if info.oldHostname == nonExist || info.oldHostname == "" {
   248  				msg += fmt.Sprintf("assigned host %v ", info.hostname)
   249  			} else {
   250  				msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
   251  			}
   252  		}
   253  		if phaseChange || hostChange {
   254  			ret += msg + "\n"
   255  		}
   256  	}
   257  	return ret
   258  }
   259  
   260  // DeletedPods returns a slice of pods that were present at the beginning
   261  // and then disappeared.
   262  func (p PodDiff) DeletedPods() []string {
   263  	var deletedPods []string
   264  	for podName, podInfo := range p {
   265  		if podInfo.hostname == nonExist {
   266  			deletedPods = append(deletedPods, podName)
   267  		}
   268  	}
   269  	return deletedPods
   270  }
   271  
   272  // Diff computes a PodDiff given 2 lists of pods.
   273  func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
   274  	podInfoMap := PodDiff{}
   275  
   276  	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
   277  	for _, pod := range curPods {
   278  		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
   279  	}
   280  
   281  	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
   282  	for _, pod := range oldPods {
   283  		if info, ok := podInfoMap[pod.Name]; ok {
   284  			info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
   285  		} else {
   286  			podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
   287  		}
   288  	}
   289  	return podInfoMap
   290  }
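
A minimal sketch (not part of the source) of how Diff, PodDiff.String and DeletedPods fit together when comparing two pod-list snapshots; the helper name is hypothetical and Succeeded pods are ignored purely as an example.

	package runnersexample

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/util/sets"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// reportPodChurn prints phase/host transitions between two snapshots and
	// lists pods that disappeared entirely.
	func reportPodChurn(oldPods, curPods []*v1.Pod) {
		diff := testutils.Diff(oldPods, curPods)
		// Ignore pods that ended up Succeeded when printing transitions.
		fmt.Print(diff.String(sets.NewString(string(v1.PodSucceeded))))
		if deleted := diff.DeletedPods(); len(deleted) > 0 {
			fmt.Printf("pods that disappeared: %v\n", deleted)
		}
	}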
   291  
   292  // RunDeployment launches (and verifies the correctness of) a Deployment
   293  // and waits for all pods it spawns to become "Running".
   294  // It's the caller's responsibility to clean up externally (i.e. use the
   295  // namespace lifecycle for handling Cleanup).
   296  func RunDeployment(ctx context.Context, config DeploymentConfig) error {
   297  	err := config.create()
   298  	if err != nil {
   299  		return err
   300  	}
   301  	return config.start(ctx)
   302  }
   303  
   304  func (config *DeploymentConfig) Run(ctx context.Context) error {
   305  	return RunDeployment(ctx, *config)
   306  }
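
A minimal sketch (not part of the source) of driving RunDeployment through the embedded RCConfig: only the fields the runner actually consumes are set. The helper name, namespace, image and timeout are illustrative assumptions.

	package runnersexample

	import (
		"context"
		"time"

		clientset "k8s.io/client-go/kubernetes"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// runLoadDeployment creates a small pause Deployment and blocks until all of
	// its pods are Running and Ready, or the timeout is hit.
	func runLoadDeployment(ctx context.Context, c clientset.Interface) error {
		cfg := testutils.DeploymentConfig{
			RCConfig: testutils.RCConfig{
				Client:    c,
				Name:      "load-deployment",           // arbitrary example name
				Namespace: "default",                   // assumed pre-existing namespace
				Image:     "registry.k8s.io/pause:3.9", // same image MakePodSpec uses
				Replicas:  5,
				Timeout:   5 * time.Minute,
			},
		}
		return testutils.RunDeployment(ctx, cfg)
	}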
   307  
   308  func (config *DeploymentConfig) GetKind() schema.GroupKind {
   309  	return extensionsinternal.Kind("Deployment")
   310  }
   311  
   312  func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
   313  	return extensionsinternal.Resource("deployments")
   314  }
   315  
   316  func (config *DeploymentConfig) GetGroupVersionResource() schema.GroupVersionResource {
   317  	return extensionsinternal.SchemeGroupVersion.WithResource("deployments")
   318  }
   319  
   320  func (config *DeploymentConfig) create() error {
   321  	deployment := &apps.Deployment{
   322  		ObjectMeta: metav1.ObjectMeta{
   323  			Name: config.Name,
   324  		},
   325  		Spec: apps.DeploymentSpec{
   326  			Replicas: pointer.Int32(int32(config.Replicas)),
   327  			Selector: &metav1.LabelSelector{
   328  				MatchLabels: map[string]string{
   329  					"name": config.Name,
   330  				},
   331  			},
   332  			Template: v1.PodTemplateSpec{
   333  				ObjectMeta: metav1.ObjectMeta{
   334  					Labels:      map[string]string{"name": config.Name},
   335  					Annotations: config.Annotations,
   336  				},
   337  				Spec: v1.PodSpec{
   338  					Affinity:                      config.Affinity,
   339  					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
   340  					Containers: []v1.Container{
   341  						{
   342  							Name:            config.Name,
   343  							Image:           config.Image,
   344  							Command:         config.Command,
   345  							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
   346  							Lifecycle:       config.Lifecycle,
   347  							SecurityContext: config.SecurityContext,
   348  						},
   349  					},
   350  				},
   351  			},
   352  		},
   353  	}
   354  
   355  	if len(config.AdditionalContainers) > 0 {
   356  		deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, config.AdditionalContainers...)
   357  	}
   358  
   359  	if len(config.SecretNames) > 0 {
   360  		attachSecrets(&deployment.Spec.Template, config.SecretNames)
   361  	}
   362  	if len(config.ConfigMapNames) > 0 {
   363  		attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
   364  	}
   365  
   366  	for i := 0; i < config.ServiceAccountTokenProjections; i++ {
   367  		attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
   368  	}
   369  
   370  	config.applyTo(&deployment.Spec.Template)
   371  
   372  	if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
   373  		return fmt.Errorf("error creating deployment: %v", err)
   374  	}
   375  	config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
   376  	return nil
   377  }
   378  
   379  // RunReplicaSet launches (and verifies the correctness of) a ReplicaSet
   380  // and waits for all the pods it launches to reach the "Running" state.
   381  // It's the caller's responsibility to clean up externally (i.e. use the
   382  // namespace lifecycle for handling Cleanup).
   383  func RunReplicaSet(ctx context.Context, config ReplicaSetConfig) error {
   384  	err := config.create()
   385  	if err != nil {
   386  		return err
   387  	}
   388  	return config.start(ctx)
   389  }
   390  
   391  func (config *ReplicaSetConfig) Run(ctx context.Context) error {
   392  	return RunReplicaSet(ctx, *config)
   393  }
   394  
   395  func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
   396  	return extensionsinternal.Kind("ReplicaSet")
   397  }
   398  
   399  func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
   400  	return extensionsinternal.Resource("replicasets")
   401  }
   402  
   403  func (config *ReplicaSetConfig) GetGroupVersionResource() schema.GroupVersionResource {
   404  	return extensionsinternal.SchemeGroupVersion.WithResource("replicasets")
   405  }
   406  
   407  func (config *ReplicaSetConfig) create() error {
   408  	rs := &apps.ReplicaSet{
   409  		ObjectMeta: metav1.ObjectMeta{
   410  			Name: config.Name,
   411  		},
   412  		Spec: apps.ReplicaSetSpec{
   413  			Replicas: pointer.Int32(int32(config.Replicas)),
   414  			Selector: &metav1.LabelSelector{
   415  				MatchLabels: map[string]string{
   416  					"name": config.Name,
   417  				},
   418  			},
   419  			Template: v1.PodTemplateSpec{
   420  				ObjectMeta: metav1.ObjectMeta{
   421  					Labels:      map[string]string{"name": config.Name},
   422  					Annotations: config.Annotations,
   423  				},
   424  				Spec: v1.PodSpec{
   425  					Affinity:                      config.Affinity,
   426  					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
   427  					Containers: []v1.Container{
   428  						{
   429  							Name:            config.Name,
   430  							Image:           config.Image,
   431  							Command:         config.Command,
   432  							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
   433  							Lifecycle:       config.Lifecycle,
   434  							SecurityContext: config.SecurityContext,
   435  						},
   436  					},
   437  				},
   438  			},
   439  		},
   440  	}
   441  
   442  	if len(config.AdditionalContainers) > 0 {
   443  		rs.Spec.Template.Spec.Containers = append(rs.Spec.Template.Spec.Containers, config.AdditionalContainers...)
   444  	}
   445  
   446  	if len(config.SecretNames) > 0 {
   447  		attachSecrets(&rs.Spec.Template, config.SecretNames)
   448  	}
   449  	if len(config.ConfigMapNames) > 0 {
   450  		attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
   451  	}
   452  
   453  	config.applyTo(&rs.Spec.Template)
   454  
   455  	if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
   456  		return fmt.Errorf("error creating replica set: %v", err)
   457  	}
   458  	config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
   459  	return nil
   460  }
   461  
   462  // RunJob launches (and verifies the correctness of) a Job
   463  // and waits for all pods it spawns to become "Running".
   464  // It's the caller's responsibility to clean up externally (i.e. use the
   465  // namespace lifecycle for handling Cleanup).
   466  func RunJob(ctx context.Context, config JobConfig) error {
   467  	err := config.create()
   468  	if err != nil {
   469  		return err
   470  	}
   471  	return config.start(ctx)
   472  }
   473  
   474  func (config *JobConfig) Run(ctx context.Context) error {
   475  	return RunJob(ctx, *config)
   476  }
   477  
   478  func (config *JobConfig) GetKind() schema.GroupKind {
   479  	return batchinternal.Kind("Job")
   480  }
   481  
   482  func (config *JobConfig) GetGroupResource() schema.GroupResource {
   483  	return batchinternal.Resource("jobs")
   484  }
   485  
   486  func (config *JobConfig) GetGroupVersionResource() schema.GroupVersionResource {
   487  	return batchinternal.SchemeGroupVersion.WithResource("jobs")
   488  }
   489  
   490  func (config *JobConfig) create() error {
   491  	job := &batch.Job{
   492  		ObjectMeta: metav1.ObjectMeta{
   493  			Name: config.Name,
   494  		},
   495  		Spec: batch.JobSpec{
   496  			Parallelism: pointer.Int32(int32(config.Replicas)),
   497  			Completions: pointer.Int32(int32(config.Replicas)),
   498  			Template: v1.PodTemplateSpec{
   499  				ObjectMeta: metav1.ObjectMeta{
   500  					Labels:      map[string]string{"name": config.Name},
   501  					Annotations: config.Annotations,
   502  				},
   503  				Spec: v1.PodSpec{
   504  					Affinity:                      config.Affinity,
   505  					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
   506  					Containers: []v1.Container{
   507  						{
   508  							Name:            config.Name,
   509  							Image:           config.Image,
   510  							Command:         config.Command,
   511  							Lifecycle:       config.Lifecycle,
   512  							SecurityContext: config.SecurityContext,
   513  						},
   514  					},
   515  					RestartPolicy: v1.RestartPolicyOnFailure,
   516  				},
   517  			},
   518  		},
   519  	}
   520  
   521  	if len(config.SecretNames) > 0 {
   522  		attachSecrets(&job.Spec.Template, config.SecretNames)
   523  	}
   524  	if len(config.ConfigMapNames) > 0 {
   525  		attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
   526  	}
   527  
   528  	config.applyTo(&job.Spec.Template)
   529  
   530  	if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
   531  		return fmt.Errorf("error creating job: %v", err)
   532  	}
   533  	config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
   534  	return nil
   535  }
   536  
   537  // RunRC launches (and verifies the correctness of) a ReplicationController
   538  // and waits for all pods it spawns to become "Running".
   539  // It's the caller's responsibility to clean up externally (i.e. use the
   540  // namespace lifecycle for handling Cleanup).
   541  func RunRC(ctx context.Context, config RCConfig) error {
   542  	err := config.create()
   543  	if err != nil {
   544  		return err
   545  	}
   546  	return config.start(ctx)
   547  }
   548  
   549  func (config *RCConfig) Run(ctx context.Context) error {
   550  	return RunRC(ctx, *config)
   551  }
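
A minimal sketch (not part of the source) showing a few more RCConfig knobs with RunRC: resource requests, a caller-supplied log sink, and capturing the created pods. The helper name, namespace, image, replica count and resource values are illustrative assumptions.

	package runnersexample

	import (
		"context"
		"time"

		v1 "k8s.io/api/core/v1"
		clientset "k8s.io/client-go/kubernetes"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// runLoadRC creates an RC of pause pods and returns the pods it observed.
	func runLoadRC(ctx context.Context, c clientset.Interface, logf func(fmt string, args ...interface{})) ([]*v1.Pod, error) {
		var created []*v1.Pod
		cfg := testutils.RCConfig{
			Client:       c,
			Name:         "load-rc",                   // arbitrary example name
			Namespace:    "default",                   // assumed pre-existing namespace
			Image:        "registry.k8s.io/pause:3.9", // same image MakePodSpec uses
			Replicas:     50,
			CpuRequest:   100,               // millicores
			MemRequest:   100 * 1024 * 1024, // bytes
			PollInterval: 5 * time.Second,
			Timeout:      10 * time.Minute,
			LogFunc:      logf,
			CreatedPods:  &created,
		}
		// RunRC creates the ReplicationController and polls until Replicas pods
		// are Running and Ready (or the timeout/failure thresholds trip).
		if err := testutils.RunRC(ctx, cfg); err != nil {
			return nil, err
		}
		return created, nil
	}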
   552  
   553  func (config *RCConfig) GetName() string {
   554  	return config.Name
   555  }
   556  
   557  func (config *RCConfig) GetNamespace() string {
   558  	return config.Namespace
   559  }
   560  
   561  func (config *RCConfig) GetKind() schema.GroupKind {
   562  	return api.Kind("ReplicationController")
   563  }
   564  
   565  func (config *RCConfig) GetGroupResource() schema.GroupResource {
   566  	return api.Resource("replicationcontrollers")
   567  }
   568  
   569  func (config *RCConfig) GetGroupVersionResource() schema.GroupVersionResource {
   570  	return api.SchemeGroupVersion.WithResource("replicationcontrollers")
   571  }
   572  
   573  func (config *RCConfig) GetClient() clientset.Interface {
   574  	return config.Client
   575  }
   576  
   577  func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
   578  	return config.ScalesGetter
   579  }
   580  
   581  func (config *RCConfig) SetClient(c clientset.Interface) {
   582  	config.Client = c
   583  }
   584  
   585  func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
   586  	config.ScalesGetter = getter
   587  }
   588  
   589  func (config *RCConfig) GetReplicas() int {
   590  	return config.Replicas
   591  }
   592  
   593  func (config *RCConfig) GetLabelValue(key string) (string, bool) {
   594  	value, found := config.Labels[key]
   595  	return value, found
   596  }
   597  
   598  func (config *RCConfig) create() error {
   599  	dnsDefault := v1.DNSDefault
   600  	if config.DNSPolicy == nil {
   601  		config.DNSPolicy = &dnsDefault
   602  	}
   603  	one := int64(1)
   604  	rc := &v1.ReplicationController{
   605  		ObjectMeta: metav1.ObjectMeta{
   606  			Name: config.Name,
   607  		},
   608  		Spec: v1.ReplicationControllerSpec{
   609  			Replicas: pointer.Int32(int32(config.Replicas)),
   610  			Selector: map[string]string{
   611  				"name": config.Name,
   612  			},
   613  			Template: &v1.PodTemplateSpec{
   614  				ObjectMeta: metav1.ObjectMeta{
   615  					Labels:      map[string]string{"name": config.Name},
   616  					Annotations: config.Annotations,
   617  				},
   618  				Spec: v1.PodSpec{
   619  					SchedulerName: config.SchedulerName,
   620  					Affinity:      config.Affinity,
   621  					Containers: []v1.Container{
   622  						{
   623  							Name:            config.Name,
   624  							Image:           config.Image,
   625  							Command:         config.Command,
   626  							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
   627  							ReadinessProbe:  config.ReadinessProbe,
   628  							Lifecycle:       config.Lifecycle,
   629  							SecurityContext: config.SecurityContext,
   630  						},
   631  					},
   632  					DNSPolicy:                     *config.DNSPolicy,
   633  					NodeSelector:                  config.NodeSelector,
   634  					Tolerations:                   config.Tolerations,
   635  					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(&one),
   636  					PriorityClassName:             config.PriorityClassName,
   637  				},
   638  			},
   639  		},
   640  	}
   641  
   642  	if len(config.AdditionalContainers) > 0 {
   643  		rc.Spec.Template.Spec.Containers = append(rc.Spec.Template.Spec.Containers, config.AdditionalContainers...)
   644  	}
   645  
   646  	if len(config.SecretNames) > 0 {
   647  		attachSecrets(rc.Spec.Template, config.SecretNames)
   648  	}
   649  	if len(config.ConfigMapNames) > 0 {
   650  		attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
   651  	}
   652  
   653  	config.applyTo(rc.Spec.Template)
   654  
   655  	if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
   656  		return fmt.Errorf("error creating replication controller: %v", err)
   657  	}
   658  	config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
   659  	return nil
   660  }
   661  
   662  func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
   663  	for k, v := range config.Env {
   664  		c := &template.Spec.Containers[0]
   665  		c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
   666  	}
   667  	for k, v := range config.Labels {
   668  		template.ObjectMeta.Labels[k] = v
   669  	}
   670  	template.Spec.NodeSelector = make(map[string]string)
   671  	for k, v := range config.NodeSelector {
   672  		template.Spec.NodeSelector[k] = v
   673  	}
   674  	if config.Tolerations != nil {
   675  		template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
   676  	}
   677  	for k, v := range config.Ports {
   678  		c := &template.Spec.Containers[0]
   679  		c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
   680  	}
   681  	for k, v := range config.HostPorts {
   682  		c := &template.Spec.Containers[0]
   683  		c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
   684  	}
   685  	if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
   686  		template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
   687  	}
   688  	if config.CpuLimit > 0 {
   689  		template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
   690  	}
   691  	if config.MemLimit > 0 {
   692  		template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
   693  	}
   694  	if config.CpuRequest > 0 || config.MemRequest > 0 {
   695  		template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
   696  	}
   697  	if config.CpuRequest > 0 {
   698  		template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
   699  	}
   700  	if config.MemRequest > 0 {
   701  		template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
   702  	}
   703  	if config.GpuLimit > 0 {
   704  		template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
   705  	}
   706  	if config.Lifecycle != nil {
   707  		template.Spec.Containers[0].Lifecycle = config.Lifecycle
   708  	}
   709  	if len(config.Volumes) > 0 {
   710  		template.Spec.Volumes = config.Volumes
   711  	}
   712  	if len(config.VolumeMounts) > 0 {
   713  		template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
   714  	}
   715  	if config.PriorityClassName != "" {
   716  		template.Spec.PriorityClassName = config.PriorityClassName
   717  	}
   718  }
   719  
   720  type RCStartupStatus struct {
   721  	Expected              int
   722  	Terminating           int
   723  	Running               int
   724  	RunningButNotReady    int
   725  	Waiting               int
   726  	Pending               int
   727  	Scheduled             int
   728  	Unknown               int
   729  	Inactive              int
   730  	FailedContainers      int
   731  	Created               []*v1.Pod
   732  	ContainerRestartNodes sets.String
   733  }
   734  
   735  func (s *RCStartupStatus) String(name string) string {
   736  	return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
   737  		name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
   738  }
   739  
   740  func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
   741  	startupStatus := RCStartupStatus{
   742  		Expected:              expected,
   743  		Created:               make([]*v1.Pod, 0, expected),
   744  		ContainerRestartNodes: sets.NewString(),
   745  	}
   746  	for _, p := range pods {
   747  		if p.DeletionTimestamp != nil {
   748  			startupStatus.Terminating++
   749  			continue
   750  		}
   751  		startupStatus.Created = append(startupStatus.Created, p)
   752  		if p.Status.Phase == v1.PodRunning {
   753  			ready := false
   754  			for _, c := range p.Status.Conditions {
   755  				if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
   756  					ready = true
   757  					break
   758  				}
   759  			}
   760  			if ready {
   761  				// Only count a pod as running when it is also ready.
   762  				startupStatus.Running++
   763  			} else {
   764  				startupStatus.RunningButNotReady++
   765  			}
   766  			for _, v := range FailedContainers(p) {
   767  				startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
   768  				startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
   769  			}
   770  		} else if p.Status.Phase == v1.PodPending {
   771  			if p.Spec.NodeName == "" {
   772  				startupStatus.Waiting++
   773  			} else {
   774  				startupStatus.Pending++
   775  			}
   776  		} else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
   777  			startupStatus.Inactive++
   778  		} else if p.Status.Phase == v1.PodUnknown {
   779  			startupStatus.Unknown++
   780  		}
   781  		// Record count of scheduled pods (useful for computing scheduler throughput).
   782  		if p.Spec.NodeName != "" {
   783  			startupStatus.Scheduled++
   784  		}
   785  	}
   786  	return startupStatus
   787  }
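
A minimal sketch (not part of the source) of using ComputeRCStartupStatus outside RunRC to summarize a pod snapshot; the helper name and the "my-rc" label are illustrative assumptions.

	package runnersexample

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// summarizeStartup renders the same one-line summary RunRC logs on each poll.
	func summarizeStartup(pods []*v1.Pod, expected int) string {
		status := testutils.ComputeRCStartupStatus(pods, expected)
		if status.FailedContainers > 0 {
			fmt.Printf("containers restarted on nodes: %v\n", status.ContainerRestartNodes.List())
		}
		return status.String("my-rc")
	}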
   788  
   789  func (config *RCConfig) start(ctx context.Context) error {
   790  	// Don't force tests to fail if they don't care about containers restarting.
   791  	var maxContainerFailures int
   792  	if config.MaxContainerFailures == nil {
   793  		maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
   794  	} else {
   795  		maxContainerFailures = *config.MaxContainerFailures
   796  	}
   797  
   798  	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
   799  
   800  	ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
   801  	if err != nil {
   802  		return err
   803  	}
   804  	defer ps.Stop()
   805  
   806  	interval := config.PollInterval
   807  	if interval <= 0 {
   808  		interval = 10 * time.Second
   809  	}
   810  	timeout := config.Timeout
   811  	if timeout <= 0 {
   812  		timeout = 5 * time.Minute
   813  	}
   814  	oldPods := make([]*v1.Pod, 0)
   815  	oldRunning := 0
   816  	lastChange := time.Now()
   817  	podDeletionsCount := 0
   818  	for oldRunning != config.Replicas {
   819  		time.Sleep(interval)
   820  
   821  		pods := ps.List()
   822  		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
   823  
   824  		if config.CreatedPods != nil {
   825  			*config.CreatedPods = startupStatus.Created
   826  		}
   827  		if !config.Silent {
   828  			config.RCConfigLog(startupStatus.String(config.Name))
   829  		}
   830  
   831  		if config.PodStatusFile != nil {
   832  			fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
   833  		}
   834  
   835  		if startupStatus.FailedContainers > maxContainerFailures {
   836  			if config.NodeDumpFunc != nil {
   837  				config.NodeDumpFunc(ctx, config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
   838  			}
   839  			if config.ContainerDumpFunc != nil {
   840  				// Get the logs from the failed containers to help diagnose what caused them to fail
   841  				config.ContainerDumpFunc(ctx, config.Client, config.Namespace, config.RCConfigLog)
   842  			}
   843  			return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
   844  		}
   845  
   846  		diff := Diff(oldPods, pods)
   847  		deletedPods := diff.DeletedPods()
   848  		podDeletionsCount += len(deletedPods)
   849  		if podDeletionsCount > config.MaxAllowedPodDeletions {
   850  			// Number of pods which disappeared is over threshold
   851  			err := fmt.Errorf("%d pods disappeared for %s: %v", podDeletionsCount, config.Name, strings.Join(deletedPods, ", "))
   852  			config.RCConfigLog(err.Error())
   853  			config.RCConfigLog(diff.String(sets.NewString()))
   854  			return err
   855  		}
   856  
   857  		if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
   858  			lastChange = time.Now()
   859  		}
   860  		oldPods = pods
   861  		oldRunning = startupStatus.Running
   862  
   863  		if time.Since(lastChange) > timeout {
   864  			break
   865  		}
   866  	}
   867  
   868  	if oldRunning != config.Replicas {
   869  		// List only pods from a given replication controller.
   870  		options := metav1.ListOptions{LabelSelector: label.String()}
   871  		if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(ctx, options); err == nil {
   872  			for _, pod := range pods.Items {
   873  				config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
   874  			}
   875  		} else {
   876  			config.RCConfigLog("Can't list pod debug info: %v", err)
   877  		}
   878  		return fmt.Errorf("only %d pods started out of %d", oldRunning, config.Replicas)
   879  	}
   880  	return nil
   881  }
   882  
   883  // StartPods is a simplified version of RunRC that does not create an RC, but creates plain Pods instead.
   884  // It optionally waits for the pods to start running (if waitForRunning == true).
   885  // The number of replicas must be positive.
   886  func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
   887  	pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
   888  	// no pod to start
   889  	if replicas < 1 {
   890  		panic("StartPods: number of replicas must be non-zero")
   891  	}
   892  	startPodsID := string(uuid.NewUUID()) // So that we can label and find them
   893  	for i := 0; i < replicas; i++ {
   894  		podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
   895  		pod.ObjectMeta.Name = podName
   896  		pod.ObjectMeta.Labels["name"] = podName
   897  		pod.ObjectMeta.Labels["startPodsID"] = startPodsID
   898  		pod.Spec.Containers[0].Name = podName
   899  		if err := CreatePodWithRetries(c, namespace, &pod); err != nil {
   900  			return err
   901  		}
   902  	}
   903  	logFunc("Waiting for running...")
   904  	if waitForRunning {
   905  		label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
   906  		err := WaitForPodsWithLabelRunning(c, namespace, label)
   907  		if err != nil {
   908  			return fmt.Errorf("error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
   909  		}
   910  	}
   911  	return nil
   912  }
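
A minimal sketch (not part of the source) of calling StartPods. Note that the pod template must carry a non-nil Labels map, since StartPods writes the per-pod name and the startPodsID label into it; the helper name and prefix are illustrative assumptions.

	package runnersexample

	import (
		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		clientset "k8s.io/client-go/kubernetes"
		"k8s.io/klog/v2"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// startPausePods starts `replicas` pause pods and waits for them to run.
	func startPausePods(c clientset.Interface, replicas int, ns string) error {
		template := v1.Pod{
			// Labels must be non-nil: StartPods mutates this map per replica.
			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{}},
			Spec:       testutils.MakePodSpec(),
		}
		return testutils.StartPods(c, replicas, ns, "pause-pod", template, true, klog.Infof)
	}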
   913  
   914  // WaitForPodsWithLabelRunning waits up to 10 minutes for all matching pods to become Running
   915  // and for at least one matching pod to exist.
   916  func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
   917  	return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
   918  }
   919  
   920  // WaitForEnoughPodsWithLabelRunning waits up to 10 minutes for at least 'replicas' pods to be Running
   921  // and for at least one matching pod to exist. If 'replicas' is < 0, it waits for all matching pods to be Running.
   922  func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
   923  	running := false
   924  	ps, err := NewPodStore(c, ns, label, fields.Everything())
   925  	if err != nil {
   926  		return err
   927  	}
   928  	defer ps.Stop()
   929  
   930  	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
   931  		pods := ps.List()
   932  		if len(pods) == 0 {
   933  			continue
   934  		}
   935  		runningPodsCount := 0
   936  		for _, p := range pods {
   937  			if p.Status.Phase == v1.PodRunning {
   938  				runningPodsCount++
   939  			}
   940  		}
   941  		if (replicas < 0 && runningPodsCount < len(pods)) || (runningPodsCount < replicas) {
   942  			continue
   943  		}
   944  		running = true
   945  		break
   946  	}
   947  	if !running {
   948  		return fmt.Errorf("timeout while waiting for pods with labels %q to be running", label.String())
   949  	}
   950  	return nil
   951  }
   952  
   953  type CountToStrategy struct {
   954  	Count    int
   955  	Strategy PrepareNodeStrategy
   956  }
   957  
   958  type TestNodePreparer interface {
   959  	PrepareNodes(ctx context.Context, nextNodeIndex int) error
   960  	CleanupNodes(ctx context.Context) error
   961  }
   962  
   963  type PrepareNodeStrategy interface {
   964  	// Modify pre-created Node objects before the test starts.
   965  	PreparePatch(node *v1.Node) []byte
   966  	// Create or modify any objects that depend on the node before the test starts.
   967  	// Caller will re-try when http.StatusConflict error is returned.
   968  	PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error
   969  	// Clean up any node modifications after the test finishes.
   970  	CleanupNode(ctx context.Context, node *v1.Node) *v1.Node
   971  	// Clean up any objects that depend on the node after the test finishes.
   972  	// Caller will re-try when http.StatusConflict error is returned.
   973  	CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error
   974  }
   975  
   976  type TrivialNodePrepareStrategy struct{}
   977  
   978  var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}
   979  
   980  func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
   981  	return []byte{}
   982  }
   983  
   984  func (*TrivialNodePrepareStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
   985  	nodeCopy := *node
   986  	return &nodeCopy
   987  }
   988  
   989  func (*TrivialNodePrepareStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
   990  	return nil
   991  }
   992  
   993  func (*TrivialNodePrepareStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
   994  	return nil
   995  }
   996  
   997  type LabelNodePrepareStrategy struct {
   998  	LabelKey      string
   999  	LabelValues   []string
  1000  	roundRobinIdx int
  1001  }
  1002  
  1003  var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}
  1004  
  1005  func NewLabelNodePrepareStrategy(labelKey string, labelValues ...string) *LabelNodePrepareStrategy {
  1006  	return &LabelNodePrepareStrategy{
  1007  		LabelKey:    labelKey,
  1008  		LabelValues: labelValues,
  1009  	}
  1010  }
  1011  
  1012  func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  1013  	labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, s.LabelValues[s.roundRobinIdx])
  1014  	patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  1015  	s.roundRobinIdx++
  1016  	if s.roundRobinIdx == len(s.LabelValues) {
  1017  		s.roundRobinIdx = 0
  1018  	}
  1019  	return []byte(patch)
  1020  }
  1021  
  1022  func (s *LabelNodePrepareStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
  1023  	nodeCopy := node.DeepCopy()
  1024  	if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  1025  		delete(nodeCopy.Labels, s.LabelKey)
  1026  	}
  1027  	return nodeCopy
  1028  }
  1029  
  1030  func (*LabelNodePrepareStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
  1031  	return nil
  1032  }
  1033  
  1034  func (*LabelNodePrepareStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
  1035  	return nil
  1036  }
  1037  
  1038  // NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable.
  1039  // csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is
  1040  // set to nil.
  1041  type NodeAllocatableStrategy struct {
  1042  	// Node.status.allocatable to fill to all nodes.
  1043  	NodeAllocatable map[v1.ResourceName]string
  1044  	// Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>].
  1045  	CsiNodeAllocatable map[string]*storagev1.VolumeNodeResources
  1046  	// List of in-tree volume plugins migrated to CSI.
  1047  	MigratedPlugins []string
  1048  }
  1049  
  1050  var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
  1051  
  1052  func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
  1053  	return &NodeAllocatableStrategy{
  1054  		NodeAllocatable:    nodeAllocatable,
  1055  		CsiNodeAllocatable: csiNodeAllocatable,
  1056  		MigratedPlugins:    migratedPlugins,
  1057  	}
  1058  }
  1059  
  1060  func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
  1061  	newNode := node.DeepCopy()
  1062  	for name, value := range s.NodeAllocatable {
  1063  		newNode.Status.Allocatable[name] = resource.MustParse(value)
  1064  	}
  1065  
  1066  	oldJSON, err := json.Marshal(node)
  1067  	if err != nil {
  1068  		panic(err)
  1069  	}
  1070  	newJSON, err := json.Marshal(newNode)
  1071  	if err != nil {
  1072  		panic(err)
  1073  	}
  1074  
  1075  	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
  1076  	if err != nil {
  1077  		panic(err)
  1078  	}
  1079  	return patch
  1080  }
  1081  
  1082  func (s *NodeAllocatableStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
  1083  	nodeCopy := node.DeepCopy()
  1084  	for name := range s.NodeAllocatable {
  1085  		delete(nodeCopy.Status.Allocatable, name)
  1086  	}
  1087  	return nodeCopy
  1088  }
  1089  
  1090  func (s *NodeAllocatableStrategy) createCSINode(ctx context.Context, nodeName string, client clientset.Interface) error {
  1091  	csiNode := &storagev1.CSINode{
  1092  		ObjectMeta: metav1.ObjectMeta{
  1093  			Name: nodeName,
  1094  			Annotations: map[string]string{
  1095  				v1.MigratedPluginsAnnotationKey: strings.Join(s.MigratedPlugins, ","),
  1096  			},
  1097  		},
  1098  		Spec: storagev1.CSINodeSpec{
  1099  			Drivers: []storagev1.CSINodeDriver{},
  1100  		},
  1101  	}
  1102  
  1103  	for driver, allocatable := range s.CsiNodeAllocatable {
  1104  		d := storagev1.CSINodeDriver{
  1105  			Name:        driver,
  1106  			Allocatable: allocatable,
  1107  			NodeID:      nodeName,
  1108  		}
  1109  		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  1110  	}
  1111  
  1112  	_, err := client.StorageV1().CSINodes().Create(ctx, csiNode, metav1.CreateOptions{})
  1113  	if apierrors.IsAlreadyExists(err) {
  1114  		// Something created the CSINode instance after we checked that it did not exist.
  1115  		// Make the caller re-try PrepareDependentObjects by returning a Conflict error.
  1116  		err = apierrors.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
  1117  	}
  1118  	return err
  1119  }
  1120  
  1121  func (s *NodeAllocatableStrategy) updateCSINode(ctx context.Context, csiNode *storagev1.CSINode, client clientset.Interface) error {
  1122  	for driverName, allocatable := range s.CsiNodeAllocatable {
  1123  		found := false
  1124  		for i, driver := range csiNode.Spec.Drivers {
  1125  			if driver.Name == driverName {
  1126  				found = true
  1127  				csiNode.Spec.Drivers[i].Allocatable = allocatable
  1128  				break
  1129  			}
  1130  		}
  1131  		if !found {
  1132  			d := storagev1.CSINodeDriver{
  1133  				Name:        driverName,
  1134  				Allocatable: allocatable,
  1135  			}
  1136  
  1137  			csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  1138  		}
  1139  	}
  1140  	csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.MigratedPlugins, ",")
  1141  
  1142  	_, err := client.StorageV1().CSINodes().Update(ctx, csiNode, metav1.UpdateOptions{})
  1143  	return err
  1144  }
  1145  
  1146  func (s *NodeAllocatableStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
  1147  	csiNode, err := client.StorageV1().CSINodes().Get(ctx, node.Name, metav1.GetOptions{})
  1148  	if err != nil {
  1149  		if apierrors.IsNotFound(err) {
  1150  			return s.createCSINode(ctx, node.Name, client)
  1151  		}
  1152  		return err
  1153  	}
  1154  	return s.updateCSINode(ctx, csiNode, client)
  1155  }
  1156  
  1157  func (s *NodeAllocatableStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
  1158  	csiNode, err := client.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
  1159  	if err != nil {
  1160  		if apierrors.IsNotFound(err) {
  1161  			return nil
  1162  		}
  1163  		return err
  1164  	}
  1165  
  1166  	for driverName := range s.CsiNodeAllocatable {
  1167  		for i, driver := range csiNode.Spec.Drivers {
  1168  			if driver.Name == driverName {
  1169  				csiNode.Spec.Drivers[i].Allocatable = nil
  1170  			}
  1171  		}
  1172  	}
  1173  	return s.updateCSINode(ctx, csiNode, client)
  1174  }
  1175  
  1176  // UniqueNodeLabelStrategy sets a unique label for each node.
  1177  type UniqueNodeLabelStrategy struct {
  1178  	LabelKey string
  1179  }
  1180  
  1181  var _ PrepareNodeStrategy = &UniqueNodeLabelStrategy{}
  1182  
  1183  func NewUniqueNodeLabelStrategy(labelKey string) *UniqueNodeLabelStrategy {
  1184  	return &UniqueNodeLabelStrategy{
  1185  		LabelKey: labelKey,
  1186  	}
  1187  }
  1188  
  1189  func (s *UniqueNodeLabelStrategy) PreparePatch(*v1.Node) []byte {
  1190  	labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, string(uuid.NewUUID()))
  1191  	patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  1192  	return []byte(patch)
  1193  }
  1194  
  1195  func (s *UniqueNodeLabelStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
  1196  	nodeCopy := node.DeepCopy()
  1197  	if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  1198  		delete(nodeCopy.Labels, s.LabelKey)
  1199  	}
  1200  	return nodeCopy
  1201  }
  1202  
  1203  func (*UniqueNodeLabelStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
  1204  	return nil
  1205  }
  1206  
  1207  func (*UniqueNodeLabelStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
  1208  	return nil
  1209  }
  1210  
  1211  func DoPrepareNode(ctx context.Context, client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
  1212  	var err error
  1213  	patch := strategy.PreparePatch(node)
  1214  	if len(patch) == 0 {
  1215  		return nil
  1216  	}
  1217  	for attempt := 0; attempt < retries; attempt++ {
  1218  		if _, err = client.CoreV1().Nodes().Patch(ctx, node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err == nil {
  1219  			break
  1220  		}
  1221  		if !apierrors.IsConflict(err) {
  1222  			return fmt.Errorf("error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
  1223  		}
  1224  		time.Sleep(100 * time.Millisecond)
  1225  	}
  1226  	if err != nil {
  1227  		return fmt.Errorf("too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
  1228  	}
  1229  
  1230  	for attempt := 0; attempt < retries; attempt++ {
  1231  		if err = strategy.PrepareDependentObjects(ctx, node, client); err == nil {
  1232  			break
  1233  		}
  1234  		if !apierrors.IsConflict(err) {
  1235  			return fmt.Errorf("error while preparing objects for node %s: %s", node.Name, err)
  1236  		}
  1237  		time.Sleep(100 * time.Millisecond)
  1238  	}
  1239  	if err != nil {
  1240  		return fmt.Errorf("too many conflicts when creating objects for node %s: %s", node.Name, err)
  1241  	}
  1242  	return nil
  1243  }
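
A minimal sketch (not part of the source) of preparing nodes with DoPrepareNode and a LabelNodePrepareStrategy that round-robins two zone values across every node; the helper name and the label key/values are illustrative assumptions. Cleanup would call DoCleanupNode with the same strategy per node name.

	package runnersexample

	import (
		"context"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		clientset "k8s.io/client-go/kubernetes"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// labelAllNodes patches every node, retrying conflicts inside DoPrepareNode.
	func labelAllNodes(ctx context.Context, c clientset.Interface) error {
		strategy := testutils.NewLabelNodePrepareStrategy("example.com/zone", "zone-a", "zone-b")
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		if err != nil {
			return err
		}
		for i := range nodes.Items {
			if err := testutils.DoPrepareNode(ctx, c, &nodes.Items[i], strategy); err != nil {
				return err
			}
		}
		return nil
	}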
  1244  
  1245  func DoCleanupNode(ctx context.Context, client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
  1246  	var err error
  1247  	for attempt := 0; attempt < retries; attempt++ {
  1248  		var node *v1.Node
  1249  		node, err = client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
  1250  		if err != nil {
  1251  			return fmt.Errorf("skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
  1252  		}
  1253  		updatedNode := strategy.CleanupNode(ctx, node)
  1254  		if apiequality.Semantic.DeepEqual(node, updatedNode) {
  1255  			return nil
  1256  		}
  1257  		if _, err = client.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{}); err == nil {
  1258  			break
  1259  		}
  1260  		if !apierrors.IsConflict(err) {
  1261  			return fmt.Errorf("error when updating Node %v: %v", nodeName, err)
  1262  		}
  1263  		time.Sleep(100 * time.Millisecond)
  1264  	}
  1265  	if err != nil {
  1266  		return fmt.Errorf("too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
  1267  	}
  1268  
  1269  	for attempt := 0; attempt < retries; attempt++ {
  1270  		err = strategy.CleanupDependentObjects(ctx, nodeName, client)
  1271  		if err == nil {
  1272  			break
  1273  		}
  1274  		if !apierrors.IsConflict(err) {
  1275  			return fmt.Errorf("error when cleaning up Node %v objects: %v", nodeName, err)
  1276  		}
  1277  		time.Sleep(100 * time.Millisecond)
  1278  	}
  1279  	if err != nil {
  1280  		return fmt.Errorf("too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
  1281  	}
  1282  	return nil
  1283  }
  1284  
  1285  type TestPodCreateStrategy func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error
  1286  
  1287  type CountToPodStrategy struct {
  1288  	Count    int
  1289  	Strategy TestPodCreateStrategy
  1290  }
  1291  
  1292  type TestPodCreatorConfig map[string][]CountToPodStrategy
  1293  
  1294  func NewTestPodCreatorConfig() *TestPodCreatorConfig {
  1295  	config := make(TestPodCreatorConfig)
  1296  	return &config
  1297  }
  1298  
  1299  func (c *TestPodCreatorConfig) AddStrategy(
  1300  	namespace string, podCount int, strategy TestPodCreateStrategy) {
  1301  	(*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
  1302  }
  1303  
  1304  type TestPodCreator struct {
  1305  	Client clientset.Interface
  1306  	// namespace -> count -> strategy
  1307  	Config *TestPodCreatorConfig
  1308  }
  1309  
  1310  func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
  1311  	return &TestPodCreator{
  1312  		Client: client,
  1313  		Config: config,
  1314  	}
  1315  }
  1316  
  1317  func (c *TestPodCreator) CreatePods(ctx context.Context) error {
  1318  	for ns, v := range *(c.Config) {
  1319  		for _, countToStrategy := range v {
  1320  			if err := countToStrategy.Strategy(ctx, c.Client, ns, countToStrategy.Count); err != nil {
  1321  				return err
  1322  			}
  1323  		}
  1324  	}
  1325  	return nil
  1326  }
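
A minimal sketch (not part of the source) of wiring TestPodCreatorConfig, AddStrategy and TestPodCreator together. The namespaces, counts and helper name are illustrative assumptions; the template deliberately uses GenerateName instead of Name so each copy created by the strategy gets a unique server-generated name.

	package runnersexample

	import (
		"context"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		clientset "k8s.io/client-go/kubernetes"
		testutils "k8s.io/kubernetes/test/utils"
	)

	// createFillerPods fills two (assumed pre-existing) namespaces with pause pods.
	func createFillerPods(ctx context.Context, c clientset.Interface) error {
		template := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{GenerateName: "filler-pod-"},
			Spec:       testutils.MakePodSpec(),
		}
		config := testutils.NewTestPodCreatorConfig()
		config.AddStrategy("ns-1", 50, testutils.NewCustomCreatePodStrategy(template))
		config.AddStrategy("ns-2", 20, testutils.NewCustomCreatePodStrategy(template))
		return testutils.NewTestPodCreator(c, config).CreatePods(ctx)
	}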
  1327  
  1328  func MakePodSpec() v1.PodSpec {
  1329  	return v1.PodSpec{
  1330  		Containers: []v1.Container{{
  1331  			Name:  "pause",
  1332  			Image: "registry.k8s.io/pause:3.9",
  1333  			Ports: []v1.ContainerPort{{ContainerPort: 80}},
  1334  			Resources: v1.ResourceRequirements{
  1335  				Limits: v1.ResourceList{
  1336  					v1.ResourceCPU:    resource.MustParse("100m"),
  1337  					v1.ResourceMemory: resource.MustParse("500Mi"),
  1338  				},
  1339  				Requests: v1.ResourceList{
  1340  					v1.ResourceCPU:    resource.MustParse("100m"),
  1341  					v1.ResourceMemory: resource.MustParse("500Mi"),
  1342  				},
  1343  			},
  1344  		}},
  1345  	}
  1346  }
  1347  
  1348  func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
  1349  	if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
  1350  		return fmt.Errorf("error creating pod: %v", err)
  1351  	}
  1352  	return nil
  1353  }
  1354  
  1355  func CreatePod(ctx context.Context, client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
  1356  	var createError error
  1357  	lock := sync.Mutex{}
  1358  	createPodFunc := func(i int) {
  1359  		// client-go writes into the object that is passed to Create,
  1360  		// causing a data race unless we create a new copy for each
  1361  		// parallel call.
  1362  		if err := makeCreatePod(client, namespace, podTemplate.DeepCopy()); err != nil {
  1363  			lock.Lock()
  1364  			defer lock.Unlock()
  1365  			createError = err
  1366  		}
  1367  	}
  1368  
  1369  	if podCount < 30 {
  1370  		workqueue.ParallelizeUntil(ctx, podCount, podCount, createPodFunc)
  1371  	} else {
  1372  		workqueue.ParallelizeUntil(ctx, 30, podCount, createPodFunc)
  1373  	}
  1374  	return createError
  1375  }
  1376  
  1377  func CreatePodWithPersistentVolume(ctx context.Context, client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
  1378  	var createError error
  1379  	lock := sync.Mutex{}
  1380  	createPodFunc := func(i int) {
  1381  		pvcName := fmt.Sprintf("pvc-%d", i)
  1382  		// pvc
  1383  		pvc := claimTemplate.DeepCopy()
  1384  		pvc.Name = pvcName
  1385  		// pv
  1386  		pv := factory(i)
  1387  		// PVs are cluster-wide resources.
  1388  		// Prepend a namespace to make the name globally unique.
  1389  		pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
  1390  		if bindVolume {
  1391  			// bind pv to "pvc-$i"
  1392  			pv.Spec.ClaimRef = &v1.ObjectReference{
  1393  				Kind:       "PersistentVolumeClaim",
  1394  				Namespace:  namespace,
  1395  				Name:       pvcName,
  1396  				APIVersion: "v1",
  1397  			}
  1398  			pv.Status.Phase = v1.VolumeBound
  1399  
  1400  			// bind pvc to "pv-$i"
  1401  			pvc.Spec.VolumeName = pv.Name
  1402  			pvc.Status.Phase = v1.ClaimBound
  1403  		} else {
  1404  			pv.Status.Phase = v1.VolumeAvailable
  1405  		}
  1406  
  1407  		// Create the PVC first, as it's referenced by the PV when `bindVolume` is true.
  1408  		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
  1409  			lock.Lock()
  1410  			defer lock.Unlock()
  1411  			createError = fmt.Errorf("error creating PVC: %s", err)
  1412  			return
  1413  		}
  1414  
  1415  		// We need to update statuses separately, as creating pv/pvc resets status to the default one.
  1416  		if _, err := client.CoreV1().PersistentVolumeClaims(namespace).UpdateStatus(ctx, pvc, metav1.UpdateOptions{}); err != nil {
  1417  			lock.Lock()
  1418  			defer lock.Unlock()
  1419  			createError = fmt.Errorf("error updating PVC status: %s", err)
  1420  			return
  1421  		}
  1422  
  1423  		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
  1424  			lock.Lock()
  1425  			defer lock.Unlock()
  1426  			createError = fmt.Errorf("error creating PV: %s", err)
  1427  			return
  1428  		}
  1429  		// We need to update statuses separately, as creating pv/pvc resets status to the default one.
  1430  		if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(ctx, pv, metav1.UpdateOptions{}); err != nil {
  1431  			lock.Lock()
  1432  			defer lock.Unlock()
  1433  			createError = fmt.Errorf("error updating PV status: %s", err)
  1434  			return
  1435  		}
  1436  
  1437  		// pod
  1438  		pod := podTemplate.DeepCopy()
  1439  		pod.Spec.Volumes = []v1.Volume{
  1440  			{
  1441  				Name: "vol",
  1442  				VolumeSource: v1.VolumeSource{
  1443  					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1444  						ClaimName: pvcName,
  1445  					},
  1446  				},
  1447  			},
  1448  		}
  1449  		if err := makeCreatePod(client, namespace, pod); err != nil {
  1450  			lock.Lock()
  1451  			defer lock.Unlock()
  1452  			createError = err
  1453  			return
  1454  		}
  1455  	}
  1456  
  1457  	if count < 30 {
  1458  		workqueue.ParallelizeUntil(ctx, count, count, createPodFunc)
  1459  	} else {
  1460  		workqueue.ParallelizeUntil(ctx, 30, count, createPodFunc)
  1461  	}
  1462  	return createError
  1463  }
  1464  
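        // createController creates a ReplicationController named controllerName with
        // podCount replicas, selecting pods labelled name=controllerName and using
        // podTemplate's spec for its pod template.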
  1465  func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
  1466  	rc := &v1.ReplicationController{
  1467  		ObjectMeta: metav1.ObjectMeta{
  1468  			Name: controllerName,
  1469  		},
  1470  		Spec: v1.ReplicationControllerSpec{
  1471  			Replicas: pointer.Int32(int32(podCount)),
  1472  			Selector: map[string]string{"name": controllerName},
  1473  			Template: &v1.PodTemplateSpec{
  1474  				ObjectMeta: metav1.ObjectMeta{
  1475  					Labels: map[string]string{"name": controllerName},
  1476  				},
  1477  				Spec: podTemplate.Spec,
  1478  			},
  1479  		},
  1480  	}
  1481  	if err := CreateRCWithRetries(client, namespace, rc); err != nil {
  1482  		return fmt.Errorf("error creating replication controller: %v", err)
  1483  	}
  1484  	return nil
  1485  }
  1486  
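        // NewCustomCreatePodStrategy returns a TestPodCreateStrategy that creates
        // standalone copies of the given pod template via CreatePod.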
  1487  func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
  1488  	return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
  1489  		return CreatePod(ctx, client, namespace, podCount, podTemplate)
  1490  	}
  1491  }
  1492  
  1493  // volumeFactory creates a unique PersistentVolume for the given integer.
  1494  type volumeFactory func(uniqueID int) *v1.PersistentVolume
  1495  
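        // NewCreatePodWithPersistentVolumeStrategy returns a TestPodCreateStrategy
        // that creates pods together with pre-bound PVs and PVCs, using
        // CreatePodWithPersistentVolume with bindVolume set to true.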
  1496  func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
  1497  	return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
  1498  		return CreatePodWithPersistentVolume(ctx, client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
  1499  	}
  1500  }
  1501  
  1502  func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
  1503  	return &v1.PersistentVolumeClaim{
  1504  		Spec: v1.PersistentVolumeClaimSpec{
  1505  			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
  1506  			StorageClassName: &storageClass,
  1507  			Resources: v1.VolumeResourceRequirements{
  1508  				Requests: v1.ResourceList{
  1509  					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
  1510  				},
  1511  			},
  1512  		},
  1513  	}
  1514  }
  1515  
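        // NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy returns a
        // TestPodCreateStrategy that first creates a WaitForFirstConsumer
        // StorageClass ("storagev1-class-1", kubernetes.io/gce-pd provisioner) and
        // then creates pods with unbound PVs and PVCs referencing that class, so
        // binding happens at scheduling time.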
  1516  func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
  1517  	return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
  1518  		volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer
  1519  		storageClass := &storagev1.StorageClass{
  1520  			ObjectMeta: metav1.ObjectMeta{
  1521  				Name: "storagev1-class-1",
  1522  			},
  1523  			Provisioner:       "kubernetes.io/gce-pd",
  1524  			VolumeBindingMode: &volumeBindingMode,
  1525  		}
  1526  		claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)
  1527  
  1528  		if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
  1529  			return fmt.Errorf("failed to create storage class: %v", err)
  1530  		}
  1531  
  1532  		factoryWithStorageClass := func(i int) *v1.PersistentVolume {
  1533  			pv := factory(i)
  1534  			pv.Spec.StorageClassName = storageClass.Name
  1535  			return pv
  1536  		}
  1537  
  1538  		return CreatePodWithPersistentVolume(ctx, client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
  1539  	}
  1540  }
  1541  
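        // NewSimpleCreatePodStrategy returns a TestPodCreateStrategy that creates
        // plain pause pods with generated "simple-pod-" names. A minimal usage
        // sketch (ctx, client and the "perf-test" namespace are assumed to come
        // from the test harness; TestPodCreateStrategy is the function type defined
        // earlier in this file):
        //
        //	strategy := NewSimpleCreatePodStrategy()
        //	err := strategy(ctx, client, "perf-test", 50)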
  1542  func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
  1543  	basePod := &v1.Pod{
  1544  		ObjectMeta: metav1.ObjectMeta{
  1545  			GenerateName: "simple-pod-",
  1546  		},
  1547  		Spec: MakePodSpec(),
  1548  	}
  1549  	return NewCustomCreatePodStrategy(basePod)
  1550  }
  1551  
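        // NewSimpleWithControllerCreatePodStrategy returns a TestPodCreateStrategy
        // that creates a ReplicationController selecting name=controllerName pods
        // and then directly creates podCount matching pause pods, which the
        // controller can adopt.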
  1552  func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
  1553  	return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
  1554  		basePod := &v1.Pod{
  1555  			ObjectMeta: metav1.ObjectMeta{
  1556  				GenerateName: controllerName + "-pod-",
  1557  				Labels:       map[string]string{"name": controllerName},
  1558  			},
  1559  			Spec: MakePodSpec(),
  1560  		}
  1561  		if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
  1562  			return err
  1563  		}
  1564  		return CreatePod(ctx, client, namespace, podCount, basePod)
  1565  	}
  1566  }
  1567  
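        // SecretConfig describes a secret to be created from Content in the given
        // Namespace. Run and Stop call LogFunc unconditionally, so it must be
        // non-nil.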
  1568  type SecretConfig struct {
  1569  	Content   map[string]string
  1570  	Client    clientset.Interface
  1571  	Name      string
  1572  	Namespace string
  1573  	// If set, this function will be used to print log lines instead of klog.
  1574  	LogFunc func(fmt string, args ...interface{})
  1575  }
  1576  
  1577  func (config *SecretConfig) Run() error {
  1578  	secret := &v1.Secret{
  1579  		ObjectMeta: metav1.ObjectMeta{
  1580  			Name: config.Name,
  1581  		},
  1582  		StringData: map[string]string{},
  1583  	}
  1584  	for k, v := range config.Content {
  1585  		secret.StringData[k] = v
  1586  	}
  1587  
  1588  	if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
  1589  		return fmt.Errorf("error creating secret: %v", err)
  1590  	}
  1591  	config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
  1592  	return nil
  1593  }
  1594  
  1595  func (config *SecretConfig) Stop() error {
  1596  	if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
  1597  		return fmt.Errorf("error deleting secret: %v", err)
  1598  	}
  1599  	config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
  1600  	return nil
  1601  }
  1602  
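        // attachSecrets mounts each named secret as a volume at /<name> in the
        // first container of the pod template, replacing any existing volumes and
        // volume mounts.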
  1603  // TODO: attach secrets using different possibilities: env vars, image pull secrets.
  1604  func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
  1605  	volumes := make([]v1.Volume, 0, len(secretNames))
  1606  	mounts := make([]v1.VolumeMount, 0, len(secretNames))
  1607  	for _, name := range secretNames {
  1608  		volumes = append(volumes, v1.Volume{
  1609  			Name: name,
  1610  			VolumeSource: v1.VolumeSource{
  1611  				Secret: &v1.SecretVolumeSource{
  1612  					SecretName: name,
  1613  				},
  1614  			},
  1615  		})
  1616  		mounts = append(mounts, v1.VolumeMount{
  1617  			Name:      name,
  1618  			MountPath: fmt.Sprintf("/%v", name),
  1619  		})
  1620  	}
  1621  
  1622  	template.Spec.Volumes = volumes
  1623  	template.Spec.Containers[0].VolumeMounts = mounts
  1624  }
  1625  
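        // ConfigMapConfig describes a ConfigMap to be created from Content in the
        // given Namespace. As with SecretConfig, LogFunc must be non-nil because
        // Run and Stop call it unconditionally.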
  1626  type ConfigMapConfig struct {
  1627  	Content   map[string]string
  1628  	Client    clientset.Interface
  1629  	Name      string
  1630  	Namespace string
  1631  	// If set, this function will be used to print log lines instead of klog.
  1632  	LogFunc func(fmt string, args ...interface{})
  1633  }
  1634  
  1635  func (config *ConfigMapConfig) Run() error {
  1636  	configMap := &v1.ConfigMap{
  1637  		ObjectMeta: metav1.ObjectMeta{
  1638  			Name: config.Name,
  1639  		},
  1640  		Data: map[string]string{},
  1641  	}
  1642  	for k, v := range config.Content {
  1643  		configMap.Data[k] = v
  1644  	}
  1645  
  1646  	if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
  1647  		return fmt.Errorf("error creating configmap: %v", err)
  1648  	}
  1649  	config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
  1650  	return nil
  1651  }
  1652  
  1653  func (config *ConfigMapConfig) Stop() error {
  1654  	if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
  1655  		return fmt.Errorf("error deleting configmap: %v", err)
  1656  	}
  1657  	config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
  1658  	return nil
  1659  }
  1660  
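        // attachConfigMaps mounts each named ConfigMap as a volume at /<name> in
        // the first container of the pod template, replacing any existing volumes
        // and volume mounts.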
  1661  // TODO: attach configmaps using different possibilities: env vars.
  1662  func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
  1663  	volumes := make([]v1.Volume, 0, len(configMapNames))
  1664  	mounts := make([]v1.VolumeMount, 0, len(configMapNames))
  1665  	for _, name := range configMapNames {
  1666  		volumes = append(volumes, v1.Volume{
  1667  			Name: name,
  1668  			VolumeSource: v1.VolumeSource{
  1669  				ConfigMap: &v1.ConfigMapVolumeSource{
  1670  					LocalObjectReference: v1.LocalObjectReference{
  1671  						Name: name,
  1672  					},
  1673  				},
  1674  			},
  1675  		})
  1676  		mounts = append(mounts, v1.VolumeMount{
  1677  			Name:      name,
  1678  			MountPath: fmt.Sprintf("/%v", name),
  1679  		})
  1680  	}
  1681  
  1682  	template.Spec.Volumes = volumes
  1683  	template.Spec.Containers[0].VolumeMounts = mounts
  1684  }
  1685  
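        // getTerminationGracePeriodSeconds returns the configured termination grace
        // period, falling back to defaultGrace when it is unset or negative.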
  1686  func (config *RCConfig) getTerminationGracePeriodSeconds(defaultGrace *int64) *int64 {
  1687  	if config.TerminationGracePeriodSeconds == nil || *config.TerminationGracePeriodSeconds < 0 {
  1688  		return defaultGrace
  1689  	}
  1690  	return config.TerminationGracePeriodSeconds
  1691  }
  1692  
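        // attachServiceAccountTokenProjection mounts a projected volume at
        // /var/service-account-tokens/<name> containing a service account token
        // with audience <name>, the cluster root CA certificate, and the pod's
        // namespace exposed via the downward API.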
  1693  func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
  1694  	template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
  1695  		v1.VolumeMount{
  1696  			Name:      name,
  1697  			MountPath: "/var/service-account-tokens/" + name,
  1698  		})
  1699  
  1700  	template.Spec.Volumes = append(template.Spec.Volumes,
  1701  		v1.Volume{
  1702  			Name: name,
  1703  			VolumeSource: v1.VolumeSource{
  1704  				Projected: &v1.ProjectedVolumeSource{
  1705  					Sources: []v1.VolumeProjection{
  1706  						{
  1707  							ServiceAccountToken: &v1.ServiceAccountTokenProjection{
  1708  								Path:     "token",
  1709  								Audience: name,
  1710  							},
  1711  						},
  1712  						{
  1713  							ConfigMap: &v1.ConfigMapProjection{
  1714  								LocalObjectReference: v1.LocalObjectReference{
  1715  									Name: "kube-root-ca.crt",
  1716  								},
  1717  								Items: []v1.KeyToPath{
  1718  									{
  1719  										Key:  "ca.crt",
  1720  										Path: "ca.crt",
  1721  									},
  1722  								},
  1723  							},
  1724  						},
  1725  						{
  1726  							DownwardAPI: &v1.DownwardAPIProjection{
  1727  								Items: []v1.DownwardAPIVolumeFile{
  1728  									{
  1729  										Path: "namespace",
  1730  										FieldRef: &v1.ObjectFieldSelector{
  1731  											APIVersion: "v1",
  1732  											FieldPath:  "metadata.namespace",
  1733  										},
  1734  									},
  1735  								},
  1736  							},
  1737  						},
  1738  					},
  1739  				},
  1740  			},
  1741  		})
  1742  }
  1743  
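        // DaemonConfig describes a DaemonSet to be created and waited for by Run.
        // LogFunc is called unconditionally while reporting progress, so it must be
        // non-nil.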
  1744  type DaemonConfig struct {
  1745  	Client    clientset.Interface
  1746  	Name      string
  1747  	Namespace string
  1748  	Image     string
  1749  	// If set, this function will be used to print log lines instead of klog.
  1750  	LogFunc func(fmt string, args ...interface{})
  1751  	// How long we wait for DaemonSet to become running.
  1752  	Timeout time.Duration
  1753  }
  1754  
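        // Run creates the DaemonSet (defaulting Image to the pause image) and waits
        // up to Timeout (default 5 minutes) for a running, ready daemon pod on
        // every node, returning the wait error on timeout.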
  1755  func (config *DaemonConfig) Run(ctx context.Context) error {
  1756  	if config.Image == "" {
  1757  		config.Image = "registry.k8s.io/pause:3.9"
  1758  	}
  1759  	nameLabel := map[string]string{
  1760  		"name": config.Name + "-daemon",
  1761  	}
  1762  	daemon := &apps.DaemonSet{
  1763  		ObjectMeta: metav1.ObjectMeta{
  1764  			Name: config.Name,
  1765  		},
  1766  		Spec: apps.DaemonSetSpec{
  1767  			Template: v1.PodTemplateSpec{
  1768  				ObjectMeta: metav1.ObjectMeta{
  1769  					Labels: nameLabel,
  1770  				},
  1771  				Spec: v1.PodSpec{
  1772  					Containers: []v1.Container{
  1773  						{
  1774  							Name:  config.Name,
  1775  							Image: config.Image,
  1776  						},
  1777  					},
  1778  				},
  1779  			},
  1780  		},
  1781  	}
  1782  
  1783  	if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
  1784  		return fmt.Errorf("error creating daemonset: %v", err)
  1785  	}
  1786  
  1787  	var nodes *v1.NodeList
  1788  	var err error
  1789  	for i := 0; i < retries; i++ {
  1790  		// Wait for all daemons to be running
  1791  		nodes, err = config.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{ResourceVersion: "0"})
  1792  		if err == nil {
  1793  			break
  1794  		} else if i+1 == retries {
  1795  			return fmt.Errorf("error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
  1796  		}
  1797  	}
  1798  
  1799  	timeout := config.Timeout
  1800  	if timeout <= 0 {
  1801  		timeout = 5 * time.Minute
  1802  	}
  1803  
  1804  	ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
  1805  	if err != nil {
  1806  		return err
  1807  	}
  1808  	defer ps.Stop()
  1809  
  1810  	err = wait.Poll(time.Second, timeout, func() (bool, error) {
  1811  		pods := ps.List()
  1812  
  1813  		nodeHasDaemon := sets.NewString()
  1814  		for _, pod := range pods {
  1815  			podReady, _ := PodRunningReady(pod)
  1816  			if pod.Spec.NodeName != "" && podReady {
  1817  				nodeHasDaemon.Insert(pod.Spec.NodeName)
  1818  			}
  1819  		}
  1820  
  1821  		running := len(nodeHasDaemon)
  1822  		config.LogFunc("Found %v/%v Daemons %v running", running, len(nodes.Items), config.Name)
  1823  		return running == len(nodes.Items), nil
  1824  	})
  1825  	if err != nil {
  1826  		config.LogFunc("Timed out while waiting for DaemonSet %v/%v to be running.", config.Namespace, config.Name)
  1827  	} else {
  1828  		config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
  1829  	}
  1830  
  1831  	return err
  1832  }
  1833  
