...

Source file src/k8s.io/kubectl/pkg/cmd/wait/wait.go

Documentation: k8s.io/kubectl/pkg/cmd/wait

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package wait
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"reflect"
    25  	"strings"
    26  	"time"
    27  
    28  	"github.com/spf13/cobra"
    29  
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    33  	"k8s.io/apimachinery/pkg/fields"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/apimachinery/pkg/runtime/schema"
    36  	"k8s.io/apimachinery/pkg/types"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/apimachinery/pkg/watch"
    39  	"k8s.io/cli-runtime/pkg/genericclioptions"
    40  	"k8s.io/cli-runtime/pkg/genericiooptions"
    41  	"k8s.io/cli-runtime/pkg/printers"
    42  	"k8s.io/cli-runtime/pkg/resource"
    43  	"k8s.io/client-go/dynamic"
    44  	"k8s.io/client-go/tools/cache"
    45  	watchtools "k8s.io/client-go/tools/watch"
    46  	"k8s.io/client-go/util/jsonpath"
    47  	cmdget "k8s.io/kubectl/pkg/cmd/get"
    48  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    49  	"k8s.io/kubectl/pkg/util/i18n"
    50  	"k8s.io/kubectl/pkg/util/interrupt"
    51  	"k8s.io/kubectl/pkg/util/templates"
    52  )
    53  
var (
	// waitLong is the long description attached to the wait command (used as
	// the cobra command's Long field in NewCmdWait).
	waitLong = templates.LongDesc(i18n.T(`
		Experimental: Wait for a specific condition on one or many resources.

		The command takes multiple resources and waits until the specified condition
		is seen in the Status field of every given resource.

		Alternatively, the command can wait for the given set of resources to be deleted
		by providing the "delete" keyword as the value to the --for flag.

		A successful message will be printed to stdout indicating when the specified
        condition has been met. You can use -o option to change to output destination.`))

	// waitExample holds the usage examples attached to the wait command (used
	// as the cobra command's Example field in NewCmdWait).
	waitExample = templates.Examples(i18n.T(`
		# Wait for the pod "busybox1" to contain the status condition of type "Ready"
		kubectl wait --for=condition=Ready pod/busybox1

		# The default value of status condition is true; you can wait for other targets after an equal delimiter (compared after Unicode simple case folding, which is a more general form of case-insensitivity)
		kubectl wait --for=condition=Ready=false pod/busybox1

		# Wait for the pod "busybox1" to contain the status phase to be "Running"
		kubectl wait --for=jsonpath='{.status.phase}'=Running pod/busybox1

		# Wait for pod "busybox1" to be Ready
		kubectl wait --for='jsonpath={.status.conditions[?(@.type=="Ready")].status}=True' pod/busybox1

		# Wait for the service "loadbalancer" to have ingress.
		kubectl wait --for=jsonpath='{.status.loadBalancer.ingress}' service/loadbalancer

		# Wait for the pod "busybox1" to be deleted, with a timeout of 60s, after having issued the "delete" command
		kubectl delete pod/busybox1
		kubectl wait --for=delete pod/busybox1 --timeout=60s`))
)
    87  
// errNoMatchingResources is returned by RunWait when no resource was visited
// and the command is not waiting for a deletion.
var errNoMatchingResources = errors.New("no matching resources found")
    90  
