1
16
17 package wait
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "strings"
26 "time"
27
28 "github.com/spf13/cobra"
29
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
33 "k8s.io/apimachinery/pkg/fields"
34 "k8s.io/apimachinery/pkg/runtime"
35 "k8s.io/apimachinery/pkg/runtime/schema"
36 "k8s.io/apimachinery/pkg/types"
37 "k8s.io/apimachinery/pkg/util/wait"
38 "k8s.io/apimachinery/pkg/watch"
39 "k8s.io/cli-runtime/pkg/genericclioptions"
40 "k8s.io/cli-runtime/pkg/genericiooptions"
41 "k8s.io/cli-runtime/pkg/printers"
42 "k8s.io/cli-runtime/pkg/resource"
43 "k8s.io/client-go/dynamic"
44 "k8s.io/client-go/tools/cache"
45 watchtools "k8s.io/client-go/tools/watch"
46 "k8s.io/client-go/util/jsonpath"
47 cmdget "k8s.io/kubectl/pkg/cmd/get"
48 cmdutil "k8s.io/kubectl/pkg/cmd/util"
49 "k8s.io/kubectl/pkg/util/i18n"
50 "k8s.io/kubectl/pkg/util/interrupt"
51 "k8s.io/kubectl/pkg/util/templates"
52 )
53
54 var (
55 waitLong = templates.LongDesc(i18n.T(`
56 Experimental: Wait for a specific condition on one or many resources.
57
58 The command takes multiple resources and waits until the specified condition
59 is seen in the Status field of every given resource.
60
61 Alternatively, the command can wait for the given set of resources to be deleted
62 by providing the "delete" keyword as the value to the --for flag.
63
64 A successful message will be printed to stdout indicating when the specified
65 condition has been met. You can use -o option to change to output destination.`))
66
67 waitExample = templates.Examples(i18n.T(`
68 # Wait for the pod "busybox1" to contain the status condition of type "Ready"
69 kubectl wait --for=condition=Ready pod/busybox1
70
71 # The default value of status condition is true; you can wait for other targets after an equal delimiter (compared after Unicode simple case folding, which is a more general form of case-insensitivity)
72 kubectl wait --for=condition=Ready=false pod/busybox1
73
74 # Wait for the pod "busybox1" to contain the status phase to be "Running"
75 kubectl wait --for=jsonpath='{.status.phase}'=Running pod/busybox1
76
77 # Wait for pod "busybox1" to be Ready
78 kubectl wait --for='jsonpath={.status.conditions[?(@.type=="Ready")].status}=True' pod/busybox1
79
80 # Wait for the service "loadbalancer" to have ingress.
81 kubectl wait --for=jsonpath='{.status.loadBalancer.ingress}' service/loadbalancer
82
83 # Wait for the pod "busybox1" to be deleted, with a timeout of 60s, after having issued the "delete" command
84 kubectl delete pod/busybox1
85 kubectl wait --for=delete pod/busybox1 --timeout=60s`))
86 )
87
88
89 var errNoMatchingResources = errors.New("no matching resources found")
90
91
92
93
94 type WaitFlags struct {
95 RESTClientGetter genericclioptions.RESTClientGetter
96 PrintFlags *genericclioptions.PrintFlags
97 ResourceBuilderFlags *genericclioptions.ResourceBuilderFlags
98
99 Timeout time.Duration
100 ForCondition string
101
102 genericiooptions.IOStreams
103 }
104
105
106 func NewWaitFlags(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *WaitFlags {
107 return &WaitFlags{
108 RESTClientGetter: restClientGetter,
109 PrintFlags: genericclioptions.NewPrintFlags("condition met"),
110 ResourceBuilderFlags: genericclioptions.NewResourceBuilderFlags().
111 WithLabelSelector("").
112 WithFieldSelector("").
113 WithAll(false).
114 WithAllNamespaces(false).
115 WithLocal(false).
116 WithLatest(),
117
118 Timeout: 30 * time.Second,
119
120 IOStreams: streams,
121 }
122 }
123
124
125 func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *cobra.Command {
126 flags := NewWaitFlags(restClientGetter, streams)
127
128 cmd := &cobra.Command{
129 Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available|--for=jsonpath='{}'[=value]]",
130 Short: i18n.T("Experimental: Wait for a specific condition on one or many resources"),
131 Long: waitLong,
132 Example: waitExample,
133
134 DisableFlagsInUseLine: true,
135 Run: func(cmd *cobra.Command, args []string) {
136 o, err := flags.ToOptions(args)
137 cmdutil.CheckErr(err)
138 cmdutil.CheckErr(o.RunWait())
139 },
140 SuggestFor: []string{"list", "ps"},
141 }
142
143 flags.AddFlags(cmd)
144
145 return cmd
146 }
147
148
149 func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {
150 flags.PrintFlags.AddFlags(cmd)
151 flags.ResourceBuilderFlags.AddFlags(cmd.Flags())
152
153 cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up. Zero means check once and don't wait, negative means wait for a week.")
154 cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name[=condition-value]|jsonpath='{JSONPath expression}'=[JSONPath value]]. The default condition-value is true. Condition values are compared after Unicode simple case folding, which is a more general form of case-insensitivity.")
155 }
156
157
158 func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
159 printer, err := flags.PrintFlags.ToPrinter()
160 if err != nil {
161 return nil, err
162 }
163 builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)
164 clientConfig, err := flags.RESTClientGetter.ToRESTConfig()
165 if err != nil {
166 return nil, err
167 }
168 dynamicClient, err := dynamic.NewForConfig(clientConfig)
169 if err != nil {
170 return nil, err
171 }
172 conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
173 if err != nil {
174 return nil, err
175 }
176
177 effectiveTimeout := flags.Timeout
178 if effectiveTimeout < 0 {
179 effectiveTimeout = 168 * time.Hour
180 }
181
182 o := &WaitOptions{
183 ResourceFinder: builder,
184 DynamicClient: dynamicClient,
185 Timeout: effectiveTimeout,
186 ForCondition: flags.ForCondition,
187
188 Printer: printer,
189 ConditionFn: conditionFn,
190 IOStreams: flags.IOStreams,
191 }
192
193 return o, nil
194 }
195
196 func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
197 if strings.ToLower(condition) == "delete" {
198 return IsDeleted, nil
199 }
200 if strings.HasPrefix(condition, "condition=") {
201 conditionName := condition[len("condition="):]
202 conditionValue := "true"
203 if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 {
204 conditionValue = conditionName[equalsIndex+1:]
205 conditionName = conditionName[0:equalsIndex]
206 }
207
208 return ConditionalWait{
209 conditionName: conditionName,
210 conditionStatus: conditionValue,
211 errOut: errOut,
212 }.IsConditionMet, nil
213 }
214 if strings.HasPrefix(condition, "jsonpath=") {
215 jsonPathInput := strings.TrimPrefix(condition, "jsonpath=")
216 jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput)
217 if err != nil {
218 return nil, err
219 }
220 j, err := newJSONPathParser(jsonPathExp)
221 if err != nil {
222 return nil, err
223 }
224 return JSONPathWait{
225 matchAnyValue: jsonPathValue == "",
226 jsonPathValue: jsonPathValue,
227 jsonPathParser: j,
228 errOut: errOut,
229 }.IsJSONPathConditionMet, nil
230 }
231
232 return nil, fmt.Errorf("unrecognized condition: %q", condition)
233 }
234
235
236 func newJSONPathParser(jsonPathExpression string) (*jsonpath.JSONPath, error) {
237 j := jsonpath.New("wait").AllowMissingKeys(true)
238 if jsonPathExpression == "" {
239 return nil, errors.New("jsonpath expression cannot be empty")
240 }
241 if err := j.Parse(jsonPathExpression); err != nil {
242 return nil, err
243 }
244 return j, nil
245 }
246
247
248
249 func processJSONPathInput(input string) (string, string, error) {
250 jsonPathInput := splitJSONPathInput(input)
251 if numOfArgs := len(jsonPathInput); numOfArgs < 1 || numOfArgs > 2 {
252 return "", "", fmt.Errorf("jsonpath wait format must be --for=jsonpath='{.status.readyReplicas}'=3 or --for=jsonpath='{.status.readyReplicas}'")
253 }
254 relaxedJSONPathExp, err := cmdget.RelaxedJSONPathExpression(jsonPathInput[0])
255 if err != nil {
256 return "", "", err
257 }
258 if len(jsonPathInput) == 1 {
259 return relaxedJSONPathExp, "", nil
260 }
261 jsonPathValue := strings.Trim(jsonPathInput[1], `'"`)
262 if jsonPathValue == "" {
263 return "", "", errors.New("jsonpath wait has to have a value after equal sign, like --for=jsonpath='{.status.readyReplicas}'=3")
264 }
265 return relaxedJSONPathExp, jsonPathValue, nil
266 }
267
268
269
270 func splitJSONPathInput(input string) []string {
271 var output []string
272 var element strings.Builder
273 for i := 0; i < len(input); i++ {
274 if input[i] == '=' {
275 if i < len(input)-1 && input[i+1] == '=' {
276 element.WriteString("==")
277 i++
278 continue
279 }
280 output = append(output, element.String())
281 element.Reset()
282 continue
283 }
284 element.WriteByte(input[i])
285 }
286 return append(output, element.String())
287 }
288
289
290 type ResourceLocation struct {
291 GroupResource schema.GroupResource
292 Namespace string
293 Name string
294 }
295
296
297 type UIDMap map[ResourceLocation]types.UID
298
299
300
301 type WaitOptions struct {
302 ResourceFinder genericclioptions.ResourceFinder
303
304
305 UIDMap UIDMap
306 DynamicClient dynamic.Interface
307 Timeout time.Duration
308 ForCondition string
309
310 Printer printers.ResourcePrinter
311 ConditionFn ConditionFunc
312 genericiooptions.IOStreams
313 }
314
315
316 type ConditionFunc func(ctx context.Context, info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)
317
318
319 func (o *WaitOptions) RunWait() error {
320 ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
321 defer cancel()
322
323 visitCount := 0
324 visitFunc := func(info *resource.Info, err error) error {
325 if err != nil {
326 return err
327 }
328
329 visitCount++
330 finalObject, success, err := o.ConditionFn(ctx, info, o)
331 if success {
332 o.Printer.PrintObj(finalObject, o.Out)
333 return nil
334 }
335 if err == nil {
336 return fmt.Errorf("%v unsatisified for unknown reason", finalObject)
337 }
338 return err
339 }
340 visitor := o.ResourceFinder.Do()
341 isForDelete := strings.ToLower(o.ForCondition) == "delete"
342 if visitor, ok := visitor.(*resource.Result); ok && isForDelete {
343 visitor.IgnoreErrors(apierrors.IsNotFound)
344 }
345
346 err := visitor.Visit(visitFunc)
347 if err != nil {
348 return err
349 }
350 if visitCount == 0 && !isForDelete {
351 return errNoMatchingResources
352 }
353 return err
354 }
355
356
357 func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
358 if len(info.Name) == 0 {
359 return info.Object, false, fmt.Errorf("resource name must be provided")
360 }
361
362 gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
363 if apierrors.IsNotFound(initObjGetErr) {
364 return info.Object, true, nil
365 }
366 if initObjGetErr != nil {
367
368 return info.Object, false, initObjGetErr
369 }
370 resourceLocation := ResourceLocation{
371 GroupResource: info.Mapping.Resource.GroupResource(),
372 Namespace: gottenObj.GetNamespace(),
373 Name: gottenObj.GetName(),
374 }
375 if uid, ok := o.UIDMap[resourceLocation]; ok {
376 if gottenObj.GetUID() != uid {
377 return gottenObj, true, nil
378 }
379 }
380
381 endTime := time.Now().Add(o.Timeout)
382 timeout := time.Until(endTime)
383 errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
384 if o.Timeout == 0 {
385
386 if gottenObj == nil {
387 return nil, true, nil
388 }
389 return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
390 }
391 if timeout < 0 {
392
393 return info.Object, false, errWaitTimeoutWithName
394 }
395
396 fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
397 lw := &cache.ListWatch{
398 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
399 options.FieldSelector = fieldSelector
400 return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
401 },
402 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
403 options.FieldSelector = fieldSelector
404 return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
405 },
406 }
407
408
409 preconditionFunc := func(store cache.Store) (bool, error) {
410 _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
411 if err != nil {
412 return true, err
413 }
414 if !exists {
415
416 return true, nil
417 }
418
419 return false, nil
420 }
421
422 intrCtx, cancel := context.WithCancel(ctx)
423 defer cancel()
424 intr := interrupt.New(nil, cancel)
425 err := intr.Run(func() error {
426 _, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
427 if errors.Is(err, context.DeadlineExceeded) {
428 return errWaitTimeoutWithName
429 }
430 return err
431 })
432 if err != nil {
433 if err == wait.ErrWaitTimeout {
434 return gottenObj, false, errWaitTimeoutWithName
435 }
436 return gottenObj, false, err
437 }
438
439 return gottenObj, true, nil
440 }
441
442
443 type Wait struct {
444 errOut io.Writer
445 }
446
447
448 func (w Wait) IsDeleted(event watch.Event) (bool, error) {
449 switch event.Type {
450 case watch.Error:
451
452
453 err := apierrors.FromObject(event.Object)
454 fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
455 return false, nil
456 case watch.Deleted:
457 return true, nil
458 default:
459 return false, nil
460 }
461 }
462
463 type isCondMetFunc func(event watch.Event) (bool, error)
464 type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
465
466
467
468 func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
469 if len(info.Name) == 0 {
470 return info.Object, false, fmt.Errorf("resource name must be provided")
471 }
472
473 endTime := time.Now().Add(o.Timeout)
474 timeout := time.Until(endTime)
475 errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
476 if o.Timeout == 0 {
477
478 gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
479 if initObjGetErr != nil {
480 return nil, false, initObjGetErr
481 }
482 if gottenObj == nil {
483 return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
484 }
485 conditionCheck, err := check(gottenObj)
486 if err != nil {
487 return gottenObj, false, err
488 }
489 if conditionCheck == false {
490 return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
491 }
492 return gottenObj, true, nil
493 }
494 if timeout < 0 {
495
496 return info.Object, false, errWaitTimeoutWithName
497 }
498
499 mapping := info.ResourceMapping()
500 fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
501 lw := &cache.ListWatch{
502 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
503 options.FieldSelector = fieldSelector
504 return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
505 },
506 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
507 options.FieldSelector = fieldSelector
508 return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
509 },
510 }
511
512
513 preconditionFunc := func(store cache.Store) (bool, error) {
514 _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
515 if err != nil {
516 return true, err
517 }
518 if !exists {
519 return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
520 }
521
522 return false, nil
523 }
524
525 intrCtx, cancel := context.WithCancel(ctx)
526 defer cancel()
527 var result runtime.Object
528 intr := interrupt.New(nil, cancel)
529 err := intr.Run(func() error {
530 ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
531 if ev != nil {
532 result = ev.Object
533 }
534 if errors.Is(err, context.DeadlineExceeded) {
535 return errWaitTimeoutWithName
536 }
537 return err
538 })
539 if err != nil {
540 if err == wait.ErrWaitTimeout {
541 return result, false, errWaitTimeoutWithName
542 }
543 return result, false, err
544 }
545
546 return result, true, nil
547 }
548
549
550 type ConditionalWait struct {
551 conditionName string
552 conditionStatus string
553
554 errOut io.Writer
555 }
556
557
558 func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
559 return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
560 }
561
562 func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
563 conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
564 if err != nil {
565 return false, err
566 }
567 if !found {
568 return false, nil
569 }
570 for _, conditionUncast := range conditions {
571 condition := conditionUncast.(map[string]interface{})
572 name, found, err := unstructured.NestedString(condition, "type")
573 if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
574 continue
575 }
576 status, found, err := unstructured.NestedString(condition, "status")
577 if !found || err != nil {
578 continue
579 }
580 generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
581 if found {
582 observedGeneration, found := getObservedGeneration(obj, condition)
583 if found && observedGeneration < generation {
584 return false, nil
585 }
586 }
587 return strings.EqualFold(status, w.conditionStatus), nil
588 }
589
590 return false, nil
591 }
592
593 func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
594 if event.Type == watch.Error {
595
596
597 err := apierrors.FromObject(event.Object)
598 fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
599 return false, nil
600 }
601 if event.Type == watch.Deleted {
602
603 return false, nil
604 }
605 obj := event.Object.(*unstructured.Unstructured)
606 return w.checkCondition(obj)
607 }
608
609 func extendErrWaitTimeout(err error, info *resource.Info) error {
610 return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
611 }
612
613 func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
614 conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
615 if found {
616 return conditionObservedGeneration, true
617 }
618 statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
619 return statusObservedGeneration, found
620 }
621
622
623
624 type JSONPathWait struct {
625 matchAnyValue bool
626 jsonPathValue string
627 jsonPathParser *jsonpath.JSONPath
628
629 errOut io.Writer
630 }
631
632
633 func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
634 return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
635 }
636
637
638
639 func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
640 if event.Type == watch.Error {
641
642
643 err := apierrors.FromObject(event.Object)
644 fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
645 return false, nil
646 }
647 if event.Type == watch.Deleted {
648
649 return false, nil
650 }
651
652
653 obj := event.Object.(*unstructured.Unstructured)
654 return j.checkCondition(obj)
655 }
656
657
658
659 func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
660 queryObj := obj.UnstructuredContent()
661 parseResults, err := j.jsonPathParser.FindResults(queryObj)
662 if err != nil {
663 return false, err
664 }
665 if len(parseResults) == 0 || len(parseResults[0]) == 0 {
666 return false, nil
667 }
668 if err := verifyParsedJSONPath(parseResults); err != nil {
669 return false, err
670 }
671 if j.matchAnyValue {
672 return true, nil
673 }
674 isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
675 if err != nil {
676 return false, err
677 }
678 return isConditionMet, nil
679 }
680
681
682
683 func verifyParsedJSONPath(results [][]reflect.Value) error {
684 if len(results) > 1 {
685 return errors.New("given jsonpath expression matches more than one list")
686 }
687 if len(results[0]) > 1 {
688 return errors.New("given jsonpath expression matches more than one value")
689 }
690 return nil
691 }
692
693
694
695
696
697
698
699
700 func compareResults(r reflect.Value, expectedVal string) (bool, error) {
701 switch r.Interface().(type) {
702 case map[string]interface{}, []interface{}:
703 return false, errors.New("jsonpath leads to a nested object or list which is not supported")
704 }
705 s := fmt.Sprintf("%v", r.Interface())
706 return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
707 }
708
View as plain text