...

Source file src/k8s.io/client-go/tools/pager/pager_test.go

Documentation: k8s.io/client-go/tools/pager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pager
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"testing"
    24  	"time"
    25  
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  )
    32  
    33  func list(count int, rv string) *metainternalversion.List {
    34  	var list metainternalversion.List
    35  	for i := 0; i < count; i++ {
    36  		list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
    37  			ObjectMeta: metav1.ObjectMeta{
    38  				Name: fmt.Sprintf("%d", i),
    39  			},
    40  		})
    41  	}
    42  	list.ResourceVersion = rv
    43  	return &list
    44  }
    45  
    46  type testPager struct {
    47  	t          *testing.T
    48  	rv         string
    49  	index      int
    50  	remaining  int
    51  	last       int
    52  	continuing bool
    53  	done       bool
    54  	expectPage int64
    55  }
    56  
    57  func (p *testPager) reset() {
    58  	p.continuing = false
    59  	p.remaining += p.index
    60  	p.index = 0
    61  	p.last = 0
    62  	p.done = false
    63  }
    64  
    65  func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
    66  	if p.done {
    67  		p.t.Errorf("did not expect additional call to paged list")
    68  		return nil, fmt.Errorf("unexpected list call")
    69  	}
    70  	expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
    71  	if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) {
    72  		p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
    73  		return nil, fmt.Errorf("invariant violated")
    74  	}
    75  	if options.Continue != "" && options.ResourceVersion != "" {
    76  		p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
    77  		return nil, fmt.Errorf("invariant violated")
    78  	}
    79  	if options.Continue != "" && options.ResourceVersionMatch != "" {
    80  		p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
    81  		return nil, fmt.Errorf("invariant violated")
    82  	}
    83  	var list metainternalversion.List
    84  	total := options.Limit
    85  	if total == 0 {
    86  		total = int64(p.remaining)
    87  	}
    88  	for i := int64(0); i < total; i++ {
    89  		if p.remaining <= 0 {
    90  			break
    91  		}
    92  		list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
    93  			ObjectMeta: metav1.ObjectMeta{
    94  				Name: fmt.Sprintf("%d", p.index),
    95  			},
    96  		})
    97  		p.remaining--
    98  		p.index++
    99  	}
   100  	p.last = p.index
   101  	if p.remaining > 0 {
   102  		list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
   103  		p.continuing = true
   104  	} else {
   105  		p.done = true
   106  	}
   107  	list.ResourceVersion = p.rv
   108  	return &list, nil
   109  }
   110  
   111  func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
   112  	if p.continuing {
   113  		p.done = true
   114  		return nil, errors.NewResourceExpired("this list has expired")
   115  	}
   116  	return p.PagedList(ctx, options)
   117  }
   118  
   119  func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
   120  	if p.continuing {
   121  		p.reset()
   122  		p.expectPage = 0
   123  		return nil, errors.NewResourceExpired("this list has expired")
   124  	}
   125  	return p.PagedList(ctx, options)
   126  }
   127  
   128  func TestListPager_List(t *testing.T) {
   129  	type fields struct {
   130  		PageSize          int64
   131  		PageFn            ListPageFunc
   132  		FullListIfExpired bool
   133  	}
   134  	type args struct {
   135  		ctx     context.Context
   136  		options metav1.ListOptions
   137  	}
   138  	tests := []struct {
   139  		name      string
   140  		fields    fields
   141  		args      args
   142  		want      runtime.Object
   143  		wantPaged bool
   144  		wantErr   bool
   145  		isExpired bool
   146  	}{
   147  		{
   148  			name:      "empty page",
   149  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
   150  			args:      args{},
   151  			want:      list(0, "rv:20"),
   152  			wantPaged: false,
   153  		},
   154  		{
   155  			name:      "one page",
   156  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
   157  			args:      args{},
   158  			want:      list(9, "rv:20"),
   159  			wantPaged: false,
   160  		},
   161  		{
   162  			name:      "one full page",
   163  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
   164  			args:      args{},
   165  			want:      list(10, "rv:20"),
   166  			wantPaged: false,
   167  		},
   168  		{
   169  			name:      "two pages",
   170  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
   171  			args:      args{},
   172  			want:      list(11, "rv:20"),
   173  			wantPaged: true,
   174  		},
   175  		{
   176  			name:      "three pages",
   177  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
   178  			args:      args{},
   179  			want:      list(21, "rv:20"),
   180  			wantPaged: true,
   181  		},
   182  		{
   183  			name:      "expires on second page",
   184  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
   185  			args:      args{},
   186  			wantPaged: true,
   187  			wantErr:   true,
   188  			isExpired: true,
   189  		},
   190  		{
   191  			name: "expires on second page and then lists",
   192  			fields: fields{
   193  				FullListIfExpired: true,
   194  				PageSize:          10,
   195  				PageFn:            (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
   196  			},
   197  			args:      args{},
   198  			want:      list(21, "rv:20"),
   199  			wantPaged: true,
   200  		},
   201  		{
   202  			name:      "two pages with resourceVersion",
   203  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
   204  			args:      args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
   205  			want:      list(11, "rv:20"),
   206  			wantPaged: true,
   207  		},
   208  		{
   209  			name:      "two pages with resourceVersion and resourceVersionMatch",
   210  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
   211  			args:      args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
   212  			want:      list(11, "rv:20"),
   213  			wantPaged: true,
   214  		},
   215  	}
   216  	for _, tt := range tests {
   217  		t.Run(tt.name, func(t *testing.T) {
   218  			p := &ListPager{
   219  				PageSize:          tt.fields.PageSize,
   220  				PageFn:            tt.fields.PageFn,
   221  				FullListIfExpired: tt.fields.FullListIfExpired,
   222  			}
   223  			ctx := tt.args.ctx
   224  			if ctx == nil {
   225  				ctx = context.Background()
   226  			}
   227  			got, paginatedResult, err := p.List(ctx, tt.args.options)
   228  			if (err != nil) != tt.wantErr {
   229  				t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
   230  				return
   231  			}
   232  			if tt.isExpired != errors.IsResourceExpired(err) {
   233  				t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
   234  				return
   235  			}
   236  			if tt.wantPaged != paginatedResult {
   237  				t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
   238  			}
   239  			if !reflect.DeepEqual(got, tt.want) {
   240  				t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
   241  			}
   242  		})
   243  	}
   244  }
   245  
   246  func TestListPager_EachListItem(t *testing.T) {
   247  	type fields struct {
   248  		PageSize int64
   249  		PageFn   ListPageFunc
   250  	}
   251  	tests := []struct {
   252  		name                 string
   253  		fields               fields
   254  		want                 runtime.Object
   255  		wantErr              bool
   256  		wantPanic            bool
   257  		isExpired            bool
   258  		processorErrorOnItem int
   259  		processorPanicOnItem int
   260  		cancelContextOnItem  int
   261  	}{
   262  		{
   263  			name:   "empty page",
   264  			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
   265  			want:   list(0, "rv:20"),
   266  		},
   267  		{
   268  			name:   "one page",
   269  			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
   270  			want:   list(9, "rv:20"),
   271  		},
   272  		{
   273  			name:   "one full page",
   274  			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
   275  			want:   list(10, "rv:20"),
   276  		},
   277  		{
   278  			name:   "two pages",
   279  			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
   280  			want:   list(11, "rv:20"),
   281  		},
   282  		{
   283  			name:   "three pages",
   284  			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
   285  			want:   list(21, "rv:20"),
   286  		},
   287  		{
   288  			name:      "expires on second page",
   289  			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
   290  			want:      list(10, "rv:20"), // all items on the first page should have been visited
   291  			wantErr:   true,
   292  			isExpired: true,
   293  		},
   294  		{
   295  			name:                 "error processing item",
   296  			fields:               fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
   297  			want:                 list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
   298  			wantPanic:            true,
   299  			processorPanicOnItem: 3,
   300  		},
   301  		{
   302  			name:                "cancel context while processing",
   303  			fields:              fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
   304  			want:                list(10, "rv:20"), // The whole PageSize worth of items got returned.
   305  			wantErr:             true,
   306  			cancelContextOnItem: 3,
   307  		},
   308  	}
   309  
   310  	processorErr := fmt.Errorf("processor error")
   311  	for _, tt := range tests {
   312  		t.Run(tt.name, func(t *testing.T) {
   313  			ctx, cancel := context.WithCancel(context.Background())
   314  			p := &ListPager{
   315  				PageSize: tt.fields.PageSize,
   316  				PageFn:   tt.fields.PageFn,
   317  			}
   318  			var items []runtime.Object
   319  
   320  			fn := func(obj runtime.Object) error {
   321  				items = append(items, obj)
   322  				if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
   323  					return processorErr
   324  				}
   325  				if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
   326  					panic(processorErr)
   327  				}
   328  				if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
   329  					cancel()
   330  				}
   331  				return nil
   332  			}
   333  			var err error
   334  			var panic interface{}
   335  			func() {
   336  				defer func() {
   337  					panic = recover()
   338  				}()
   339  				err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
   340  			}()
   341  			if (panic != nil) && !tt.wantPanic {
   342  				t.Errorf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic)
   343  				return
   344  			}
   345  			if (err != nil) != tt.wantErr {
   346  				t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
   347  				return
   348  			}
   349  			if tt.isExpired != errors.IsResourceExpired(err) {
   350  				t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
   351  				return
   352  			}
   353  			if tt.processorErrorOnItem > 0 && err != processorErr {
   354  				t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
   355  				return
   356  			}
   357  			l := tt.want.(*metainternalversion.List)
   358  			if !reflect.DeepEqual(items, l.Items) {
   359  				t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
   360  			}
   361  		})
   362  	}
   363  }
   364  
   365  func TestListPager_eachListPageBuffered(t *testing.T) {
   366  	tests := []struct {
   367  		name           string
   368  		totalPages     int
   369  		pagesProcessed int
   370  		wantPageLists  int
   371  		pageBufferSize int32
   372  		pageSize       int
   373  	}{
   374  		{
   375  			name:           "no buffer, one total page",
   376  			totalPages:     1,
   377  			pagesProcessed: 1,
   378  			wantPageLists:  1,
   379  			pageBufferSize: 0,
   380  		}, {
   381  			name:           "no buffer, 1/5 pages processed",
   382  			totalPages:     5,
   383  			pagesProcessed: 1,
   384  			wantPageLists:  2, // 1 received for processing, 1 listed
   385  			pageBufferSize: 0,
   386  		},
   387  		{
   388  			name:           "no buffer, 2/5 pages processed",
   389  			totalPages:     5,
   390  			pagesProcessed: 2,
   391  			wantPageLists:  3,
   392  			pageBufferSize: 0,
   393  		},
   394  		{
   395  			name:           "no buffer, 5/5 pages processed",
   396  			totalPages:     5,
   397  			pagesProcessed: 5,
   398  			wantPageLists:  5,
   399  			pageBufferSize: 0,
   400  		},
   401  		{
   402  			name:           "size 1 buffer, 1/5 pages processed",
   403  			totalPages:     5,
   404  			pagesProcessed: 1,
   405  			wantPageLists:  3,
   406  			pageBufferSize: 1,
   407  		},
   408  		{
   409  			name:           "size 1 buffer, 5/5 pages processed",
   410  			totalPages:     5,
   411  			pagesProcessed: 5,
   412  			wantPageLists:  5,
   413  			pageBufferSize: 1,
   414  		},
   415  		{
   416  			name:           "size 10 buffer, 1/5 page processed",
   417  			totalPages:     5,
   418  			pagesProcessed: 1,
   419  			wantPageLists:  5,
   420  			pageBufferSize: 10, // buffer is larger than list
   421  		},
   422  	}
   423  	processorErr := fmt.Errorf("processor error")
   424  	pageSize := 10
   425  	for _, tt := range tests {
   426  		t.Run(tt.name, func(t *testing.T) {
   427  			pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"}
   428  			pageLists := 0
   429  			wantedPageListsDone := make(chan struct{})
   430  			listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
   431  				pageLists++
   432  				if pageLists == tt.wantPageLists {
   433  					close(wantedPageListsDone)
   434  				}
   435  				return pgr.PagedList(ctx, options)
   436  			}
   437  			p := &ListPager{
   438  				PageSize:       int64(pageSize),
   439  				PageBufferSize: tt.pageBufferSize,
   440  				PageFn:         listFn,
   441  			}
   442  
   443  			pagesProcessed := 0
   444  			fn := func(obj runtime.Object) error {
   445  				pagesProcessed++
   446  				if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 {
   447  					// wait for buffering to catch up
   448  					select {
   449  					case <-time.After(time.Second):
   450  						return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists)
   451  					case <-wantedPageListsDone:
   452  					}
   453  					return processorErr
   454  				}
   455  				return nil
   456  			}
   457  			err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn)
   458  			if tt.pagesProcessed > 0 && err == processorErr {
   459  				// expected
   460  			} else if err != nil {
   461  				t.Fatal(err)
   462  			}
   463  			if tt.wantPageLists > 0 && pageLists != tt.wantPageLists {
   464  				t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists)
   465  			}
   466  			if pagesProcessed != tt.pagesProcessed {
   467  				t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed)
   468  			}
   469  		})
   470  	}
   471  }
   472  

View as plain text