// WaitFlags directly reflect the information that CLI is gathering via flags.  They will be converted to Options, which
// reflect the runtime requirements for the command.  This structure reduces the transformation to wiring and makes
// the logic itself easy to unit test.
//
// Fields:
//   - RESTClientGetter: source of the REST config used by ToOptions to build the dynamic client and resource builder.
//   - PrintFlags: yields the printer stored on WaitOptions.
//   - ResourceBuilderFlags: yields the resource finder stored on WaitOptions.
//   - Timeout: value of --timeout; ToOptions turns a negative value into 168 hours.
//   - ForCondition: raw value of --for, parsed by conditionFuncFor.
//   - IOStreams: embedded streams; ErrOut is handed to the condition functions.
type WaitFlags struct {
	RESTClientGetter     genericclioptions.RESTClientGetter
	PrintFlags           *genericclioptions.PrintFlags
	ResourceBuilderFlags *genericclioptions.ResourceBuilderFlags

	Timeout      time.Duration
	ForCondition string

	genericiooptions.IOStreams
}
   104  
   105  // NewWaitFlags returns a default WaitFlags
   106  func NewWaitFlags(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *WaitFlags {
   107  	return &WaitFlags{
   108  		RESTClientGetter: restClientGetter,
   109  		PrintFlags:       genericclioptions.NewPrintFlags("condition met"),
   110  		ResourceBuilderFlags: genericclioptions.NewResourceBuilderFlags().
   111  			WithLabelSelector("").
   112  			WithFieldSelector("").
   113  			WithAll(false).
   114  			WithAllNamespaces(false).
   115  			WithLocal(false).
   116  			WithLatest(),
   117  
   118  		Timeout: 30 * time.Second,
   119  
   120  		IOStreams: streams,
   121  	}
   122  }
   123  
   124  // NewCmdWait returns a cobra command for waiting
   125  func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *cobra.Command {
   126  	flags := NewWaitFlags(restClientGetter, streams)
   127  
   128  	cmd := &cobra.Command{
   129  		Use:     "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available|--for=jsonpath='{}'[=value]]",
   130  		Short:   i18n.T("Experimental: Wait for a specific condition on one or many resources"),
   131  		Long:    waitLong,
   132  		Example: waitExample,
   133  
   134  		DisableFlagsInUseLine: true,
   135  		Run: func(cmd *cobra.Command, args []string) {
   136  			o, err := flags.ToOptions(args)
   137  			cmdutil.CheckErr(err)
   138  			cmdutil.CheckErr(o.RunWait())
   139  		},
   140  		SuggestFor: []string{"list", "ps"},
   141  	}
   142  
   143  	flags.AddFlags(cmd)
   144  
   145  	return cmd
   146  }
   147  
   148  // AddFlags registers flags for a cli
   149  func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {
   150  	flags.PrintFlags.AddFlags(cmd)
   151  	flags.ResourceBuilderFlags.AddFlags(cmd.Flags())
   152  
   153  	cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up.  Zero means check once and don't wait, negative means wait for a week.")
   154  	cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name[=condition-value]|jsonpath='{JSONPath expression}'=[JSONPath value]]. The default condition-value is true.  Condition values are compared after Unicode simple case folding, which is a more general form of case-insensitivity.")
   155  }
   156  
   157  // ToOptions converts from CLI inputs to runtime inputs
   158  func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
   159  	printer, err := flags.PrintFlags.ToPrinter()
   160  	if err != nil {
   161  		return nil, err
   162  	}
   163  	builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)
   164  	clientConfig, err := flags.RESTClientGetter.ToRESTConfig()
   165  	if err != nil {
   166  		return nil, err
   167  	}
   168  	dynamicClient, err := dynamic.NewForConfig(clientConfig)
   169  	if err != nil {
   170  		return nil, err
   171  	}
   172  	conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
   173  	if err != nil {
   174  		return nil, err
   175  	}
   176  
   177  	effectiveTimeout := flags.Timeout
   178  	if effectiveTimeout < 0 {
   179  		effectiveTimeout = 168 * time.Hour
   180  	}
   181  
   182  	o := &WaitOptions{
   183  		ResourceFinder: builder,
   184  		DynamicClient:  dynamicClient,
   185  		Timeout:        effectiveTimeout,
   186  		ForCondition:   flags.ForCondition,
   187  
   188  		Printer:     printer,
   189  		ConditionFn: conditionFn,
   190  		IOStreams:   flags.IOStreams,
   191  	}
   192  
   193  	return o, nil
   194  }
   195  
   196  func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
   197  	if strings.ToLower(condition) == "delete" {
   198  		return IsDeleted, nil
   199  	}
   200  	if strings.HasPrefix(condition, "condition=") {
   201  		conditionName := condition[len("condition="):]
   202  		conditionValue := "true"
   203  		if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 {
   204  			conditionValue = conditionName[equalsIndex+1:]
   205  			conditionName = conditionName[0:equalsIndex]
   206  		}
   207  
   208  		return ConditionalWait{
   209  			conditionName:   conditionName,
   210  			conditionStatus: conditionValue,
   211  			errOut:          errOut,
   212  		}.IsConditionMet, nil
   213  	}
   214  	if strings.HasPrefix(condition, "jsonpath=") {
   215  		jsonPathInput := strings.TrimPrefix(condition, "jsonpath=")
   216  		jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput)
   217  		if err != nil {
   218  			return nil, err
   219  		}
   220  		j, err := newJSONPathParser(jsonPathExp)
   221  		if err != nil {
   222  			return nil, err
   223  		}
   224  		return JSONPathWait{
   225  			matchAnyValue:  jsonPathValue == "",
   226  			jsonPathValue:  jsonPathValue,
   227  			jsonPathParser: j,
   228  			errOut:         errOut,
   229  		}.IsJSONPathConditionMet, nil
   230  	}
   231  
   232  	return nil, fmt.Errorf("unrecognized condition: %q", condition)
   233  }
   234  
   235  // newJSONPathParser will create a new JSONPath parser based on the jsonPathExpression
   236  func newJSONPathParser(jsonPathExpression string) (*jsonpath.JSONPath, error) {
   237  	j := jsonpath.New("wait").AllowMissingKeys(true)
   238  	if jsonPathExpression == "" {
   239  		return nil, errors.New("jsonpath expression cannot be empty")
   240  	}
   241  	if err := j.Parse(jsonPathExpression); err != nil {
   242  		return nil, err
   243  	}
   244  	return j, nil
   245  }
   246  
   247  // processJSONPathInput will parse and process the provided JSONPath input containing a JSON expression and optionally
   248  // a value for the matching condition.
   249  func processJSONPathInput(input string) (string, string, error) {
   250  	jsonPathInput := splitJSONPathInput(input)
   251  	if numOfArgs := len(jsonPathInput); numOfArgs < 1 || numOfArgs > 2 {
   252  		return "", "", fmt.Errorf("jsonpath wait format must be --for=jsonpath='{.status.readyReplicas}'=3 or --for=jsonpath='{.status.readyReplicas}'")
   253  	}
   254  	relaxedJSONPathExp, err := cmdget.RelaxedJSONPathExpression(jsonPathInput[0])
   255  	if err != nil {
   256  		return "", "", err
   257  	}
   258  	if len(jsonPathInput) == 1 {
   259  		return relaxedJSONPathExp, "", nil
   260  	}
   261  	jsonPathValue := strings.Trim(jsonPathInput[1], `'"`)
   262  	if jsonPathValue == "" {
   263  		return "", "", errors.New("jsonpath wait has to have a value after equal sign, like --for=jsonpath='{.status.readyReplicas}'=3")
   264  	}
   265  	return relaxedJSONPathExp, jsonPathValue, nil
   266  }
   267  
   268  // splitJSONPathInput splits the provided input string on single '='. Double '==' will not cause the string to be
   269  // split. E.g., "a.b.c====d.e.f===g.h.i===" will split to ["a.b.c====d.e.f==","g.h.i==",""].
   270  func splitJSONPathInput(input string) []string {
   271  	var output []string
   272  	var element strings.Builder
   273  	for i := 0; i < len(input); i++ {
   274  		if input[i] == '=' {
   275  			if i < len(input)-1 && input[i+1] == '=' {
   276  				element.WriteString("==")
   277  				i++
   278  				continue
   279  			}
   280  			output = append(output, element.String())
   281  			element.Reset()
   282  			continue
   283  		}
   284  		element.WriteByte(input[i])
   285  	}
   286  	return append(output, element.String())
   287  }
   288  
