1
16
17 package pager
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "testing"
24 "time"
25
26 "k8s.io/apimachinery/pkg/api/errors"
27 metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
30 "k8s.io/apimachinery/pkg/runtime"
31 )
32
33 func list(count int, rv string) *metainternalversion.List {
34 var list metainternalversion.List
35 for i := 0; i < count; i++ {
36 list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
37 ObjectMeta: metav1.ObjectMeta{
38 Name: fmt.Sprintf("%d", i),
39 },
40 })
41 }
42 list.ResourceVersion = rv
43 return &list
44 }
45
// testPager is a fake paged-list backend for the tests in this file. Its
// methods hand out items in fixed-size pages and fail the test when a request
// does not carry the limit or continue token they expect.
type testPager struct {
	// t receives invariant-violation failures.
	t *testing.T
	// rv is stamped on every returned page and prefixes every continue token.
	rv string

	// index is the name of the next item to emit, remaining is how many items
	// are still to be emitted, and last is the value of index at the end of
	// the previously returned page.
	index, remaining, last int

	// continuing is set once a page has been returned with a continue token.
	// done is set once no further list call is expected.
	continuing, done bool

	// expectPage is the Limit every incoming request must carry.
	expectPage int64
}
56
57 func (p *testPager) reset() {
58 p.continuing = false
59 p.remaining += p.index
60 p.index = 0
61 p.last = 0
62 p.done = false
63 }
64
65 func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
66 if p.done {
67 p.t.Errorf("did not expect additional call to paged list")
68 return nil, fmt.Errorf("unexpected list call")
69 }
70 expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
71 if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) {
72 p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
73 return nil, fmt.Errorf("invariant violated")
74 }
75 if options.Continue != "" && options.ResourceVersion != "" {
76 p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
77 return nil, fmt.Errorf("invariant violated")
78 }
79 if options.Continue != "" && options.ResourceVersionMatch != "" {
80 p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
81 return nil, fmt.Errorf("invariant violated")
82 }
83 var list metainternalversion.List
84 total := options.Limit
85 if total == 0 {
86 total = int64(p.remaining)
87 }
88 for i := int64(0); i < total; i++ {
89 if p.remaining <= 0 {
90 break
91 }
92 list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
93 ObjectMeta: metav1.ObjectMeta{
94 Name: fmt.Sprintf("%d", p.index),
95 },
96 })
97 p.remaining--
98 p.index++
99 }
100 p.last = p.index
101 if p.remaining > 0 {
102 list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
103 p.continuing = true
104 } else {
105 p.done = true
106 }
107 list.ResourceVersion = p.rv
108 return &list, nil
109 }
110
111 func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
112 if p.continuing {
113 p.done = true
114 return nil, errors.NewResourceExpired("this list has expired")
115 }
116 return p.PagedList(ctx, options)
117 }
118
119 func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
120 if p.continuing {
121 p.reset()
122 p.expectPage = 0
123 return nil, errors.NewResourceExpired("this list has expired")
124 }
125 return p.PagedList(ctx, options)
126 }
127
128 func TestListPager_List(t *testing.T) {
129 type fields struct {
130 PageSize int64
131 PageFn ListPageFunc
132 FullListIfExpired bool
133 }
134 type args struct {
135 ctx context.Context
136 options metav1.ListOptions
137 }
138 tests := []struct {
139 name string
140 fields fields
141 args args
142 want runtime.Object
143 wantPaged bool
144 wantErr bool
145 isExpired bool
146 }{
147 {
148 name: "empty page",
149 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
150 args: args{},
151 want: list(0, "rv:20"),
152 wantPaged: false,
153 },
154 {
155 name: "one page",
156 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
157 args: args{},
158 want: list(9, "rv:20"),
159 wantPaged: false,
160 },
161 {
162 name: "one full page",
163 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
164 args: args{},
165 want: list(10, "rv:20"),
166 wantPaged: false,
167 },
168 {
169 name: "two pages",
170 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
171 args: args{},
172 want: list(11, "rv:20"),
173 wantPaged: true,
174 },
175 {
176 name: "three pages",
177 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
178 args: args{},
179 want: list(21, "rv:20"),
180 wantPaged: true,
181 },
182 {
183 name: "expires on second page",
184 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
185 args: args{},
186 wantPaged: true,
187 wantErr: true,
188 isExpired: true,
189 },
190 {
191 name: "expires on second page and then lists",
192 fields: fields{
193 FullListIfExpired: true,
194 PageSize: 10,
195 PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
196 },
197 args: args{},
198 want: list(21, "rv:20"),
199 wantPaged: true,
200 },
201 {
202 name: "two pages with resourceVersion",
203 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
204 args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
205 want: list(11, "rv:20"),
206 wantPaged: true,
207 },
208 {
209 name: "two pages with resourceVersion and resourceVersionMatch",
210 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
211 args: args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
212 want: list(11, "rv:20"),
213 wantPaged: true,
214 },
215 }
216 for _, tt := range tests {
217 t.Run(tt.name, func(t *testing.T) {
218 p := &ListPager{
219 PageSize: tt.fields.PageSize,
220 PageFn: tt.fields.PageFn,
221 FullListIfExpired: tt.fields.FullListIfExpired,
222 }
223 ctx := tt.args.ctx
224 if ctx == nil {
225 ctx = context.Background()
226 }
227 got, paginatedResult, err := p.List(ctx, tt.args.options)
228 if (err != nil) != tt.wantErr {
229 t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
230 return
231 }
232 if tt.isExpired != errors.IsResourceExpired(err) {
233 t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
234 return
235 }
236 if tt.wantPaged != paginatedResult {
237 t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
238 }
239 if !reflect.DeepEqual(got, tt.want) {
240 t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
241 }
242 })
243 }
244 }
245
246 func TestListPager_EachListItem(t *testing.T) {
247 type fields struct {
248 PageSize int64
249 PageFn ListPageFunc
250 }
251 tests := []struct {
252 name string
253 fields fields
254 want runtime.Object
255 wantErr bool
256 wantPanic bool
257 isExpired bool
258 processorErrorOnItem int
259 processorPanicOnItem int
260 cancelContextOnItem int
261 }{
262 {
263 name: "empty page",
264 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
265 want: list(0, "rv:20"),
266 },
267 {
268 name: "one page",
269 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
270 want: list(9, "rv:20"),
271 },
272 {
273 name: "one full page",
274 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
275 want: list(10, "rv:20"),
276 },
277 {
278 name: "two pages",
279 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
280 want: list(11, "rv:20"),
281 },
282 {
283 name: "three pages",
284 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
285 want: list(21, "rv:20"),
286 },
287 {
288 name: "expires on second page",
289 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
290 want: list(10, "rv:20"),
291 wantErr: true,
292 isExpired: true,
293 },
294 {
295 name: "error processing item",
296 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
297 want: list(3, "rv:20"),
298 wantPanic: true,
299 processorPanicOnItem: 3,
300 },
301 {
302 name: "cancel context while processing",
303 fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
304 want: list(10, "rv:20"),
305 wantErr: true,
306 cancelContextOnItem: 3,
307 },
308 }
309
310 processorErr := fmt.Errorf("processor error")
311 for _, tt := range tests {
312 t.Run(tt.name, func(t *testing.T) {
313 ctx, cancel := context.WithCancel(context.Background())
314 p := &ListPager{
315 PageSize: tt.fields.PageSize,
316 PageFn: tt.fields.PageFn,
317 }
318 var items []runtime.Object
319
320 fn := func(obj runtime.Object) error {
321 items = append(items, obj)
322 if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
323 return processorErr
324 }
325 if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
326 panic(processorErr)
327 }
328 if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
329 cancel()
330 }
331 return nil
332 }
333 var err error
334 var panic interface{}
335 func() {
336 defer func() {
337 panic = recover()
338 }()
339 err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
340 }()
341 if (panic != nil) && !tt.wantPanic {
342 t.Errorf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic)
343 return
344 }
345 if (err != nil) != tt.wantErr {
346 t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
347 return
348 }
349 if tt.isExpired != errors.IsResourceExpired(err) {
350 t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
351 return
352 }
353 if tt.processorErrorOnItem > 0 && err != processorErr {
354 t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
355 return
356 }
357 l := tt.want.(*metainternalversion.List)
358 if !reflect.DeepEqual(items, l.Items) {
359 t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
360 }
361 })
362 }
363 }
364
365 func TestListPager_eachListPageBuffered(t *testing.T) {
366 tests := []struct {
367 name string
368 totalPages int
369 pagesProcessed int
370 wantPageLists int
371 pageBufferSize int32
372 pageSize int
373 }{
374 {
375 name: "no buffer, one total page",
376 totalPages: 1,
377 pagesProcessed: 1,
378 wantPageLists: 1,
379 pageBufferSize: 0,
380 }, {
381 name: "no buffer, 1/5 pages processed",
382 totalPages: 5,
383 pagesProcessed: 1,
384 wantPageLists: 2,
385 pageBufferSize: 0,
386 },
387 {
388 name: "no buffer, 2/5 pages processed",
389 totalPages: 5,
390 pagesProcessed: 2,
391 wantPageLists: 3,
392 pageBufferSize: 0,
393 },
394 {
395 name: "no buffer, 5/5 pages processed",
396 totalPages: 5,
397 pagesProcessed: 5,
398 wantPageLists: 5,
399 pageBufferSize: 0,
400 },
401 {
402 name: "size 1 buffer, 1/5 pages processed",
403 totalPages: 5,
404 pagesProcessed: 1,
405 wantPageLists: 3,
406 pageBufferSize: 1,
407 },
408 {
409 name: "size 1 buffer, 5/5 pages processed",
410 totalPages: 5,
411 pagesProcessed: 5,
412 wantPageLists: 5,
413 pageBufferSize: 1,
414 },
415 {
416 name: "size 10 buffer, 1/5 page processed",
417 totalPages: 5,
418 pagesProcessed: 1,
419 wantPageLists: 5,
420 pageBufferSize: 10,
421 },
422 }
423 processorErr := fmt.Errorf("processor error")
424 pageSize := 10
425 for _, tt := range tests {
426 t.Run(tt.name, func(t *testing.T) {
427 pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"}
428 pageLists := 0
429 wantedPageListsDone := make(chan struct{})
430 listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
431 pageLists++
432 if pageLists == tt.wantPageLists {
433 close(wantedPageListsDone)
434 }
435 return pgr.PagedList(ctx, options)
436 }
437 p := &ListPager{
438 PageSize: int64(pageSize),
439 PageBufferSize: tt.pageBufferSize,
440 PageFn: listFn,
441 }
442
443 pagesProcessed := 0
444 fn := func(obj runtime.Object) error {
445 pagesProcessed++
446 if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 {
447
448 select {
449 case <-time.After(time.Second):
450 return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists)
451 case <-wantedPageListsDone:
452 }
453 return processorErr
454 }
455 return nil
456 }
457 err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn)
458 if tt.pagesProcessed > 0 && err == processorErr {
459
460 } else if err != nil {
461 t.Fatal(err)
462 }
463 if tt.wantPageLists > 0 && pageLists != tt.wantPageLists {
464 t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists)
465 }
466 if pagesProcessed != tt.pagesProcessed {
467 t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed)
468 }
469 })
470 }
471 }
472
View as plain text