1
16
17 package utils
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "os"
24 "strings"
25 "sync"
26 "time"
27
28 apps "k8s.io/api/apps/v1"
29 batch "k8s.io/api/batch/v1"
30 v1 "k8s.io/api/core/v1"
31 storagev1 "k8s.io/api/storage/v1"
32 storagev1beta1 "k8s.io/api/storage/v1beta1"
33 apiequality "k8s.io/apimachinery/pkg/api/equality"
34 apierrors "k8s.io/apimachinery/pkg/api/errors"
35 "k8s.io/apimachinery/pkg/api/resource"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/fields"
38 "k8s.io/apimachinery/pkg/labels"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 "k8s.io/apimachinery/pkg/types"
41 "k8s.io/apimachinery/pkg/util/json"
42 "k8s.io/apimachinery/pkg/util/sets"
43 "k8s.io/apimachinery/pkg/util/strategicpatch"
44 "k8s.io/apimachinery/pkg/util/uuid"
45 "k8s.io/apimachinery/pkg/util/wait"
46 clientset "k8s.io/client-go/kubernetes"
47 scaleclient "k8s.io/client-go/scale"
48 "k8s.io/client-go/util/workqueue"
49 batchinternal "k8s.io/kubernetes/pkg/apis/batch"
50 api "k8s.io/kubernetes/pkg/apis/core"
51 extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
52 "k8s.io/utils/pointer"
53
54 "k8s.io/klog/v2"
55 )
56
// nonExist is the placeholder phase/hostname that Diff records for the side
// (old or current) on which a pod was absent.
const nonExist = "NonExist"
61
// removePtr dereferences replicas, treating a nil pointer as zero.
func removePtr(replicas *int32) int32 {
	var n int32
	if replicas != nil {
		n = *replicas
	}
	return n
}
68
69 func WaitUntilPodIsScheduled(ctx context.Context, c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
70
71 p, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{ResourceVersion: "0"})
72 if err == nil && p.Spec.NodeName != "" {
73 return p, nil
74 }
75 pollingPeriod := 200 * time.Millisecond
76 startTime := time.Now()
77 for startTime.Add(timeout).After(time.Now()) && ctx.Err() == nil {
78 time.Sleep(pollingPeriod)
79 p, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{ResourceVersion: "0"})
80 if err == nil && p.Spec.NodeName != "" {
81 return p, nil
82 }
83 }
84 return nil, fmt.Errorf("timed out after %v when waiting for pod %v/%v to start", timeout, namespace, name)
85 }
86
// RunPodAndGetNodeName creates pod (with retries) in its own namespace. It then
// waits up to timeout for the pod to be bound to a node and returns that node's
// name.
func RunPodAndGetNodeName(ctx context.Context, c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
	// Capture identity before creation in case the create helper touches pod.
	ns, podName := pod.Namespace, pod.Name
	if err := CreatePodWithRetries(c, ns, pod); err != nil {
		return "", err
	}
	scheduled, err := WaitUntilPodIsScheduled(ctx, c, podName, ns, timeout)
	if err != nil {
		return "", err
	}
	return scheduled.Spec.NodeName, nil
}
99
100 type RunObjectConfig interface {
101 Run() error
102 GetName() string
103 GetNamespace() string
104 GetKind() schema.GroupKind
105 GetClient() clientset.Interface
106 GetScalesGetter() scaleclient.ScalesGetter
107 SetClient(clientset.Interface)
108 SetScalesClient(scaleclient.ScalesGetter)
109 GetReplicas() int
110 GetLabelValue(string) (string, bool)
111 GetGroupResource() schema.GroupResource
112 GetGroupVersionResource() schema.GroupVersionResource
113 }
114
115 type RCConfig struct {
116 Affinity *v1.Affinity
117 Client clientset.Interface
118 ScalesGetter scaleclient.ScalesGetter
119 Image string
120 Command []string
121 Name string
122 Namespace string
123 PollInterval time.Duration
124 Timeout time.Duration
125 PodStatusFile *os.File
126 Replicas int
127 CpuRequest int64
128 CpuLimit int64
129 MemRequest int64
130 MemLimit int64
131 GpuLimit int64
132 ReadinessProbe *v1.Probe
133 DNSPolicy *v1.DNSPolicy
134 PriorityClassName string
135 TerminationGracePeriodSeconds *int64
136 Lifecycle *v1.Lifecycle
137 SchedulerName string
138
139
140 Env map[string]string
141
142
143 Labels map[string]string
144 Annotations map[string]string
145
146
147 NodeSelector map[string]string
148
149
150 Tolerations []v1.Toleration
151
152
153 Ports map[string]int
154
155 HostPorts map[string]int
156
157 Volumes []v1.Volume
158 VolumeMounts []v1.VolumeMount
159
160
161
162 CreatedPods *[]*v1.Pod
163
164
165
166 MaxContainerFailures *int
167
168
169 MaxAllowedPodDeletions int
170
171
172 Silent bool
173
174
175 LogFunc func(fmt string, args ...interface{})
176
177
178 NodeDumpFunc func(ctx context.Context, c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
179 ContainerDumpFunc func(ctx context.Context, c clientset.Interface, ns string, logFunc func(ftm string, args ...interface{}))
180
181
182 SecretNames []string
183 ConfigMapNames []string
184
185 ServiceAccountTokenProjections int
186
187
188 AdditionalContainers []v1.Container
189
190
191 SecurityContext *v1.SecurityContext
192 }
193
194 func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
195 if rc.LogFunc != nil {
196 rc.LogFunc(fmt, args...)
197 }
198 klog.Infof(fmt, args...)
199 }
200
201 type DeploymentConfig struct {
202 RCConfig
203 }
204
205 type ReplicaSetConfig struct {
206 RCConfig
207 }
208
209 type JobConfig struct {
210 RCConfig
211 }
212
213
type (
	// podInfo records a pod's host and phase before (old*) and after a
	// change. Diff stores nonExist on whichever side the pod was absent.
	podInfo struct {
		oldHostname string
		oldPhase    string
		hostname    string
		phase       string
	}

	// PodDiff maps pod names to their before/after state, as built by Diff.
	PodDiff map[string]*podInfo
)
223
224
225 func (p PodDiff) String(ignorePhases sets.String) string {
226 ret := ""
227 for name, info := range p {
228 if ignorePhases.Has(info.phase) {
229 continue
230 }
231 if info.phase == nonExist {
232 ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
233 continue
234 }
235 phaseChange, hostChange := false, false
236 msg := fmt.Sprintf("Pod %v ", name)
237 if info.oldPhase != info.phase {
238 phaseChange = true
239 if info.oldPhase == nonExist {
240 msg += fmt.Sprintf("in phase %v ", info.phase)
241 } else {
242 msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
243 }
244 }
245 if info.oldHostname != info.hostname {
246 hostChange = true
247 if info.oldHostname == nonExist || info.oldHostname == "" {
248 msg += fmt.Sprintf("assigned host %v ", info.hostname)
249 } else {
250 msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
251 }
252 }
253 if phaseChange || hostChange {
254 ret += msg + "\n"
255 }
256 }
257 return ret
258 }
259
260
261
262 func (p PodDiff) DeletedPods() []string {
263 var deletedPods []string
264 for podName, podInfo := range p {
265 if podInfo.hostname == nonExist {
266 deletedPods = append(deletedPods, podName)
267 }
268 }
269 return deletedPods
270 }
271
272
273 func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
274 podInfoMap := PodDiff{}
275
276
277 for _, pod := range curPods {
278 podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
279 }
280
281
282 for _, pod := range oldPods {
283 if info, ok := podInfoMap[pod.Name]; ok {
284 info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
285 } else {
286 podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
287 }
288 }
289 return podInfoMap
290 }
291
292
293
294
295
296 func RunDeployment(ctx context.Context, config DeploymentConfig) error {
297 err := config.create()
298 if err != nil {
299 return err
300 }
301 return config.start(ctx)
302 }
303
304 func (config *DeploymentConfig) Run(ctx context.Context) error {
305 return RunDeployment(ctx, *config)
306 }
307
308 func (config *DeploymentConfig) GetKind() schema.GroupKind {
309 return extensionsinternal.Kind("Deployment")
310 }
311
312 func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
313 return extensionsinternal.Resource("deployments")
314 }
315
316 func (config *DeploymentConfig) GetGroupVersionResource() schema.GroupVersionResource {
317 return extensionsinternal.SchemeGroupVersion.WithResource("deployments")
318 }
319
// create builds the Deployment described by config and submits it with
// CreateDeploymentWithRetries.
//
// The pod template starts with a single primary container exposing port 80,
// followed by AdditionalContainers. Secrets, config maps,
// ServiceAccountTokenProjections token projections and the shared
// RCConfig.applyTo overrides are then layered on top.
func (config *DeploymentConfig) create() error {
	deployment := &apps.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: apps.DeploymentSpec{
			Replicas: pointer.Int32(int32(config.Replicas)),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:            config.Name,
							Image:           config.Image,
							Command:         config.Command,
							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
							Lifecycle:       config.Lifecycle,
							SecurityContext: config.SecurityContext,
						},
					},
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&deployment.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
	}

	for i := 0; i < config.ServiceAccountTokenProjections; i++ {
		attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
	}

	config.applyTo(&deployment.Spec.Template)

	if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
		// %w keeps the API error inspectable (e.g. apierrors.IsAlreadyExists) for callers.
		return fmt.Errorf("error creating deployment: %w", err)
	}
	config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
	return nil
}
378
379
380
381
382
383 func RunReplicaSet(ctx context.Context, config ReplicaSetConfig) error {
384 err := config.create()
385 if err != nil {
386 return err
387 }
388 return config.start(ctx)
389 }
390
391 func (config *ReplicaSetConfig) Run(ctx context.Context) error {
392 return RunReplicaSet(ctx, *config)
393 }
394
395 func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
396 return extensionsinternal.Kind("ReplicaSet")
397 }
398
399 func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
400 return extensionsinternal.Resource("replicasets")
401 }
402
403 func (config *ReplicaSetConfig) GetGroupVersionResource() schema.GroupVersionResource {
404 return extensionsinternal.SchemeGroupVersion.WithResource("replicasets")
405 }
406
// create builds the ReplicaSet described by config and submits it with
// CreateReplicaSetWithRetries.
//
// The pod template starts with a primary container exposing port 80, followed
// by AdditionalContainers. Secrets, config maps and the shared RCConfig.applyTo
// overrides are then layered on top.
func (config *ReplicaSetConfig) create() error {
	rs := &apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas: pointer.Int32(int32(config.Replicas)),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:            config.Name,
							Image:           config.Image,
							Command:         config.Command,
							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
							Lifecycle:       config.Lifecycle,
							SecurityContext: config.SecurityContext,
						},
					},
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		rs.Spec.Template.Spec.Containers = append(rs.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&rs.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(&rs.Spec.Template)

	if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
		// %w keeps the API error inspectable (e.g. apierrors.IsAlreadyExists) for callers.
		return fmt.Errorf("error creating replica set: %w", err)
	}
	config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
	return nil
}
461
462
463
464
465
466 func RunJob(ctx context.Context, config JobConfig) error {
467 err := config.create()
468 if err != nil {
469 return err
470 }
471 return config.start(ctx)
472 }
473
474 func (config *JobConfig) Run(ctx context.Context) error {
475 return RunJob(ctx, *config)
476 }
477
478 func (config *JobConfig) GetKind() schema.GroupKind {
479 return batchinternal.Kind("Job")
480 }
481
482 func (config *JobConfig) GetGroupResource() schema.GroupResource {
483 return batchinternal.Resource("jobs")
484 }
485
486 func (config *JobConfig) GetGroupVersionResource() schema.GroupVersionResource {
487 return batchinternal.SchemeGroupVersion.WithResource("jobs")
488 }
489
// create builds the Job described by config and submits it with
// CreateJobWithRetries. Replicas is used for both parallelism and completions.
// Pods restart on failure. Secrets, config maps and the shared RCConfig.applyTo
// overrides are layered onto the template. AdditionalContainers are not used
// for Jobs.
func (config *JobConfig) create() error {
	job := &batch.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: batch.JobSpec{
			Parallelism: pointer.Int32(int32(config.Replicas)),
			Completions: pointer.Int32(int32(config.Replicas)),
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:            config.Name,
							Image:           config.Image,
							Command:         config.Command,
							Lifecycle:       config.Lifecycle,
							SecurityContext: config.SecurityContext,
						},
					},
					RestartPolicy: v1.RestartPolicyOnFailure,
				},
			},
		},
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&job.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(&job.Spec.Template)

	if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
		// %w keeps the API error inspectable (e.g. apierrors.IsAlreadyExists) for callers.
		return fmt.Errorf("error creating job: %w", err)
	}
	// Parallelism is a *int32; dereference it so the log shows the count, not a pointer address.
	config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, removePtr(job.Spec.Parallelism))
	return nil
}
536
537
538
539
540
541 func RunRC(ctx context.Context, config RCConfig) error {
542 err := config.create()
543 if err != nil {
544 return err
545 }
546 return config.start(ctx)
547 }
548
549 func (config *RCConfig) Run(ctx context.Context) error {
550 return RunRC(ctx, *config)
551 }
552
553 func (config *RCConfig) GetName() string {
554 return config.Name
555 }
556
557 func (config *RCConfig) GetNamespace() string {
558 return config.Namespace
559 }
560
561 func (config *RCConfig) GetKind() schema.GroupKind {
562 return api.Kind("ReplicationController")
563 }
564
565 func (config *RCConfig) GetGroupResource() schema.GroupResource {
566 return api.Resource("replicationcontrollers")
567 }
568
569 func (config *RCConfig) GetGroupVersionResource() schema.GroupVersionResource {
570 return api.SchemeGroupVersion.WithResource("replicationcontrollers")
571 }
572
573 func (config *RCConfig) GetClient() clientset.Interface {
574 return config.Client
575 }
576
577 func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
578 return config.ScalesGetter
579 }
580
581 func (config *RCConfig) SetClient(c clientset.Interface) {
582 config.Client = c
583 }
584
585 func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
586 config.ScalesGetter = getter
587 }
588
589 func (config *RCConfig) GetReplicas() int {
590 return config.Replicas
591 }
592
593 func (config *RCConfig) GetLabelValue(key string) (string, bool) {
594 value, found := config.Labels[key]
595 return value, found
596 }
597
// create builds the ReplicationController described by config and submits it
// with CreateRCWithRetries.
//
// A nil DNSPolicy is defaulted to DNSDefault, and this default is stored back
// into config. The primary container exposes port 80 and uses ReadinessProbe.
// AdditionalContainers, secrets, config maps and the shared RCConfig.applyTo
// overrides are then layered on top.
func (config *RCConfig) create() error {
	dnsDefault := v1.DNSDefault
	if config.DNSPolicy == nil {
		config.DNSPolicy = &dnsDefault
	}
	one := int64(1)
	rc := &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: pointer.Int32(int32(config.Replicas)),
			Selector: map[string]string{
				"name": config.Name,
			},
			Template: &v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					SchedulerName: config.SchedulerName,
					Affinity:      config.Affinity,
					Containers: []v1.Container{
						{
							Name:            config.Name,
							Image:           config.Image,
							Command:         config.Command,
							Ports:           []v1.ContainerPort{{ContainerPort: 80}},
							ReadinessProbe:  config.ReadinessProbe,
							Lifecycle:       config.Lifecycle,
							SecurityContext: config.SecurityContext,
						},
					},
					DNSPolicy:    *config.DNSPolicy,
					NodeSelector: config.NodeSelector,
					Tolerations:  config.Tolerations,
					// RCs fall back to a 1s grace period instead of nil.
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(&one),
					PriorityClassName:             config.PriorityClassName,
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		rc.Spec.Template.Spec.Containers = append(rc.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(rc.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(rc.Spec.Template)

	if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
		// %w keeps the API error inspectable (e.g. apierrors.IsAlreadyExists) for callers.
		return fmt.Errorf("error creating replication controller: %w", err)
	}
	config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
	return nil
}
661
662 func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
663 for k, v := range config.Env {
664 c := &template.Spec.Containers[0]
665 c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
666 }
667 for k, v := range config.Labels {
668 template.ObjectMeta.Labels[k] = v
669 }
670 template.Spec.NodeSelector = make(map[string]string)
671 for k, v := range config.NodeSelector {
672 template.Spec.NodeSelector[k] = v
673 }
674 if config.Tolerations != nil {
675 template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
676 }
677 for k, v := range config.Ports {
678 c := &template.Spec.Containers[0]
679 c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
680 }
681 for k, v := range config.HostPorts {
682 c := &template.Spec.Containers[0]
683 c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
684 }
685 if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
686 template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
687 }
688 if config.CpuLimit > 0 {
689 template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
690 }
691 if config.MemLimit > 0 {
692 template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
693 }
694 if config.CpuRequest > 0 || config.MemRequest > 0 {
695 template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
696 }
697 if config.CpuRequest > 0 {
698 template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
699 }
700 if config.MemRequest > 0 {
701 template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
702 }
703 if config.GpuLimit > 0 {
704 template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
705 }
706 if config.Lifecycle != nil {
707 template.Spec.Containers[0].Lifecycle = config.Lifecycle
708 }
709 if len(config.Volumes) > 0 {
710 template.Spec.Volumes = config.Volumes
711 }
712 if len(config.VolumeMounts) > 0 {
713 template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
714 }
715 if config.PriorityClassName != "" {
716 template.Spec.PriorityClassName = config.PriorityClassName
717 }
718 }
719
720 type RCStartupStatus struct {
721 Expected int
722 Terminating int
723 Running int
724 RunningButNotReady int
725 Waiting int
726 Pending int
727 Scheduled int
728 Unknown int
729 Inactive int
730 FailedContainers int
731 Created []*v1.Pod
732 ContainerRestartNodes sets.String
733 }
734
// String formats the tally as a single log line prefixed with name.
func (s *RCStartupStatus) String(name string) string {
	created := len(s.Created)
	return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
		name, created, s.Expected,
		s.Running, s.Pending, s.Waiting,
		s.Inactive, s.Terminating, s.Unknown,
		s.RunningButNotReady)
}
739
740 func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
741 startupStatus := RCStartupStatus{
742 Expected: expected,
743 Created: make([]*v1.Pod, 0, expected),
744 ContainerRestartNodes: sets.NewString(),
745 }
746 for _, p := range pods {
747 if p.DeletionTimestamp != nil {
748 startupStatus.Terminating++
749 continue
750 }
751 startupStatus.Created = append(startupStatus.Created, p)
752 if p.Status.Phase == v1.PodRunning {
753 ready := false
754 for _, c := range p.Status.Conditions {
755 if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
756 ready = true
757 break
758 }
759 }
760 if ready {
761
762 startupStatus.Running++
763 } else {
764 startupStatus.RunningButNotReady++
765 }
766 for _, v := range FailedContainers(p) {
767 startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
768 startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
769 }
770 } else if p.Status.Phase == v1.PodPending {
771 if p.Spec.NodeName == "" {
772 startupStatus.Waiting++
773 } else {
774 startupStatus.Pending++
775 }
776 } else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
777 startupStatus.Inactive++
778 } else if p.Status.Phase == v1.PodUnknown {
779 startupStatus.Unknown++
780 }
781
782 if p.Spec.NodeName != "" {
783 startupStatus.Scheduled++
784 }
785 }
786 return startupStatus
787 }
788
789 func (config *RCConfig) start(ctx context.Context) error {
790
791 var maxContainerFailures int
792 if config.MaxContainerFailures == nil {
793 maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
794 } else {
795 maxContainerFailures = *config.MaxContainerFailures
796 }
797
798 label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
799
800 ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
801 if err != nil {
802 return err
803 }
804 defer ps.Stop()
805
806 interval := config.PollInterval
807 if interval <= 0 {
808 interval = 10 * time.Second
809 }
810 timeout := config.Timeout
811 if timeout <= 0 {
812 timeout = 5 * time.Minute
813 }
814 oldPods := make([]*v1.Pod, 0)
815 oldRunning := 0
816 lastChange := time.Now()
817 podDeletionsCount := 0
818 for oldRunning != config.Replicas {
819 time.Sleep(interval)
820
821 pods := ps.List()
822 startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
823
824 if config.CreatedPods != nil {
825 *config.CreatedPods = startupStatus.Created
826 }
827 if !config.Silent {
828 config.RCConfigLog(startupStatus.String(config.Name))
829 }
830
831 if config.PodStatusFile != nil {
832 fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
833 }
834
835 if startupStatus.FailedContainers > maxContainerFailures {
836 if config.NodeDumpFunc != nil {
837 config.NodeDumpFunc(ctx, config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
838 }
839 if config.ContainerDumpFunc != nil {
840
841 config.ContainerDumpFunc(ctx, config.Client, config.Namespace, config.RCConfigLog)
842 }
843 return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
844 }
845
846 diff := Diff(oldPods, pods)
847 deletedPods := diff.DeletedPods()
848 podDeletionsCount += len(deletedPods)
849 if podDeletionsCount > config.MaxAllowedPodDeletions {
850
851 err := fmt.Errorf("%d pods disappeared for %s: %v", podDeletionsCount, config.Name, strings.Join(deletedPods, ", "))
852 config.RCConfigLog(err.Error())
853 config.RCConfigLog(diff.String(sets.NewString()))
854 return err
855 }
856
857 if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
858 lastChange = time.Now()
859 }
860 oldPods = pods
861 oldRunning = startupStatus.Running
862
863 if time.Since(lastChange) > timeout {
864 break
865 }
866 }
867
868 if oldRunning != config.Replicas {
869
870 options := metav1.ListOptions{LabelSelector: label.String()}
871 if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(ctx, options); err == nil {
872 for _, pod := range pods.Items {
873 config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
874 }
875 } else {
876 config.RCConfigLog("Can't list pod debug info: %v", err)
877 }
878 return fmt.Errorf("only %d pods started out of %d", oldRunning, config.Replicas)
879 }
880 return nil
881 }
882
883
884
885
// StartPods creates replicas copies of pod named "<podNamePrefix>-<i>". Each
// copy is labelled name=<pod name> plus a shared random startPodsID. If
// waitForRunning is set, StartPods then waits (via
// WaitForPodsWithLabelRunning) until all of them are Running.
//
// pod is treated as a template and is not modified. Its Labels map and
// Containers slice are shared with the caller even though pod is passed by
// value, so each replica is built from a deep copy. It panics if replicas < 1.
func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
	pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {

	if replicas < 1 {
		panic("StartPods: number of replicas must be non-zero")
	}
	startPodsID := string(uuid.NewUUID())
	for i := 0; i < replicas; i++ {
		podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
		p := pod.DeepCopy()
		p.ObjectMeta.Name = podName
		if p.ObjectMeta.Labels == nil {
			p.ObjectMeta.Labels = map[string]string{}
		}
		p.ObjectMeta.Labels["name"] = podName
		p.ObjectMeta.Labels["startPodsID"] = startPodsID
		p.Spec.Containers[0].Name = podName
		if err := CreatePodWithRetries(c, namespace, p); err != nil {
			return err
		}
	}
	if !waitForRunning {
		return nil
	}
	if logFunc != nil {
		logFunc("Waiting for running...")
	}
	label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
	if err := WaitForPodsWithLabelRunning(c, namespace, label); err != nil {
		return fmt.Errorf("error waiting for %d pods to be running - probably a timeout: %w", replicas, err)
	}
	return nil
}
913
914
915
916 func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
917 return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
918 }
919
920
921
// WaitForEnoughPodsWithLabelRunning polls every 5s, for up to 10 minutes,
// until at least one pod in ns matches label and the Running count reaches
// replicas. A negative replicas means every matching pod must be Running.
func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
	ps, err := NewPodStore(c, ns, label, fields.Everything())
	if err != nil {
		return err
	}
	defer ps.Stop()

	satisfied := func(pods []*v1.Pod) bool {
		if len(pods) == 0 {
			return false
		}
		running := 0
		for _, p := range pods {
			if p.Status.Phase == v1.PodRunning {
				running++
			}
		}
		if replicas < 0 {
			return running >= len(pods)
		}
		return running >= replicas
	}

	for start := time.Now(); time.Since(start) < 10*time.Minute; {
		if satisfied(ps.List()) {
			return nil
		}
		time.Sleep(5 * time.Second)
	}
	return fmt.Errorf("timeout while waiting for pods with labels %q to be running", label.String())
}
952
953 type CountToStrategy struct {
954 Count int
955 Strategy PrepareNodeStrategy
956 }
957
958 type TestNodePreparer interface {
959 PrepareNodes(ctx context.Context, nextNodeIndex int) error
960 CleanupNodes(ctx context.Context) error
961 }
962
963 type PrepareNodeStrategy interface {
964
965 PreparePatch(node *v1.Node) []byte
966
967
968 PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error
969
970 CleanupNode(ctx context.Context, node *v1.Node) *v1.Node
971
972
973 CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error
974 }
975
976 type TrivialNodePrepareStrategy struct{}
977
978 var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}
979
980 func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
981 return []byte{}
982 }
983
984 func (*TrivialNodePrepareStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
985 nodeCopy := *node
986 return &nodeCopy
987 }
988
989 func (*TrivialNodePrepareStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
990 return nil
991 }
992
993 func (*TrivialNodePrepareStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
994 return nil
995 }
996
997 type LabelNodePrepareStrategy struct {
998 LabelKey string
999 LabelValues []string
1000 roundRobinIdx int
1001 }
1002
1003 var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}
1004
1005 func NewLabelNodePrepareStrategy(labelKey string, labelValues ...string) *LabelNodePrepareStrategy {
1006 return &LabelNodePrepareStrategy{
1007 LabelKey: labelKey,
1008 LabelValues: labelValues,
1009 }
1010 }
1011
1012 func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
1013 labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, s.LabelValues[s.roundRobinIdx])
1014 patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
1015 s.roundRobinIdx++
1016 if s.roundRobinIdx == len(s.LabelValues) {
1017 s.roundRobinIdx = 0
1018 }
1019 return []byte(patch)
1020 }
1021
// CleanupNode returns a deep copy of node with LabelKey removed.
//
// The label is removed whenever it is present. The previous len(...) != 0
// guard left behind labels that PreparePatch had set to an empty value.
// delete is a no-op on a nil map or a missing key, so no guard is needed.
func (s *LabelNodePrepareStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
	nodeCopy := node.DeepCopy()
	delete(nodeCopy.Labels, s.LabelKey)
	return nodeCopy
}
1029
1030 func (*LabelNodePrepareStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
1031 return nil
1032 }
1033
1034 func (*LabelNodePrepareStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
1035 return nil
1036 }
1037
1038
1039
1040
1041 type NodeAllocatableStrategy struct {
1042
1043 NodeAllocatable map[v1.ResourceName]string
1044
1045 CsiNodeAllocatable map[string]*storagev1.VolumeNodeResources
1046
1047 MigratedPlugins []string
1048 }
1049
1050 var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
1051
1052 func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
1053 return &NodeAllocatableStrategy{
1054 NodeAllocatable: nodeAllocatable,
1055 CsiNodeAllocatable: csiNodeAllocatable,
1056 MigratedPlugins: migratedPlugins,
1057 }
1058 }
1059
1060 func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
1061 newNode := node.DeepCopy()
1062 for name, value := range s.NodeAllocatable {
1063 newNode.Status.Allocatable[name] = resource.MustParse(value)
1064 }
1065
1066 oldJSON, err := json.Marshal(node)
1067 if err != nil {
1068 panic(err)
1069 }
1070 newJSON, err := json.Marshal(newNode)
1071 if err != nil {
1072 panic(err)
1073 }
1074
1075 patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
1076 if err != nil {
1077 panic(err)
1078 }
1079 return patch
1080 }
1081
// CleanupNode returns a deep copy of node with every NodeAllocatable resource
// removed from Status.Allocatable.
func (s *NodeAllocatableStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
	cleaned := node.DeepCopy()
	for resourceName := range s.NodeAllocatable {
		delete(cleaned.Status.Allocatable, resourceName)
	}
	return cleaned
}
1089
// createCSINode creates a CSINode named nodeName that lists one driver per
// CsiNodeAllocatable entry (NodeID set to nodeName) and records MigratedPlugins
// in the v1.MigratedPluginsAnnotationKey annotation.
func (s *NodeAllocatableStrategy) createCSINode(ctx context.Context, nodeName string, client clientset.Interface) error {
	drivers := make([]storagev1.CSINodeDriver, 0, len(s.CsiNodeAllocatable))
	for name, allocatable := range s.CsiNodeAllocatable {
		drivers = append(drivers, storagev1.CSINodeDriver{
			Name:        name,
			Allocatable: allocatable,
			NodeID:      nodeName,
		})
	}
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{
			Name: nodeName,
			Annotations: map[string]string{
				v1.MigratedPluginsAnnotationKey: strings.Join(s.MigratedPlugins, ","),
			},
		},
		Spec: storagev1.CSINodeSpec{Drivers: drivers},
	}

	_, err := client.StorageV1().CSINodes().Create(ctx, csiNode, metav1.CreateOptions{})
	if !apierrors.IsAlreadyExists(err) {
		return err
	}
	// The CSINode most likely appeared after PrepareDependentObjects saw
	// NotFound. Report a Conflict instead, presumably so that a
	// retry-on-conflict caller re-runs PrepareDependentObjects, which will then
	// take the update path.
	// NOTE(review): the GroupResource comes from storage/v1beta1 while the
	// object is storage/v1; confirm whether storagev1 was intended.
	return apierrors.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
}
1120
1121 func (s *NodeAllocatableStrategy) updateCSINode(ctx context.Context, csiNode *storagev1.CSINode, client clientset.Interface) error {
1122 for driverName, allocatable := range s.CsiNodeAllocatable {
1123 found := false
1124 for i, driver := range csiNode.Spec.Drivers {
1125 if driver.Name == driverName {
1126 found = true
1127 csiNode.Spec.Drivers[i].Allocatable = allocatable
1128 break
1129 }
1130 }
1131 if !found {
1132 d := storagev1.CSINodeDriver{
1133 Name: driverName,
1134 Allocatable: allocatable,
1135 }
1136
1137 csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
1138 }
1139 }
1140 csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.MigratedPlugins, ",")
1141
1142 _, err := client.StorageV1().CSINodes().Update(ctx, csiNode, metav1.UpdateOptions{})
1143 return err
1144 }
1145
1146 func (s *NodeAllocatableStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
1147 csiNode, err := client.StorageV1().CSINodes().Get(ctx, node.Name, metav1.GetOptions{})
1148 if err != nil {
1149 if apierrors.IsNotFound(err) {
1150 return s.createCSINode(ctx, node.Name, client)
1151 }
1152 return err
1153 }
1154 return s.updateCSINode(ctx, csiNode, client)
1155 }
1156
1157 func (s *NodeAllocatableStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
1158 csiNode, err := client.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
1159 if err != nil {
1160 if apierrors.IsNotFound(err) {
1161 return nil
1162 }
1163 return err
1164 }
1165
1166 for driverName := range s.CsiNodeAllocatable {
1167 for i, driver := range csiNode.Spec.Drivers {
1168 if driver.Name == driverName {
1169 csiNode.Spec.Drivers[i].Allocatable = nil
1170 }
1171 }
1172 }
1173 return s.updateCSINode(ctx, csiNode, client)
1174 }
1175
1176
1177 type UniqueNodeLabelStrategy struct {
1178 LabelKey string
1179 }
1180
1181 var _ PrepareNodeStrategy = &UniqueNodeLabelStrategy{}
1182
1183 func NewUniqueNodeLabelStrategy(labelKey string) *UniqueNodeLabelStrategy {
1184 return &UniqueNodeLabelStrategy{
1185 LabelKey: labelKey,
1186 }
1187 }
1188
1189 func (s *UniqueNodeLabelStrategy) PreparePatch(*v1.Node) []byte {
1190 labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, string(uuid.NewUUID()))
1191 patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
1192 return []byte(patch)
1193 }
1194
1195 func (s *UniqueNodeLabelStrategy) CleanupNode(ctx context.Context, node *v1.Node) *v1.Node {
1196 nodeCopy := node.DeepCopy()
1197 if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
1198 delete(nodeCopy.Labels, s.LabelKey)
1199 }
1200 return nodeCopy
1201 }
1202
1203 func (*UniqueNodeLabelStrategy) PrepareDependentObjects(ctx context.Context, node *v1.Node, client clientset.Interface) error {
1204 return nil
1205 }
1206
1207 func (*UniqueNodeLabelStrategy) CleanupDependentObjects(ctx context.Context, nodeName string, client clientset.Interface) error {
1208 return nil
1209 }
1210
1211 func DoPrepareNode(ctx context.Context, client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
1212 var err error
1213 patch := strategy.PreparePatch(node)
1214 if len(patch) == 0 {
1215 return nil
1216 }
1217 for attempt := 0; attempt < retries; attempt++ {
1218 if _, err = client.CoreV1().Nodes().Patch(ctx, node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err == nil {
1219 break
1220 }
1221 if !apierrors.IsConflict(err) {
1222 return fmt.Errorf("error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
1223 }
1224 time.Sleep(100 * time.Millisecond)
1225 }
1226 if err != nil {
1227 return fmt.Errorf("too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
1228 }
1229
1230 for attempt := 0; attempt < retries; attempt++ {
1231 if err = strategy.PrepareDependentObjects(ctx, node, client); err == nil {
1232 break
1233 }
1234 if !apierrors.IsConflict(err) {
1235 return fmt.Errorf("error while preparing objects for node %s: %s", node.Name, err)
1236 }
1237 time.Sleep(100 * time.Millisecond)
1238 }
1239 if err != nil {
1240 return fmt.Errorf("too many conflicts when creating objects for node %s: %s", node.Name, err)
1241 }
1242 return nil
1243 }
1244
1245 func DoCleanupNode(ctx context.Context, client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
1246 var err error
1247 for attempt := 0; attempt < retries; attempt++ {
1248 var node *v1.Node
1249 node, err = client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
1250 if err != nil {
1251 return fmt.Errorf("skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
1252 }
1253 updatedNode := strategy.CleanupNode(ctx, node)
1254 if apiequality.Semantic.DeepEqual(node, updatedNode) {
1255 return nil
1256 }
1257 if _, err = client.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{}); err == nil {
1258 break
1259 }
1260 if !apierrors.IsConflict(err) {
1261 return fmt.Errorf("error when updating Node %v: %v", nodeName, err)
1262 }
1263 time.Sleep(100 * time.Millisecond)
1264 }
1265 if err != nil {
1266 return fmt.Errorf("too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
1267 }
1268
1269 for attempt := 0; attempt < retries; attempt++ {
1270 err = strategy.CleanupDependentObjects(ctx, nodeName, client)
1271 if err == nil {
1272 break
1273 }
1274 if !apierrors.IsConflict(err) {
1275 return fmt.Errorf("error when cleaning up Node %v objects: %v", nodeName, err)
1276 }
1277 time.Sleep(100 * time.Millisecond)
1278 }
1279 if err != nil {
1280 return fmt.Errorf("too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
1281 }
1282 return nil
1283 }
1284
1285 type TestPodCreateStrategy func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error
1286
1287 type CountToPodStrategy struct {
1288 Count int
1289 Strategy TestPodCreateStrategy
1290 }
1291
1292 type TestPodCreatorConfig map[string][]CountToPodStrategy
1293
1294 func NewTestPodCreatorConfig() *TestPodCreatorConfig {
1295 config := make(TestPodCreatorConfig)
1296 return &config
1297 }
1298
1299 func (c *TestPodCreatorConfig) AddStrategy(
1300 namespace string, podCount int, strategy TestPodCreateStrategy) {
1301 (*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
1302 }
1303
1304 type TestPodCreator struct {
1305 Client clientset.Interface
1306
1307 Config *TestPodCreatorConfig
1308 }
1309
1310 func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
1311 return &TestPodCreator{
1312 Client: client,
1313 Config: config,
1314 }
1315 }
1316
1317 func (c *TestPodCreator) CreatePods(ctx context.Context) error {
1318 for ns, v := range *(c.Config) {
1319 for _, countToStrategy := range v {
1320 if err := countToStrategy.Strategy(ctx, c.Client, ns, countToStrategy.Count); err != nil {
1321 return err
1322 }
1323 }
1324 }
1325 return nil
1326 }
1327
1328 func MakePodSpec() v1.PodSpec {
1329 return v1.PodSpec{
1330 Containers: []v1.Container{{
1331 Name: "pause",
1332 Image: "registry.k8s.io/pause:3.9",
1333 Ports: []v1.ContainerPort{{ContainerPort: 80}},
1334 Resources: v1.ResourceRequirements{
1335 Limits: v1.ResourceList{
1336 v1.ResourceCPU: resource.MustParse("100m"),
1337 v1.ResourceMemory: resource.MustParse("500Mi"),
1338 },
1339 Requests: v1.ResourceList{
1340 v1.ResourceCPU: resource.MustParse("100m"),
1341 v1.ResourceMemory: resource.MustParse("500Mi"),
1342 },
1343 },
1344 }},
1345 }
1346 }
1347
1348 func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
1349 if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
1350 return fmt.Errorf("error creating pod: %v", err)
1351 }
1352 return nil
1353 }
1354
1355 func CreatePod(ctx context.Context, client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
1356 var createError error
1357 lock := sync.Mutex{}
1358 createPodFunc := func(i int) {
1359
1360
1361
1362 if err := makeCreatePod(client, namespace, podTemplate.DeepCopy()); err != nil {
1363 lock.Lock()
1364 defer lock.Unlock()
1365 createError = err
1366 }
1367 }
1368
1369 if podCount < 30 {
1370 workqueue.ParallelizeUntil(ctx, podCount, podCount, createPodFunc)
1371 } else {
1372 workqueue.ParallelizeUntil(ctx, 30, podCount, createPodFunc)
1373 }
1374 return createError
1375 }
1376
1377 func CreatePodWithPersistentVolume(ctx context.Context, client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
1378 var createError error
1379 lock := sync.Mutex{}
1380 createPodFunc := func(i int) {
1381 pvcName := fmt.Sprintf("pvc-%d", i)
1382
1383 pvc := claimTemplate.DeepCopy()
1384 pvc.Name = pvcName
1385
1386 pv := factory(i)
1387
1388
1389 pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
1390 if bindVolume {
1391
1392 pv.Spec.ClaimRef = &v1.ObjectReference{
1393 Kind: "PersistentVolumeClaim",
1394 Namespace: namespace,
1395 Name: pvcName,
1396 APIVersion: "v1",
1397 }
1398 pv.Status.Phase = v1.VolumeBound
1399
1400
1401 pvc.Spec.VolumeName = pv.Name
1402 pvc.Status.Phase = v1.ClaimBound
1403 } else {
1404 pv.Status.Phase = v1.VolumeAvailable
1405 }
1406
1407
1408 if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
1409 lock.Lock()
1410 defer lock.Unlock()
1411 createError = fmt.Errorf("error creating PVC: %s", err)
1412 return
1413 }
1414
1415
1416 if _, err := client.CoreV1().PersistentVolumeClaims(namespace).UpdateStatus(ctx, pvc, metav1.UpdateOptions{}); err != nil {
1417 lock.Lock()
1418 defer lock.Unlock()
1419 createError = fmt.Errorf("error updating PVC status: %s", err)
1420 return
1421 }
1422
1423 if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
1424 lock.Lock()
1425 defer lock.Unlock()
1426 createError = fmt.Errorf("error creating PV: %s", err)
1427 return
1428 }
1429
1430 if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(ctx, pv, metav1.UpdateOptions{}); err != nil {
1431 lock.Lock()
1432 defer lock.Unlock()
1433 createError = fmt.Errorf("error updating PV status: %s", err)
1434 return
1435 }
1436
1437
1438 pod := podTemplate.DeepCopy()
1439 pod.Spec.Volumes = []v1.Volume{
1440 {
1441 Name: "vol",
1442 VolumeSource: v1.VolumeSource{
1443 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
1444 ClaimName: pvcName,
1445 },
1446 },
1447 },
1448 }
1449 if err := makeCreatePod(client, namespace, pod); err != nil {
1450 lock.Lock()
1451 defer lock.Unlock()
1452 createError = err
1453 return
1454 }
1455 }
1456
1457 if count < 30 {
1458 workqueue.ParallelizeUntil(ctx, count, count, createPodFunc)
1459 } else {
1460 workqueue.ParallelizeUntil(ctx, 30, count, createPodFunc)
1461 }
1462 return createError
1463 }
1464
1465 func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
1466 rc := &v1.ReplicationController{
1467 ObjectMeta: metav1.ObjectMeta{
1468 Name: controllerName,
1469 },
1470 Spec: v1.ReplicationControllerSpec{
1471 Replicas: pointer.Int32(int32(podCount)),
1472 Selector: map[string]string{"name": controllerName},
1473 Template: &v1.PodTemplateSpec{
1474 ObjectMeta: metav1.ObjectMeta{
1475 Labels: map[string]string{"name": controllerName},
1476 },
1477 Spec: podTemplate.Spec,
1478 },
1479 },
1480 }
1481 if err := CreateRCWithRetries(client, namespace, rc); err != nil {
1482 return fmt.Errorf("error creating replication controller: %v", err)
1483 }
1484 return nil
1485 }
1486
1487 func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
1488 return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
1489 return CreatePod(ctx, client, namespace, podCount, podTemplate)
1490 }
1491 }
1492
1493
1494 type volumeFactory func(uniqueID int) *v1.PersistentVolume
1495
1496 func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
1497 return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
1498 return CreatePodWithPersistentVolume(ctx, client, namespace, claimTemplate, factory, podTemplate, podCount, true )
1499 }
1500 }
1501
1502 func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
1503 return &v1.PersistentVolumeClaim{
1504 Spec: v1.PersistentVolumeClaimSpec{
1505 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
1506 StorageClassName: &storageClass,
1507 Resources: v1.VolumeResourceRequirements{
1508 Requests: v1.ResourceList{
1509 v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
1510 },
1511 },
1512 },
1513 }
1514 }
1515
1516 func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
1517 return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
1518 volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer
1519 storageClass := &storagev1.StorageClass{
1520 ObjectMeta: metav1.ObjectMeta{
1521 Name: "storagev1-class-1",
1522 },
1523 Provisioner: "kubernetes.io/gce-pd",
1524 VolumeBindingMode: &volumeBindingMode,
1525 }
1526 claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)
1527
1528 if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
1529 return fmt.Errorf("failed to create storagev1 class: %v", err)
1530 }
1531
1532 factoryWithStorageClass := func(i int) *v1.PersistentVolume {
1533 pv := factory(i)
1534 pv.Spec.StorageClassName = storageClass.Name
1535 return pv
1536 }
1537
1538 return CreatePodWithPersistentVolume(ctx, client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false )
1539 }
1540 }
1541
1542 func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
1543 basePod := &v1.Pod{
1544 ObjectMeta: metav1.ObjectMeta{
1545 GenerateName: "simple-pod-",
1546 },
1547 Spec: MakePodSpec(),
1548 }
1549 return NewCustomCreatePodStrategy(basePod)
1550 }
1551
1552 func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
1553 return func(ctx context.Context, client clientset.Interface, namespace string, podCount int) error {
1554 basePod := &v1.Pod{
1555 ObjectMeta: metav1.ObjectMeta{
1556 GenerateName: controllerName + "-pod-",
1557 Labels: map[string]string{"name": controllerName},
1558 },
1559 Spec: MakePodSpec(),
1560 }
1561 if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
1562 return err
1563 }
1564 return CreatePod(ctx, client, namespace, podCount, basePod)
1565 }
1566 }
1567
1568 type SecretConfig struct {
1569 Content map[string]string
1570 Client clientset.Interface
1571 Name string
1572 Namespace string
1573
1574 LogFunc func(fmt string, args ...interface{})
1575 }
1576
1577 func (config *SecretConfig) Run() error {
1578 secret := &v1.Secret{
1579 ObjectMeta: metav1.ObjectMeta{
1580 Name: config.Name,
1581 },
1582 StringData: map[string]string{},
1583 }
1584 for k, v := range config.Content {
1585 secret.StringData[k] = v
1586 }
1587
1588 if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
1589 return fmt.Errorf("error creating secret: %v", err)
1590 }
1591 config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
1592 return nil
1593 }
1594
1595 func (config *SecretConfig) Stop() error {
1596 if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
1597 return fmt.Errorf("error deleting secret: %v", err)
1598 }
1599 config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
1600 return nil
1601 }
1602
1603
1604 func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
1605 volumes := make([]v1.Volume, 0, len(secretNames))
1606 mounts := make([]v1.VolumeMount, 0, len(secretNames))
1607 for _, name := range secretNames {
1608 volumes = append(volumes, v1.Volume{
1609 Name: name,
1610 VolumeSource: v1.VolumeSource{
1611 Secret: &v1.SecretVolumeSource{
1612 SecretName: name,
1613 },
1614 },
1615 })
1616 mounts = append(mounts, v1.VolumeMount{
1617 Name: name,
1618 MountPath: fmt.Sprintf("/%v", name),
1619 })
1620 }
1621
1622 template.Spec.Volumes = volumes
1623 template.Spec.Containers[0].VolumeMounts = mounts
1624 }
1625
1626 type ConfigMapConfig struct {
1627 Content map[string]string
1628 Client clientset.Interface
1629 Name string
1630 Namespace string
1631
1632 LogFunc func(fmt string, args ...interface{})
1633 }
1634
1635 func (config *ConfigMapConfig) Run() error {
1636 configMap := &v1.ConfigMap{
1637 ObjectMeta: metav1.ObjectMeta{
1638 Name: config.Name,
1639 },
1640 Data: map[string]string{},
1641 }
1642 for k, v := range config.Content {
1643 configMap.Data[k] = v
1644 }
1645
1646 if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
1647 return fmt.Errorf("error creating configmap: %v", err)
1648 }
1649 config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
1650 return nil
1651 }
1652
1653 func (config *ConfigMapConfig) Stop() error {
1654 if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
1655 return fmt.Errorf("error deleting configmap: %v", err)
1656 }
1657 config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
1658 return nil
1659 }
1660
1661
1662 func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
1663 volumes := make([]v1.Volume, 0, len(configMapNames))
1664 mounts := make([]v1.VolumeMount, 0, len(configMapNames))
1665 for _, name := range configMapNames {
1666 volumes = append(volumes, v1.Volume{
1667 Name: name,
1668 VolumeSource: v1.VolumeSource{
1669 ConfigMap: &v1.ConfigMapVolumeSource{
1670 LocalObjectReference: v1.LocalObjectReference{
1671 Name: name,
1672 },
1673 },
1674 },
1675 })
1676 mounts = append(mounts, v1.VolumeMount{
1677 Name: name,
1678 MountPath: fmt.Sprintf("/%v", name),
1679 })
1680 }
1681
1682 template.Spec.Volumes = volumes
1683 template.Spec.Containers[0].VolumeMounts = mounts
1684 }
1685
1686 func (config *RCConfig) getTerminationGracePeriodSeconds(defaultGrace *int64) *int64 {
1687 if config.TerminationGracePeriodSeconds == nil || *config.TerminationGracePeriodSeconds < 0 {
1688 return defaultGrace
1689 }
1690 return config.TerminationGracePeriodSeconds
1691 }
1692
1693 func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
1694 template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
1695 v1.VolumeMount{
1696 Name: name,
1697 MountPath: "/var/service-account-tokens/" + name,
1698 })
1699
1700 template.Spec.Volumes = append(template.Spec.Volumes,
1701 v1.Volume{
1702 Name: name,
1703 VolumeSource: v1.VolumeSource{
1704 Projected: &v1.ProjectedVolumeSource{
1705 Sources: []v1.VolumeProjection{
1706 {
1707 ServiceAccountToken: &v1.ServiceAccountTokenProjection{
1708 Path: "token",
1709 Audience: name,
1710 },
1711 },
1712 {
1713 ConfigMap: &v1.ConfigMapProjection{
1714 LocalObjectReference: v1.LocalObjectReference{
1715 Name: "kube-root-ca-crt",
1716 },
1717 Items: []v1.KeyToPath{
1718 {
1719 Key: "ca.crt",
1720 Path: "ca.crt",
1721 },
1722 },
1723 },
1724 },
1725 {
1726 DownwardAPI: &v1.DownwardAPIProjection{
1727 Items: []v1.DownwardAPIVolumeFile{
1728 {
1729 Path: "namespace",
1730 FieldRef: &v1.ObjectFieldSelector{
1731 APIVersion: "v1",
1732 FieldPath: "metadata.namespace",
1733 },
1734 },
1735 },
1736 },
1737 },
1738 },
1739 },
1740 },
1741 })
1742 }
1743
1744 type DaemonConfig struct {
1745 Client clientset.Interface
1746 Name string
1747 Namespace string
1748 Image string
1749
1750 LogFunc func(fmt string, args ...interface{})
1751
1752 Timeout time.Duration
1753 }
1754
1755 func (config *DaemonConfig) Run(ctx context.Context) error {
1756 if config.Image == "" {
1757 config.Image = "registry.k8s.io/pause:3.9"
1758 }
1759 nameLabel := map[string]string{
1760 "name": config.Name + "-daemon",
1761 }
1762 daemon := &apps.DaemonSet{
1763 ObjectMeta: metav1.ObjectMeta{
1764 Name: config.Name,
1765 },
1766 Spec: apps.DaemonSetSpec{
1767 Template: v1.PodTemplateSpec{
1768 ObjectMeta: metav1.ObjectMeta{
1769 Labels: nameLabel,
1770 },
1771 Spec: v1.PodSpec{
1772 Containers: []v1.Container{
1773 {
1774 Name: config.Name,
1775 Image: config.Image,
1776 },
1777 },
1778 },
1779 },
1780 },
1781 }
1782
1783 if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
1784 return fmt.Errorf("error creating daemonset: %v", err)
1785 }
1786
1787 var nodes *v1.NodeList
1788 var err error
1789 for i := 0; i < retries; i++ {
1790
1791 nodes, err = config.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{ResourceVersion: "0"})
1792 if err == nil {
1793 break
1794 } else if i+1 == retries {
1795 return fmt.Errorf("error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
1796 }
1797 }
1798
1799 timeout := config.Timeout
1800 if timeout <= 0 {
1801 timeout = 5 * time.Minute
1802 }
1803
1804 ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
1805 if err != nil {
1806 return err
1807 }
1808 defer ps.Stop()
1809
1810 err = wait.Poll(time.Second, timeout, func() (bool, error) {
1811 pods := ps.List()
1812
1813 nodeHasDaemon := sets.NewString()
1814 for _, pod := range pods {
1815 podReady, _ := PodRunningReady(pod)
1816 if pod.Spec.NodeName != "" && podReady {
1817 nodeHasDaemon.Insert(pod.Spec.NodeName)
1818 }
1819 }
1820
1821 running := len(nodeHasDaemon)
1822 config.LogFunc("Found %v/%v Daemons %v running", running, config.Name, len(nodes.Items))
1823 return running == len(nodes.Items), nil
1824 })
1825 if err != nil {
1826 config.LogFunc("Timed out while waiting for DaemonSet %v/%v to be running.", config.Namespace, config.Name)
1827 } else {
1828 config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
1829 }
1830
1831 return err
1832 }
1833
View as plain text