// ResourceLocation holds the location of a resource: its group/resource pair,
// namespace and name. It is the key type of UIDMap.
type ResourceLocation struct {
	GroupResource schema.GroupResource
	Namespace     string
	Name          string
}
   295  
// UIDMap maps ResourceLocation with UID. IsDeleted consults it: when the live
// object's UID differs from the recorded one, the original object is treated
// as deleted.
type UIDMap map[ResourceLocation]types.UID
   298  
// WaitOptions is a set of options that allows you to wait.  This object reflects the runtime needs of a wait
// command, making the logic itself easy to unit test with our existing mocks.
//
// RunWait visits the resources produced by ResourceFinder, calls ConditionFn for each of them with a context
// derived from Timeout, and prints every object whose condition was met with Printer. ForCondition is the raw
// --for value; RunWait only inspects it to detect the "delete" case.
type WaitOptions struct {
	ResourceFinder genericclioptions.ResourceFinder
	// UIDMap maps a resource location to a UID.  It is optional, but ConditionFuncs may choose to use it to make the result
	// more reliable.  For instance, delete can look for UID consistency during delegated calls.
	UIDMap        UIDMap
	DynamicClient dynamic.Interface
	Timeout       time.Duration
	ForCondition  string

	Printer     printers.ResourcePrinter
	ConditionFn ConditionFunc
	genericiooptions.IOStreams
}
   314  
