1
16
17 package kube
18
19 import (
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "io"
25 "os"
26 "path/filepath"
27 "reflect"
28 "strings"
29 "sync"
30 "time"
31
32 jsonpatch "github.com/evanphx/json-patch"
33 "github.com/pkg/errors"
34 batch "k8s.io/api/batch/v1"
35 v1 "k8s.io/api/core/v1"
36 apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
37 apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
38 apierrors "k8s.io/apimachinery/pkg/api/errors"
39
40 multierror "github.com/hashicorp/go-multierror"
41 "k8s.io/apimachinery/pkg/api/meta"
42 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
44 metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
45 "k8s.io/apimachinery/pkg/fields"
46 "k8s.io/apimachinery/pkg/labels"
47 "k8s.io/apimachinery/pkg/runtime"
48 "k8s.io/apimachinery/pkg/types"
49 "k8s.io/apimachinery/pkg/util/strategicpatch"
50 "k8s.io/apimachinery/pkg/watch"
51 "k8s.io/cli-runtime/pkg/genericclioptions"
52 "k8s.io/cli-runtime/pkg/resource"
53 "k8s.io/client-go/kubernetes"
54 "k8s.io/client-go/kubernetes/scheme"
55 "k8s.io/client-go/rest"
56 cachetools "k8s.io/client-go/tools/cache"
57 watchtools "k8s.io/client-go/tools/watch"
58 cmdutil "k8s.io/kubectl/pkg/cmd/util"
59 )
60
61
62 var ErrNoObjectsVisited = errors.New("no objects visited")
63
64 var metadataAccessor = meta.NewAccessor()
65
66
67
68 var ManagedFieldsManager string
69
70
71 type Client struct {
72
73
74
75
76
77
78
79
80 Factory Factory
81 Log func(string, ...interface{})
82
83 Namespace string
84
85 kubeClient *kubernetes.Clientset
86 }
87
88 var addToScheme sync.Once
89
90
91 func New(getter genericclioptions.RESTClientGetter) *Client {
92 if getter == nil {
93 getter = genericclioptions.NewConfigFlags(true)
94 }
95
96 addToScheme.Do(func() {
97 if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
98
99 panic(err)
100 }
101 if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
102 panic(err)
103 }
104 })
105 return &Client{
106 Factory: cmdutil.NewFactory(getter),
107 Log: nopLogger,
108 }
109 }
110
// nopLogger is the default Log implementation; it discards every message.
var nopLogger = func(format string, args ...interface{}) {}
112
113
114 func (c *Client) getKubeClient() (*kubernetes.Clientset, error) {
115 var err error
116 if c.kubeClient == nil {
117 c.kubeClient, err = c.Factory.KubernetesClientSet()
118 }
119
120 return c.kubeClient, err
121 }
122
123
124 func (c *Client) IsReachable() error {
125 client, err := c.getKubeClient()
126 if err == genericclioptions.ErrEmptyConfig {
127
128
129 return errors.New("Kubernetes cluster unreachable")
130 }
131 if err != nil {
132 return errors.Wrap(err, "Kubernetes cluster unreachable")
133 }
134 if _, err := client.ServerVersion(); err != nil {
135 return errors.Wrap(err, "Kubernetes cluster unreachable")
136 }
137 return nil
138 }
139
140
141 func (c *Client) Create(resources ResourceList) (*Result, error) {
142 c.Log("creating %d resource(s)", len(resources))
143 if err := perform(resources, createResource); err != nil {
144 return nil, err
145 }
146 return &Result{Created: resources}, nil
147 }
148
149 func transformRequests(req *rest.Request) {
150 tableParam := strings.Join([]string{
151 fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
152 fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1beta1.SchemeGroupVersion.Version, metav1beta1.GroupName),
153 "application/json",
154 }, ",")
155 req.SetHeader("Accept", tableParam)
156
157
158 req.Param("includeObject", "Object")
159 }
160
161
162
163
164 func (c *Client) Get(resources ResourceList, related bool) (map[string][]runtime.Object, error) {
165 buf := new(bytes.Buffer)
166 objs := make(map[string][]runtime.Object)
167
168 podSelectors := []map[string]string{}
169 err := resources.Visit(func(info *resource.Info, err error) error {
170 if err != nil {
171 return err
172 }
173
174 gvk := info.ResourceMapping().GroupVersionKind
175 vk := gvk.Version + "/" + gvk.Kind
176 obj, err := getResource(info)
177 if err != nil {
178 fmt.Fprintf(buf, "Get resource %s failed, err:%v\n", info.Name, err)
179 } else {
180 objs[vk] = append(objs[vk], obj)
181
182
183 if related {
184
185
186 objGVK := obj.GetObjectKind().GroupVersionKind()
187 var isTable bool
188 if objGVK.Kind == "Table" {
189 isTable = true
190 }
191
192 objs, err = c.getSelectRelationPod(info, objs, isTable, &podSelectors)
193 if err != nil {
194 c.Log("Warning: get the relation pod is failed, err:%s", err.Error())
195 }
196 }
197 }
198
199 return nil
200 })
201 if err != nil {
202 return nil, err
203 }
204
205 return objs, nil
206 }
207
208 func (c *Client) getSelectRelationPod(info *resource.Info, objs map[string][]runtime.Object, table bool, podSelectors *[]map[string]string) (map[string][]runtime.Object, error) {
209 if info == nil {
210 return objs, nil
211 }
212 c.Log("get relation pod of object: %s/%s/%s", info.Namespace, info.Mapping.GroupVersionKind.Kind, info.Name)
213 selector, ok, _ := getSelectorFromObject(info.Object)
214 if !ok {
215 return objs, nil
216 }
217
218 for index := range *podSelectors {
219 if reflect.DeepEqual((*podSelectors)[index], selector) {
220
221 return objs, nil
222 }
223 }
224
225 *podSelectors = append(*podSelectors, selector)
226
227 var infos []*resource.Info
228 var err error
229 if table {
230 infos, err = c.Factory.NewBuilder().
231 Unstructured().
232 ContinueOnError().
233 NamespaceParam(info.Namespace).
234 DefaultNamespace().
235 ResourceTypes("pods").
236 LabelSelector(labels.Set(selector).AsSelector().String()).
237 TransformRequests(transformRequests).
238 Do().Infos()
239 if err != nil {
240 return objs, err
241 }
242 } else {
243 infos, err = c.Factory.NewBuilder().
244 Unstructured().
245 ContinueOnError().
246 NamespaceParam(info.Namespace).
247 DefaultNamespace().
248 ResourceTypes("pods").
249 LabelSelector(labels.Set(selector).AsSelector().String()).
250 Do().Infos()
251 if err != nil {
252 return objs, err
253 }
254 }
255 vk := "v1/Pod(related)"
256
257 for _, info := range infos {
258 objs[vk] = append(objs[vk], info.Object)
259 }
260 return objs, nil
261 }
262
263 func getSelectorFromObject(obj runtime.Object) (map[string]string, bool, error) {
264 typed := obj.(*unstructured.Unstructured)
265 kind := typed.Object["kind"]
266 switch kind {
267 case "ReplicaSet", "Deployment", "StatefulSet", "DaemonSet", "Job":
268 return unstructured.NestedStringMap(typed.Object, "spec", "selector", "matchLabels")
269 case "ReplicationController":
270 return unstructured.NestedStringMap(typed.Object, "spec", "selector")
271 default:
272 return nil, false, nil
273 }
274 }
275
276 func getResource(info *resource.Info) (runtime.Object, error) {
277 obj, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name)
278 if err != nil {
279 return nil, err
280 }
281 return obj, nil
282 }
283
284
285 func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
286 cs, err := c.getKubeClient()
287 if err != nil {
288 return err
289 }
290 checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
291 w := waiter{
292 c: checker,
293 log: c.Log,
294 timeout: timeout,
295 }
296 return w.waitForResources(resources)
297 }
298
299
300 func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
301 cs, err := c.getKubeClient()
302 if err != nil {
303 return err
304 }
305 checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
306 w := waiter{
307 c: checker,
308 log: c.Log,
309 timeout: timeout,
310 }
311 return w.waitForResources(resources)
312 }
313
314
315 func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
316 w := waiter{
317 log: c.Log,
318 timeout: timeout,
319 }
320 return w.waitForDeletedResources(resources)
321 }
322
323 func (c *Client) namespace() string {
324 if c.Namespace != "" {
325 return c.Namespace
326 }
327 if ns, _, err := c.Factory.ToRawKubeConfigLoader().Namespace(); err == nil {
328 return ns
329 }
330 return v1.NamespaceDefault
331 }
332
333
334 func (c *Client) newBuilder() *resource.Builder {
335 return c.Factory.NewBuilder().
336 ContinueOnError().
337 NamespaceParam(c.namespace()).
338 DefaultNamespace().
339 Flatten()
340 }
341
342
343 func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
344 validationDirective := metav1.FieldValidationIgnore
345 if validate {
346 validationDirective = metav1.FieldValidationStrict
347 }
348
349 schema, err := c.Factory.Validator(validationDirective)
350 if err != nil {
351 return nil, err
352 }
353 result, err := c.newBuilder().
354 Unstructured().
355 Schema(schema).
356 Stream(reader, "").
357 Do().Infos()
358 return result, scrubValidationError(err)
359 }
360
361
362
363 func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) {
364 validationDirective := metav1.FieldValidationIgnore
365 if validate {
366 validationDirective = metav1.FieldValidationStrict
367 }
368
369 schema, err := c.Factory.Validator(validationDirective)
370 if err != nil {
371 return nil, err
372 }
373 result, err := c.newBuilder().
374 Unstructured().
375 Schema(schema).
376 Stream(reader, "").
377 TransformRequests(transformRequests).
378 Do().Infos()
379 return result, scrubValidationError(err)
380 }
381
382
383
384
385
386
387
388
389 func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
390 updateErrors := []string{}
391 res := &Result{}
392
393 c.Log("checking %d resources for changes", len(target))
394 err := target.Visit(func(info *resource.Info, err error) error {
395 if err != nil {
396 return err
397 }
398
399 helper := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager())
400 if _, err := helper.Get(info.Namespace, info.Name); err != nil {
401 if !apierrors.IsNotFound(err) {
402 return errors.Wrap(err, "could not get information about the resource")
403 }
404
405
406 res.Created = append(res.Created, info)
407
408
409 if err := createResource(info); err != nil {
410 return errors.Wrap(err, "failed to create resource")
411 }
412
413 kind := info.Mapping.GroupVersionKind.Kind
414 c.Log("Created a new %s called %q in %s\n", kind, info.Name, info.Namespace)
415 return nil
416 }
417
418 originalInfo := original.Get(info)
419 if originalInfo == nil {
420 kind := info.Mapping.GroupVersionKind.Kind
421 return errors.Errorf("no %s with the name %q found", kind, info.Name)
422 }
423
424 if err := updateResource(c, info, originalInfo.Object, force); err != nil {
425 c.Log("error updating the resource %q:\n\t %v", info.Name, err)
426 updateErrors = append(updateErrors, err.Error())
427 }
428
429 res.Updated = append(res.Updated, info)
430
431 return nil
432 })
433
434 switch {
435 case err != nil:
436 return res, err
437 case len(updateErrors) != 0:
438 return res, errors.Errorf(strings.Join(updateErrors, " && "))
439 }
440
441 for _, info := range original.Difference(target) {
442 c.Log("Deleting %s %q in namespace %s...", info.Mapping.GroupVersionKind.Kind, info.Name, info.Namespace)
443
444 if err := info.Get(); err != nil {
445 c.Log("Unable to get obj %q, err: %s", info.Name, err)
446 continue
447 }
448 annotations, err := metadataAccessor.Annotations(info.Object)
449 if err != nil {
450 c.Log("Unable to get annotations on %q, err: %s", info.Name, err)
451 }
452 if annotations != nil && annotations[ResourcePolicyAnno] == KeepPolicy {
453 c.Log("Skipping delete of %q due to annotation [%s=%s]", info.Name, ResourcePolicyAnno, KeepPolicy)
454 continue
455 }
456 if err := deleteResource(info, metav1.DeletePropagationBackground); err != nil {
457 c.Log("Failed to delete %q, err: %s", info.ObjectName(), err)
458 continue
459 }
460 res.Deleted = append(res.Deleted, info)
461 }
462 return res, nil
463 }
464
465
466
467
468
469 func (c *Client) Delete(resources ResourceList) (*Result, []error) {
470 return rdelete(c, resources, metav1.DeletePropagationBackground)
471 }
472
473
474
475
476
477 func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) {
478 return rdelete(c, resources, policy)
479 }
480
481 func rdelete(c *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
482 var errs []error
483 res := &Result{}
484 mtx := sync.Mutex{}
485 err := perform(resources, func(info *resource.Info) error {
486 c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
487 err := deleteResource(info, propagation)
488 if err == nil || apierrors.IsNotFound(err) {
489 if err != nil {
490 c.Log("Ignoring delete failure for %q %s: %v", info.Name, info.Mapping.GroupVersionKind, err)
491 }
492 mtx.Lock()
493 defer mtx.Unlock()
494 res.Deleted = append(res.Deleted, info)
495 return nil
496 }
497 mtx.Lock()
498 defer mtx.Unlock()
499
500 errs = append(errs, err)
501 return nil
502 })
503 if err != nil {
504 if errors.Is(err, ErrNoObjectsVisited) {
505 err = fmt.Errorf("object not found, skipping delete: %w", err)
506 }
507 errs = append(errs, err)
508 }
509 if errs != nil {
510 return nil, errs
511 }
512 return res, nil
513 }
514
515 func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
516 return func(info *resource.Info) error {
517 return c.watchUntilReady(t, info)
518 }
519 }
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535 func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
536
537
538 return perform(resources, c.watchTimeout(timeout))
539 }
540
541 func perform(infos ResourceList, fn func(*resource.Info) error) error {
542 var result error
543
544 if len(infos) == 0 {
545 return ErrNoObjectsVisited
546 }
547
548 errs := make(chan error)
549 go batchPerform(infos, fn, errs)
550
551 for range infos {
552 err := <-errs
553 if err != nil {
554 result = multierror.Append(result, err)
555 }
556 }
557
558 return result
559 }
560
561
562
563 func getManagedFieldsManager() string {
564
565
566 if ManagedFieldsManager != "" {
567 return ManagedFieldsManager
568 }
569
570
571 if len(os.Args[0]) == 0 {
572 return "unknown"
573 }
574
575
576
577
578 return filepath.Base(os.Args[0])
579 }
580
581 func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) {
582 var kind string
583 var wg sync.WaitGroup
584 for _, info := range infos {
585 currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind
586 if kind != currentKind {
587 wg.Wait()
588 kind = currentKind
589 }
590 wg.Add(1)
591 go func(i *resource.Info) {
592 errs <- fn(i)
593 wg.Done()
594 }(info)
595 }
596 }
597
598 func createResource(info *resource.Info) error {
599 obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object)
600 if err != nil {
601 return err
602 }
603 return info.Refresh(obj, true)
604 }
605
606 func deleteResource(info *resource.Info, policy metav1.DeletionPropagation) error {
607 opts := &metav1.DeleteOptions{PropagationPolicy: &policy}
608 _, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts)
609 return err
610 }
611
612 func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) {
613 oldData, err := json.Marshal(current)
614 if err != nil {
615 return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing current configuration")
616 }
617 newData, err := json.Marshal(target.Object)
618 if err != nil {
619 return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing target configuration")
620 }
621
622
623 helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
624 currentObj, err := helper.Get(target.Namespace, target.Name)
625 if err != nil && !apierrors.IsNotFound(err) {
626 return nil, types.StrategicMergePatchType, errors.Wrapf(err, "unable to get data for current object %s/%s", target.Namespace, target.Name)
627 }
628
629
630 currentData, err := json.Marshal(currentObj)
631 if err != nil {
632 return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing live configuration")
633 }
634
635
636 versionedObject := AsVersioned(target)
637
638
639
640
641
642 _, isUnstructured := versionedObject.(runtime.Unstructured)
643
644
645 _, isCRD := versionedObject.(*apiextv1beta1.CustomResourceDefinition)
646
647 if isUnstructured || isCRD {
648
649 patch, err := jsonpatch.CreateMergePatch(oldData, newData)
650 return patch, types.MergePatchType, err
651 }
652
653 patchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
654 if err != nil {
655 return nil, types.StrategicMergePatchType, errors.Wrap(err, "unable to create patch metadata from object")
656 }
657
658 patch, err := strategicpatch.CreateThreeWayMergePatch(oldData, newData, currentData, patchMeta, true)
659 return patch, types.StrategicMergePatchType, err
660 }
661
662 func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
663 var (
664 obj runtime.Object
665 helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
666 kind = target.Mapping.GroupVersionKind.Kind
667 )
668
669
670 if force {
671 var err error
672 obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
673 if err != nil {
674 return errors.Wrap(err, "failed to replace object")
675 }
676 c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
677 } else {
678 patch, patchType, err := createPatch(target, currentObj)
679 if err != nil {
680 return errors.Wrap(err, "failed to create patch")
681 }
682
683 if patch == nil || string(patch) == "{}" {
684 c.Log("Looks like there are no changes for %s %q", kind, target.Name)
685
686
687 if err := target.Get(); err != nil {
688 return errors.Wrap(err, "failed to refresh resource information")
689 }
690 return nil
691 }
692
693 c.Log("Patch %s %q in namespace %s", kind, target.Name, target.Namespace)
694 obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
695 if err != nil {
696 return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
697 }
698 }
699
700 target.Refresh(obj, true)
701 return nil
702 }
703
704 func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
705 kind := info.Mapping.GroupVersionKind.Kind
706 switch kind {
707 case "Job", "Pod":
708 default:
709 return nil
710 }
711
712 c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
713
714
715
716 selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", info.Name))
717 if err != nil {
718 return err
719 }
720 lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, selector)
721
722
723
724
725
726
727
728 ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
729 defer cancel()
730 _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
731
732
733 obj := convertWithMapper(e.Object, info.Mapping)
734 switch e.Type {
735 case watch.Added, watch.Modified:
736
737
738
739
740 c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
741 switch kind {
742 case "Job":
743 return c.waitForJob(obj, info.Name)
744 case "Pod":
745 return c.waitForPodSuccess(obj, info.Name)
746 }
747 return true, nil
748 case watch.Deleted:
749 c.Log("Deleted event for %s", info.Name)
750 return true, nil
751 case watch.Error:
752
753 c.Log("Error event for %s", info.Name)
754 return true, errors.Errorf("failed to deploy %s", info.Name)
755 default:
756 return false, nil
757 }
758 })
759 return err
760 }
761
762
763
764
765 func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
766 o, ok := obj.(*batch.Job)
767 if !ok {
768 return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
769 }
770
771 for _, c := range o.Status.Conditions {
772 if c.Type == batch.JobComplete && c.Status == "True" {
773 return true, nil
774 } else if c.Type == batch.JobFailed && c.Status == "True" {
775 return true, errors.Errorf("job %s failed: %s", name, c.Reason)
776 }
777 }
778
779 c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
780 return false, nil
781 }
782
783
784
785
786 func (c *Client) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
787 o, ok := obj.(*v1.Pod)
788 if !ok {
789 return true, errors.Errorf("expected %s to be a *v1.Pod, got %T", name, obj)
790 }
791
792 switch o.Status.Phase {
793 case v1.PodSucceeded:
794 c.Log("Pod %s succeeded", o.Name)
795 return true, nil
796 case v1.PodFailed:
797 return true, errors.Errorf("pod %s failed", o.Name)
798 case v1.PodPending:
799 c.Log("Pod %s pending", o.Name)
800 case v1.PodRunning:
801 c.Log("Pod %s running", o.Name)
802 }
803
804 return false, nil
805 }
806
807
// scrubValidationError strips the "turn validation off with --validate=false"
// hint from a validation error. A nil error stays nil, and an error that does
// not contain the hint is returned unchanged. Otherwise a new error is
// returned whose message has every "; <hint>" occurrence removed.
func scrubValidationError(err error) error {
	const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"

	if err == nil {
		return nil
	}

	msg := err.Error()
	if !strings.Contains(msg, stopValidateMessage) {
		return err
	}
	return errors.New(strings.ReplaceAll(msg, "; "+stopValidateMessage, ""))
}
819
820
821
822 func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
823 client, err := c.getKubeClient()
824 if err != nil {
825 return v1.PodUnknown, err
826 }
827 to := int64(timeout)
828 watcher, err := client.CoreV1().Pods(c.namespace()).Watch(context.Background(), metav1.ListOptions{
829 FieldSelector: fmt.Sprintf("metadata.name=%s", name),
830 TimeoutSeconds: &to,
831 })
832 if err != nil {
833 return v1.PodUnknown, err
834 }
835
836 for event := range watcher.ResultChan() {
837 p, ok := event.Object.(*v1.Pod)
838 if !ok {
839 return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
840 }
841 switch p.Status.Phase {
842 case v1.PodFailed:
843 return v1.PodFailed, nil
844 case v1.PodSucceeded:
845 return v1.PodSucceeded, nil
846 }
847 }
848
849 return v1.PodUnknown, err
850 }
851
View as plain text