1
16
17 package synctrack_test
18
19 import (
20 "errors"
21 "sync"
22 "sync/atomic"
23 "testing"
24 "time"
25
26 "k8s.io/client-go/tools/cache/synctrack"
27 )
28
29 func TestLazy(t *testing.T) {
30 var reality int64
31 var z synctrack.Lazy[int64]
32
33 z.Evaluate = func() (int64, error) {
34 return atomic.LoadInt64(&reality), nil
35 }
36
37 var wg sync.WaitGroup
38
39 for i := 0; i < 10; i++ {
40 wg.Add(1)
41 go func(delay time.Duration) {
42 defer wg.Done()
43 for i := 0; i < 100; i++ {
44 t.Helper()
45 set := atomic.AddInt64(&reality, 1)
46 z.Notify()
47 got, err := z.Get()
48 if err != nil {
49 t.Errorf("unexpected error: %v", err)
50 }
51 if got < set {
52 t.Errorf("time went backwards. %v vs %v", got, set)
53 }
54 time.Sleep(delay)
55 }
56 }((1 + time.Duration(i%3)) * time.Microsecond)
57 }
58
59 wg.Wait()
60 }
61
62 func TestLazyThroughput(t *testing.T) {
63 var reality int64
64 var z synctrack.Lazy[int64]
65 var totalWait int64
66 z.Evaluate = func() (int64, error) {
67 got := atomic.LoadInt64(&reality)
68 time.Sleep(11 * time.Millisecond)
69 return got, nil
70 }
71
72 var wg sync.WaitGroup
73 wg.Add(1)
74
75 go func() {
76 defer wg.Done()
77 notifies := 0
78 tt := time.NewTicker(10 * time.Millisecond)
79 for {
80 <-tt.C
81 atomic.AddInt64(&reality, 1)
82 z.Notify()
83 notifies++
84 if notifies >= 100 {
85 tt.Stop()
86 return
87 }
88 wg.Add(1)
89 go func() {
90 t.Helper()
91 defer wg.Done()
92 start := time.Now()
93 z.Get()
94 d := time.Since(start)
95 atomic.AddInt64(&totalWait, int64(d))
96 }()
97 }
98 }()
99
100 wg.Wait()
101
102 twd := time.Duration(totalWait)
103
104 if twd > 3*time.Second {
105 t.Errorf("total wait was: %v; par would be ~1s", twd)
106 }
107
108 }
109
110
111
// sequence forces code running on different goroutines to execute in a fixed
// order. Element n is a gate that opens once step n-1 has completed; gate 0
// is opened by Start.
type sequence []sync.WaitGroup

// newSequence returns a sequence of n steps with every gate closed.
func newSequence(n int) sequence {
	gates := make([]sync.WaitGroup, n)
	for idx := 0; idx < n; idx++ {
		gates[idx].Add(1)
	}
	return sequence(gates)
}

// Start opens the gate for step 0.
func (s sequence) Start() {
	first := &s[0]
	first.Done()
}

// Step blocks until the gate for step n is open and then opens the gate for
// step n+1, if there is one.
func (s sequence) Step(n int) {
	s[n].Wait()

	next := n + 1
	if next >= len(s) {
		return
	}
	s[next].Done()
}
130
131
132 func asyncGet[T any](t *testing.T, seq sequence, z *synctrack.Lazy[T], pre, post int) func() T {
133 var wg sync.WaitGroup
134 var val T
135 wg.Add(1)
136 go func() {
137 defer wg.Done()
138 t.Helper()
139 var err error
140 seq.Step(pre)
141 val, err = z.Get()
142 seq.Step(post)
143 if err != nil {
144 t.Errorf("unexpected error: %v", err)
145 }
146 }()
147 return func() T { wg.Wait(); return val }
148 }
149
150 func TestLazySlowEval(t *testing.T) {
151
152
153
154
155
156 seq := newSequence(10)
157
158 var getCount int64
159 var z synctrack.Lazy[int64]
160
161 z.Evaluate = func() (int64, error) {
162 count := atomic.AddInt64(&getCount, 1)
163 if count == 1 {
164 seq.Step(1)
165 seq.Step(6)
166 } else if count > 2 {
167 t.Helper()
168 t.Errorf("Eval called extra times. count=%v", count)
169 } else {
170 seq.Step(4)
171 }
172 return time.Now().UnixNano(), nil
173 }
174
175 seq.Start()
176
177 getA := asyncGet(t, seq, &z, 0, 7)
178
179 seq.Step(2)
180 z.Notify()
181
182 getB := asyncGet(t, seq, &z, 3, 5)
183
184 getC := asyncGet(t, seq, &z, 8, 9)
185
186 a, b, c := getA(), getB(), getC()
187 if a < b {
188 t.Errorf("failed to create the test condition")
189 }
190 if b != c && c == a {
191 t.Errorf("wrong value was cached")
192 }
193 }
194
195 func TestLazySlowEval2(t *testing.T) {
196
197
198
199
200
201 seq := newSequence(11)
202
203 var getCount int64
204 var z synctrack.Lazy[int64]
205
206 z.Evaluate = func() (int64, error) {
207 count := atomic.AddInt64(&getCount, 1)
208 if count == 1 {
209 seq.Step(1)
210 seq.Step(5)
211 } else if count > 2 {
212 t.Helper()
213 t.Errorf("Eval called extra times. count=%v", count)
214 } else {
215 seq.Step(4)
216 seq.Step(7)
217 }
218 return time.Now().UnixNano(), nil
219 }
220
221 seq.Start()
222
223 getA := asyncGet(t, seq, &z, 0, 6)
224
225 seq.Step(2)
226
227 z.Notify()
228
229 getB := asyncGet(t, seq, &z, 3, 8)
230
231 getC := asyncGet(t, seq, &z, 9, 10)
232
233 a, b, c := getA(), getB(), getC()
234 if a > b {
235 t.Errorf("failed to create the test condition")
236 }
237 if b != c && c == a {
238 t.Errorf("wrong value was cached")
239 }
240 }
241
242 func TestLazyOnlyOnce(t *testing.T) {
243
244
245 seq := newSequence(8)
246
247 var getCount int64
248 var z synctrack.Lazy[int64]
249
250 z.Evaluate = func() (int64, error) {
251 count := atomic.AddInt64(&getCount, 1)
252 if count == 1 {
253 seq.Step(1)
254 seq.Step(4)
255 } else if count > 1 {
256 t.Helper()
257 t.Errorf("Eval called extra times. count=%v", count)
258 }
259 return time.Now().UnixNano(), nil
260 }
261
262 seq.Start()
263
264 z.Notify()
265
266 getA := asyncGet(t, seq, &z, 0, 5)
267 getB := asyncGet(t, seq, &z, 2, 6)
268 getC := asyncGet(t, seq, &z, 3, 7)
269
270 a, b, c := getA(), getB(), getC()
271 if a > b {
272 t.Errorf("failed to create the test condition")
273 }
274 if b != c && c == a {
275 t.Errorf("wrong value was cached")
276 }
277 }
278
279 func TestLazyError(t *testing.T) {
280 var succeed bool
281 var z synctrack.Lazy[bool]
282 z.Evaluate = func() (bool, error) {
283 if succeed {
284 return true, nil
285 } else {
286 return false, errors.New("deliberate fail")
287 }
288 }
289
290 if _, err := z.Get(); err == nil {
291 t.Fatalf("expected error")
292 }
293
294 succeed = true
295 if _, err := z.Get(); err != nil {
296 t.Fatalf("unexpected error")
297 }
298 }
299
View as plain text