// ConditionFunc is the interface for providing condition checks.
//
// It is invoked by RunWait once per visited resource. When done is true,
// finalObject is printed and the resource counts as satisfied; otherwise err
// is returned to the visitor (RunWait substitutes a generic error when err is
// nil).
type ConditionFunc func(ctx context.Context, info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)
   317  
   318  // RunWait runs the waiting logic
   319  func (o *WaitOptions) RunWait() error {
   320  	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
   321  	defer cancel()
   322  
   323  	visitCount := 0
   324  	visitFunc := func(info *resource.Info, err error) error {
   325  		if err != nil {
   326  			return err
   327  		}
   328  
   329  		visitCount++
   330  		finalObject, success, err := o.ConditionFn(ctx, info, o)
   331  		if success {
   332  			o.Printer.PrintObj(finalObject, o.Out)
   333  			return nil
   334  		}
   335  		if err == nil {
   336  			return fmt.Errorf("%v unsatisified for unknown reason", finalObject)
   337  		}
   338  		return err
   339  	}
   340  	visitor := o.ResourceFinder.Do()
   341  	isForDelete := strings.ToLower(o.ForCondition) == "delete"
   342  	if visitor, ok := visitor.(*resource.Result); ok && isForDelete {
   343  		visitor.IgnoreErrors(apierrors.IsNotFound)
   344  	}
   345  
   346  	err := visitor.Visit(visitFunc)
   347  	if err != nil {
   348  		return err
   349  	}
   350  	if visitCount == 0 && !isForDelete {
   351  		return errNoMatchingResources
   352  	}
   353  	return err
   354  }
   355  
   356  // IsDeleted is a condition func for waiting for something to be deleted
   357  func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
   358  	if len(info.Name) == 0 {
   359  		return info.Object, false, fmt.Errorf("resource name must be provided")
   360  	}
   361  
   362  	gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
   363  	if apierrors.IsNotFound(initObjGetErr) {
   364  		return info.Object, true, nil
   365  	}
   366  	if initObjGetErr != nil {
   367  		// TODO this could do something slightly fancier if we wish
   368  		return info.Object, false, initObjGetErr
   369  	}
   370  	resourceLocation := ResourceLocation{
   371  		GroupResource: info.Mapping.Resource.GroupResource(),
   372  		Namespace:     gottenObj.GetNamespace(),
   373  		Name:          gottenObj.GetName(),
   374  	}
   375  	if uid, ok := o.UIDMap[resourceLocation]; ok {
   376  		if gottenObj.GetUID() != uid {
   377  			return gottenObj, true, nil
   378  		}
   379  	}
   380  
   381  	endTime := time.Now().Add(o.Timeout)
   382  	timeout := time.Until(endTime)
   383  	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
   384  	if o.Timeout == 0 {
   385  		// If timeout is zero check if the object exists once only
   386  		if gottenObj == nil {
   387  			return nil, true, nil
   388  		}
   389  		return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
   390  	}
   391  	if timeout < 0 {
   392  		// we're out of time
   393  		return info.Object, false, errWaitTimeoutWithName
   394  	}
   395  
   396  	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
   397  	lw := &cache.ListWatch{
   398  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   399  			options.FieldSelector = fieldSelector
   400  			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
   401  		},
   402  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   403  			options.FieldSelector = fieldSelector
   404  			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
   405  		},
   406  	}
   407  
   408  	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
   409  	preconditionFunc := func(store cache.Store) (bool, error) {
   410  		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
   411  		if err != nil {
   412  			return true, err
   413  		}
   414  		if !exists {
   415  			// since we're looking for it to disappear we just return here if it no longer exists
   416  			return true, nil
   417  		}
   418  
   419  		return false, nil
   420  	}
   421  
   422  	intrCtx, cancel := context.WithCancel(ctx)
   423  	defer cancel()
   424  	intr := interrupt.New(nil, cancel)
   425  	err := intr.Run(func() error {
   426  		_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
   427  		if errors.Is(err, context.DeadlineExceeded) {
   428  			return errWaitTimeoutWithName
   429  		}
   430  		return err
   431  	})
   432  	if err != nil {
   433  		if err == wait.ErrWaitTimeout {
   434  			return gottenObj, false, errWaitTimeoutWithName
   435  		}
   436  		return gottenObj, false, err
   437  	}
   438  
   439  	return gottenObj, true, nil
   440  }
   441  
