...

Source file src/k8s.io/apimachinery/pkg/util/wait/wait.go

Documentation: k8s.io/apimachinery/pkg/util/wait

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package wait
    18  
    19  import (
    20  	"context"
    21  	"math/rand"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/apimachinery/pkg/util/runtime"
    26  )
    27  
    28  // For any test of the style:
    29  //
    30  //	...
    31  //	<- time.After(timeout):
    32  //	   t.Errorf("Timed out")
    33  //
    34  // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
    35  // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
    36  // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
    37  var ForeverTestTimeout = time.Second * 30
    38  
    39  // NeverStop may be passed to Until to make it never stop.
    40  var NeverStop <-chan struct{} = make(chan struct{})
    41  
    42  // Group allows to start a group of goroutines and wait for their completion.
    43  type Group struct {
    44  	wg sync.WaitGroup
    45  }
    46  
    47  func (g *Group) Wait() {
    48  	g.wg.Wait()
    49  }
    50  
    51  // StartWithChannel starts f in a new goroutine in the group.
    52  // stopCh is passed to f as an argument. f should stop when stopCh is available.
    53  func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
    54  	g.Start(func() {
    55  		f(stopCh)
    56  	})
    57  }
    58  
    59  // StartWithContext starts f in a new goroutine in the group.
    60  // ctx is passed to f as an argument. f should stop when ctx.Done() is available.
    61  func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
    62  	g.Start(func() {
    63  		f(ctx)
    64  	})
    65  }
    66  
    67  // Start starts f in a new goroutine in the group.
    68  func (g *Group) Start(f func()) {
    69  	g.wg.Add(1)
    70  	go func() {
    71  		defer g.wg.Done()
    72  		f()
    73  	}()
    74  }
    75  
// Forever calls f every period, forever.
//
// Forever is syntactic sugar on top of Until: it forwards f and period
// unchanged and supplies NeverStop as the stop channel.
// NOTE(review): Until is defined elsewhere in this package; with a stop
// channel that is never closed this call presumably does not return.
func Forever(f func(), period time.Duration) {
	Until(f, period, NeverStop)
}
    82  
    83  // Jitter returns a time.Duration between duration and duration + maxFactor *
    84  // duration.
    85  //
    86  // This allows clients to avoid converging on periodic behavior. If maxFactor
    87  // is 0.0, a suggested default value will be chosen.
    88  func Jitter(duration time.Duration, maxFactor float64) time.Duration {
    89  	if maxFactor <= 0.0 {
    90  		maxFactor = 1.0
    91  	}
    92  	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
    93  	return wait
    94  }
    95  
    96  // ConditionFunc returns true if the condition is satisfied, or an error
    97  // if the loop should be aborted.
    98  type ConditionFunc func() (done bool, err error)
    99  
   100  // ConditionWithContextFunc returns true if the condition is satisfied, or an error
   101  // if the loop should be aborted.
   102  //
   103  // The caller passes along a context that can be used by the condition function.
   104  type ConditionWithContextFunc func(context.Context) (done bool, err error)
   105  
   106  // WithContext converts a ConditionFunc into a ConditionWithContextFunc
   107  func (cf ConditionFunc) WithContext() ConditionWithContextFunc {
   108  	return func(context.Context) (done bool, err error) {
   109  		return cf()
   110  	}
   111  }
   112  
   113  // ContextForChannel provides a context that will be treated as cancelled
   114  // when the provided parentCh is closed. The implementation returns
   115  // context.Canceled for Err() if and only if the parentCh is closed.
   116  func ContextForChannel(parentCh <-chan struct{}) context.Context {
   117  	return channelContext{stopCh: parentCh}
   118  }
   119  
   120  var _ context.Context = channelContext{}
   121  
   122  // channelContext will behave as if the context were cancelled when stopCh is
   123  // closed.
   124  type channelContext struct {
   125  	stopCh <-chan struct{}
   126  }
   127  
   128  func (c channelContext) Done() <-chan struct{} { return c.stopCh }
   129  func (c channelContext) Err() error {
   130  	select {
   131  	case <-c.stopCh:
   132  		return context.Canceled
   133  	default:
   134  		return nil
   135  	}
   136  }
   137  func (c channelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
   138  func (c channelContext) Value(key any) any           { return nil }
   139  
   140  // runConditionWithCrashProtection runs a ConditionFunc with crash protection.
   141  //
   142  // Deprecated: Will be removed when the legacy polling methods are removed.
   143  func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
   144  	defer runtime.HandleCrash()
   145  	return condition()
   146  }
   147  
   148  // runConditionWithCrashProtectionWithContext runs a ConditionWithContextFunc
   149  // with crash protection.
   150  //
   151  // Deprecated: Will be removed when the legacy polling methods are removed.
   152  func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) {
   153  	defer runtime.HandleCrash()
   154  	return condition(ctx)
   155  }
   156  
   157  // waitFunc creates a channel that receives an item every time a test
   158  // should be executed and is closed when the last test should be invoked.
   159  //
   160  // Deprecated: Will be removed in a future release in favor of
   161  // loopConditionUntilContext.
   162  type waitFunc func(done <-chan struct{}) <-chan struct{}
   163  
   164  // WithContext converts the WaitFunc to an equivalent WaitWithContextFunc
   165  func (w waitFunc) WithContext() waitWithContextFunc {
   166  	return func(ctx context.Context) <-chan struct{} {
   167  		return w(ctx.Done())
   168  	}
   169  }
   170  
   171  // waitWithContextFunc creates a channel that receives an item every time a test
   172  // should be executed and is closed when the last test should be invoked.
   173  //
   174  // When the specified context gets cancelled or expires the function
   175  // stops sending item and returns immediately.
   176  //
   177  // Deprecated: Will be removed in a future release in favor of
   178  // loopConditionUntilContext.
   179  type waitWithContextFunc func(ctx context.Context) <-chan struct{}
   180  
// waitForWithContext continually checks 'fn' as driven by 'wait'.
//
// waitForWithContext gets a channel from 'wait()', and then invokes 'fn'
// once for every value placed on the channel and once more when the
// channel is closed. If the channel is closed and 'fn'
// returns false without error, waitForWithContext returns ErrWaitTimeout.
//
// If 'fn' returns an error the loop ends and that error is returned. If
// 'fn' returns true the loop ends and nil is returned.
//
// ErrWaitTimeout (not ctx.Err()) is also returned if the ctx.Done() channel
// is closed without fn ever returning true; this is kept for backward
// compatibility.
//
// 'wait' is not given ctx itself: it receives a separate cancellable context
// derived from context.Background() that is cancelled when waitForWithContext
// returns. 'fn' is invoked with ctx.
//
// When the ctx.Done() channel is closed, because the golang `select` statement is
// "uniform pseudo-random", the `fn` might still run one or multiple times,
// though eventually `waitForWithContext` will return.
//
// Deprecated: Will be removed in a future release in favor of
// loopConditionUntilContext.
func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error {
	// The tick producer gets its own context so that it is always released
	// when this function returns, whatever the reason.
	waitCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := wait(waitCtx)
	for {
		select {
		case _, open := <-c:
			// Runs for every tick and also for the receive that observes the
			// channel being closed (open == false).
			ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			// The channel is closed and the final check did not succeed.
			if !open {
				return ErrWaitTimeout
			}
		case <-ctx.Done():
			// returning ctx.Err() will break backward compatibility, use new PollUntilContext*
			// methods instead
			return ErrWaitTimeout
		}
	}
}
   224  

View as plain text