1 /* 2 Copyright 2014 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package wait 18 19 import ( 20 "context" 21 "math/rand" 22 "sync" 23 "time" 24 25 "k8s.io/apimachinery/pkg/util/runtime" 26 ) 27 28 // For any test of the style: 29 // 30 // ... 31 // <- time.After(timeout): 32 // t.Errorf("Timed out") 33 // 34 // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s 35 // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine 36 // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. 37 var ForeverTestTimeout = time.Second * 30 38 39 // NeverStop may be passed to Until to make it never stop. 40 var NeverStop <-chan struct{} = make(chan struct{}) 41 42 // Group allows to start a group of goroutines and wait for their completion. 43 type Group struct { 44 wg sync.WaitGroup 45 } 46 47 func (g *Group) Wait() { 48 g.wg.Wait() 49 } 50 51 // StartWithChannel starts f in a new goroutine in the group. 52 // stopCh is passed to f as an argument. f should stop when stopCh is available. 53 func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) { 54 g.Start(func() { 55 f(stopCh) 56 }) 57 } 58 59 // StartWithContext starts f in a new goroutine in the group. 60 // ctx is passed to f as an argument. f should stop when ctx.Done() is available. 61 func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) { 62 g.Start(func() { 63 f(ctx) 64 }) 65 } 66 67 // Start starts f in a new goroutine in the group. 68 func (g *Group) Start(f func()) { 69 g.wg.Add(1) 70 go func() { 71 defer g.wg.Done() 72 f() 73 }() 74 } 75 76 // Forever calls f every period for ever. 77 // 78 // Forever is syntactic sugar on top of Until. 79 func Forever(f func(), period time.Duration) { 80 Until(f, period, NeverStop) 81 } 82 83 // Jitter returns a time.Duration between duration and duration + maxFactor * 84 // duration. 85 // 86 // This allows clients to avoid converging on periodic behavior. If maxFactor 87 // is 0.0, a suggested default value will be chosen. 88 func Jitter(duration time.Duration, maxFactor float64) time.Duration { 89 if maxFactor <= 0.0 { 90 maxFactor = 1.0 91 } 92 wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) 93 return wait 94 } 95 96 // ConditionFunc returns true if the condition is satisfied, or an error 97 // if the loop should be aborted. 98 type ConditionFunc func() (done bool, err error) 99 100 // ConditionWithContextFunc returns true if the condition is satisfied, or an error 101 // if the loop should be aborted. 102 // 103 // The caller passes along a context that can be used by the condition function. 104 type ConditionWithContextFunc func(context.Context) (done bool, err error) 105 106 // WithContext converts a ConditionFunc into a ConditionWithContextFunc 107 func (cf ConditionFunc) WithContext() ConditionWithContextFunc { 108 return func(context.Context) (done bool, err error) { 109 return cf() 110 } 111 } 112 113 // ContextForChannel provides a context that will be treated as cancelled 114 // when the provided parentCh is closed. The implementation returns 115 // context.Canceled for Err() if and only if the parentCh is closed. 116 func ContextForChannel(parentCh <-chan struct{}) context.Context { 117 return channelContext{stopCh: parentCh} 118 } 119 120 var _ context.Context = channelContext{} 121 122 // channelContext will behave as if the context were cancelled when stopCh is 123 // closed. 124 type channelContext struct { 125 stopCh <-chan struct{} 126 } 127 128 func (c channelContext) Done() <-chan struct{} { return c.stopCh } 129 func (c channelContext) Err() error { 130 select { 131 case <-c.stopCh: 132 return context.Canceled 133 default: 134 return nil 135 } 136 } 137 func (c channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } 138 func (c channelContext) Value(key any) any { return nil } 139 140 // runConditionWithCrashProtection runs a ConditionFunc with crash protection. 141 // 142 // Deprecated: Will be removed when the legacy polling methods are removed. 143 func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) { 144 defer runtime.HandleCrash() 145 return condition() 146 } 147 148 // runConditionWithCrashProtectionWithContext runs a ConditionWithContextFunc 149 // with crash protection. 150 // 151 // Deprecated: Will be removed when the legacy polling methods are removed. 152 func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) { 153 defer runtime.HandleCrash() 154 return condition(ctx) 155 } 156 157 // waitFunc creates a channel that receives an item every time a test 158 // should be executed and is closed when the last test should be invoked. 159 // 160 // Deprecated: Will be removed in a future release in favor of 161 // loopConditionUntilContext. 162 type waitFunc func(done <-chan struct{}) <-chan struct{} 163 164 // WithContext converts the WaitFunc to an equivalent WaitWithContextFunc 165 func (w waitFunc) WithContext() waitWithContextFunc { 166 return func(ctx context.Context) <-chan struct{} { 167 return w(ctx.Done()) 168 } 169 } 170 171 // waitWithContextFunc creates a channel that receives an item every time a test 172 // should be executed and is closed when the last test should be invoked. 173 // 174 // When the specified context gets cancelled or expires the function 175 // stops sending item and returns immediately. 176 // 177 // Deprecated: Will be removed in a future release in favor of 178 // loopConditionUntilContext. 179 type waitWithContextFunc func(ctx context.Context) <-chan struct{} 180 181 // waitForWithContext continually checks 'fn' as driven by 'wait'. 182 // 183 // waitForWithContext gets a channel from 'wait()”, and then invokes 'fn' 184 // once for every value placed on the channel and once more when the 185 // channel is closed. If the channel is closed and 'fn' 186 // returns false without error, waitForWithContext returns ErrWaitTimeout. 187 // 188 // If 'fn' returns an error the loop ends and that error is returned. If 189 // 'fn' returns true the loop ends and nil is returned. 190 // 191 // context.Canceled will be returned if the ctx.Done() channel is closed 192 // without fn ever returning true. 193 // 194 // When the ctx.Done() channel is closed, because the golang `select` statement is 195 // "uniform pseudo-random", the `fn` might still run one or multiple times, 196 // though eventually `waitForWithContext` will return. 197 // 198 // Deprecated: Will be removed in a future release in favor of 199 // loopConditionUntilContext. 200 func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error { 201 waitCtx, cancel := context.WithCancel(context.Background()) 202 defer cancel() 203 c := wait(waitCtx) 204 for { 205 select { 206 case _, open := <-c: 207 ok, err := runConditionWithCrashProtectionWithContext(ctx, fn) 208 if err != nil { 209 return err 210 } 211 if ok { 212 return nil 213 } 214 if !open { 215 return ErrWaitTimeout 216 } 217 case <-ctx.Done(): 218 // returning ctx.Err() will break backward compatibility, use new PollUntilContext* 219 // methods instead 220 return ErrWaitTimeout 221 } 222 } 223 } 224