// Wait has helper methods for handling watches, including error handling.
// errOut receives the messages printed when a watch delivers an Error event.
type Wait struct {
	errOut io.Writer
}
   446  
   447  // IsDeleted returns true if the object is deleted. It prints any errors it encounters.
   448  func (w Wait) IsDeleted(event watch.Event) (bool, error) {
   449  	switch event.Type {
   450  	case watch.Error:
   451  		// keep waiting in the event we see an error - we expect the watch to be closed by
   452  		// the server if the error is unrecoverable.
   453  		err := apierrors.FromObject(event.Object)
   454  		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
   455  		return false, nil
   456  	case watch.Deleted:
   457  		return true, nil
   458  	default:
   459  		return false, nil
   460  	}
   461  }
   462  
// isCondMetFunc evaluates a single watch event and reports whether the awaited
// condition is met. getObjAndCheckCondition passes it to the watch loop.
type isCondMetFunc func(event watch.Event) (bool, error)

// checkCondFunc evaluates a fetched object and reports whether the awaited
// condition is met. getObjAndCheckCondition uses it for the one-shot check
// performed when the timeout is zero.
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
   465  
   466  // getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
   467  // If the condition is not met, it will make a Watch query to the server and pass in the condMet function
   468  func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
   469  	if len(info.Name) == 0 {
   470  		return info.Object, false, fmt.Errorf("resource name must be provided")
   471  	}
   472  
   473  	endTime := time.Now().Add(o.Timeout)
   474  	timeout := time.Until(endTime)
   475  	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
   476  	if o.Timeout == 0 {
   477  		// If timeout is zero we will fetch the object(s) once only and check
   478  		gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
   479  		if initObjGetErr != nil {
   480  			return nil, false, initObjGetErr
   481  		}
   482  		if gottenObj == nil {
   483  			return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
   484  		}
   485  		conditionCheck, err := check(gottenObj)
   486  		if err != nil {
   487  			return gottenObj, false, err
   488  		}
   489  		if conditionCheck == false {
   490  			return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
   491  		}
   492  		return gottenObj, true, nil
   493  	}
   494  	if timeout < 0 {
   495  		// we're out of time
   496  		return info.Object, false, errWaitTimeoutWithName
   497  	}
   498  
   499  	mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
   500  	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
   501  	lw := &cache.ListWatch{
   502  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   503  			options.FieldSelector = fieldSelector
   504  			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
   505  		},
   506  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   507  			options.FieldSelector = fieldSelector
   508  			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
   509  		},
   510  	}
   511  
   512  	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
   513  	preconditionFunc := func(store cache.Store) (bool, error) {
   514  		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
   515  		if err != nil {
   516  			return true, err
   517  		}
   518  		if !exists {
   519  			return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
   520  		}
   521  
   522  		return false, nil
   523  	}
   524  
   525  	intrCtx, cancel := context.WithCancel(ctx)
   526  	defer cancel()
   527  	var result runtime.Object
   528  	intr := interrupt.New(nil, cancel)
   529  	err := intr.Run(func() error {
   530  		ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
   531  		if ev != nil {
   532  			result = ev.Object
   533  		}
   534  		if errors.Is(err, context.DeadlineExceeded) {
   535  			return errWaitTimeoutWithName
   536  		}
   537  		return err
   538  	})
   539  	if err != nil {
   540  		if err == wait.ErrWaitTimeout {
   541  			return result, false, errWaitTimeoutWithName
   542  		}
   543  		return result, false, err
   544  	}
   545  
   546  	return result, true, nil
   547  }
   548  
// ConditionalWait holds the information needed to check an API status
// condition: the condition type to look for (conditionName) and the status
// value it must have (conditionStatus). Both are compared case-insensitively
// by checkCondition.
type ConditionalWait struct {
	conditionName   string
	conditionStatus string
	// errOut is written to if an error occurs
	errOut io.Writer
}
   556  
   557  // IsConditionMet is a conditionfunc for waiting on an API condition to be met
   558  func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
   559  	return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
   560  }
   561  
   562  func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
   563  	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
   564  	if err != nil {
   565  		return false, err
   566  	}
   567  	if !found {
   568  		return false, nil
   569  	}
   570  	for _, conditionUncast := range conditions {
   571  		condition := conditionUncast.(map[string]interface{})
   572  		name, found, err := unstructured.NestedString(condition, "type")
   573  		if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
   574  			continue
   575  		}
   576  		status, found, err := unstructured.NestedString(condition, "status")
   577  		if !found || err != nil {
   578  			continue
   579  		}
   580  		generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
   581  		if found {
   582  			observedGeneration, found := getObservedGeneration(obj, condition)
   583  			if found && observedGeneration < generation {
   584  				return false, nil
   585  			}
   586  		}
   587  		return strings.EqualFold(status, w.conditionStatus), nil
   588  	}
   589  
   590  	return false, nil
   591  }
   592  
   593  func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
   594  	if event.Type == watch.Error {
   595  		// keep waiting in the event we see an error - we expect the watch to be closed by
   596  		// the server
   597  		err := apierrors.FromObject(event.Object)
   598  		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
   599  		return false, nil
   600  	}
   601  	if event.Type == watch.Deleted {
   602  		// this will chain back out, result in another get and an return false back up the chain
   603  		return false, nil
   604  	}
   605  	obj := event.Object.(*unstructured.Unstructured)
   606  	return w.checkCondition(obj)
   607  }
   608  
   609  func extendErrWaitTimeout(err error, info *resource.Info) error {
   610  	return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
   611  }
   612  
   613  func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
   614  	conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
   615  	if found {
   616  		return conditionObservedGeneration, true
   617  	}
   618  	statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
   619  	return statusObservedGeneration, found
   620  }
   621  
// JSONPathWait holds a JSONPath Parser which has the ability
// to check for the JSONPath condition and compare with the API server provided JSON output.
//
// When matchAnyValue is set, any single value found at the JSONPath satisfies
// the condition; otherwise the value must equal jsonPathValue (compared as
// text by compareResults).
type JSONPathWait struct {
	matchAnyValue  bool
	jsonPathValue  string
	jsonPathParser *jsonpath.JSONPath
	// errOut is written to if an error occurs
	errOut io.Writer
}
   631  
   632  // IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
   633  func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
   634  	return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
   635  }
   636  
   637  // isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
   638  // which check the watch event and check if a JSONPathWait condition is met
   639  func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
   640  	if event.Type == watch.Error {
   641  		// keep waiting in the event we see an error - we expect the watch to be closed by
   642  		// the server
   643  		err := apierrors.FromObject(event.Object)
   644  		fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
   645  		return false, nil
   646  	}
   647  	if event.Type == watch.Deleted {
   648  		// this will chain back out, result in another get and an return false back up the chain
   649  		return false, nil
   650  	}
   651  	// event runtime Object can be safely asserted to Unstructed
   652  	// because we are working with dynamic client
   653  	obj := event.Object.(*unstructured.Unstructured)
   654  	return j.checkCondition(obj)
   655  }
   656  
   657  // checkCondition uses JSONPath parser to parse the JSON received from the API server
   658  // and check if it matches the desired condition
   659  func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
   660  	queryObj := obj.UnstructuredContent()
   661  	parseResults, err := j.jsonPathParser.FindResults(queryObj)
   662  	if err != nil {
   663  		return false, err
   664  	}
   665  	if len(parseResults) == 0 || len(parseResults[0]) == 0 {
   666  		return false, nil
   667  	}
   668  	if err := verifyParsedJSONPath(parseResults); err != nil {
   669  		return false, err
   670  	}
   671  	if j.matchAnyValue {
   672  		return true, nil
   673  	}
   674  	isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
   675  	if err != nil {
   676  		return false, err
   677  	}
   678  	return isConditionMet, nil
   679  }
   680  
   681  // verifyParsedJSONPath verifies the JSON received from the API server is valid.
   682  // It will only accept a single JSON
   683  func verifyParsedJSONPath(results [][]reflect.Value) error {
   684  	if len(results) > 1 {
   685  		return errors.New("given jsonpath expression matches more than one list")
   686  	}
   687  	if len(results[0]) > 1 {
   688  		return errors.New("given jsonpath expression matches more than one value")
   689  	}
   690  	return nil
   691  }
   692  
   693  // compareResults will compare the reflect.Value from the result parsed by the
   694  // JSONPath parser with the expected value given by the value
   695  //
   696  // Since this is coming from an unstructured this can only ever be a primitive,
   697  // map[string]interface{}, or []interface{}.
   698  // We do not support the last two and rely on fmt to handle conversion to string
   699  // and compare the result with user input
   700  func compareResults(r reflect.Value, expectedVal string) (bool, error) {
   701  	switch r.Interface().(type) {
   702  	case map[string]interface{}, []interface{}:
   703  		return false, errors.New("jsonpath leads to a nested object or list which is not supported")
   704  	}
   705  	s := fmt.Sprintf("%v", r.Interface())
   706  	return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
   707  }
   708  

View as plain text