...

Source file src/k8s.io/client-go/tools/pager/pager.go

Documentation: k8s.io/client-go/tools/pager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pager
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	"k8s.io/apimachinery/pkg/api/errors"
    24  	"k8s.io/apimachinery/pkg/api/meta"
    25  	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    29  )
    30  
    31  const defaultPageSize = 500
    32  const defaultPageBufferSize = 10
    33  
// ListPageFunc returns a list object for the given list options. A ListPager
// invokes it once for every request it makes, passing the Limit and Continue
// values for that request in opts. The returned object must be a list (it is
// inspected with meta.ListAccessor); otherwise the pager reports an error.
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
    36  
    37  // SimplePageFunc adapts a context-less list function into one that accepts a context.
    38  func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
    39  	return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
    40  		return fn(opts)
    41  	}
    42  }
    43  
// ListPager assists client code in breaking large list queries into multiple
// smaller chunks of PageSize or smaller. PageFn is expected to accept a
// metav1.ListOptions that supports paging and return a list. The pager does
// not alter the field or label selectors on the initial options list.
type ListPager struct {
	// PageSize is used as the Limit of each request when the caller's
	// ListOptions leave Limit at zero.
	PageSize int64
	// PageFn performs a single list request; it is called once per page.
	PageFn   ListPageFunc

	// FullListIfExpired controls what List and ListWithAlloc do when a request
	// that carries a continue token fails with a resource-expired error: if
	// true, one unpaginated request (Limit 0, no continue token) is issued at
	// the originally requested ResourceVersion and its result is returned; if
	// false, the error is returned.
	FullListIfExpired bool

	// Number of pages to buffer. Used by EachListItem and
	// EachListItemWithAlloc as the capacity of the channel that hands fetched
	// pages to the caller's callback; a negative value makes those methods
	// return an error.
	PageBufferSize int32
}
    57  
    58  // New creates a new pager from the provided pager function using the default
    59  // options. It will fall back to a full list if an expiration error is encountered
    60  // as a last resort.
    61  func New(fn ListPageFunc) *ListPager {
    62  	return &ListPager{
    63  		PageSize:          defaultPageSize,
    64  		PageFn:            fn,
    65  		FullListIfExpired: true,
    66  		PageBufferSize:    defaultPageBufferSize,
    67  	}
    68  }
    69  
    70  // TODO: introduce other types of paging functions - such as those that retrieve from a list
    71  // of namespaces.
    72  
    73  // List returns a single list object, but attempts to retrieve smaller chunks from the
    74  // server to reduce the impact on the server. If the chunk attempt fails, it will load
    75  // the full list instead. The Limit field on options, if unset, will default to the page size.
    76  //
    77  // If items in the returned list are retained for different durations, and you want to avoid
    78  // retaining the whole slice returned by p.PageFn as long as any item is referenced,
    79  // use ListWithAlloc instead.
    80  func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
    81  	return p.list(ctx, options, false)
    82  }
    83  
    84  // ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
    85  // It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
    86  //
    87  // If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
    88  func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
    89  	return p.list(ctx, options, true)
    90  }
    91  
    92  func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
    93  	if options.Limit == 0 {
    94  		options.Limit = p.PageSize
    95  	}
    96  	requestedResourceVersion := options.ResourceVersion
    97  	requestedResourceVersionMatch := options.ResourceVersionMatch
    98  	var list *metainternalversion.List
    99  	paginatedResult := false
   100  
   101  	for {
   102  		select {
   103  		case <-ctx.Done():
   104  			return nil, paginatedResult, ctx.Err()
   105  		default:
   106  		}
   107  
   108  		obj, err := p.PageFn(ctx, options)
   109  		if err != nil {
   110  			// Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
   111  			// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
   112  			// failing when the resource versions is established by the first page request falls out of the compaction
   113  			// during the subsequent list requests).
   114  			if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
   115  				return nil, paginatedResult, err
   116  			}
   117  			// the list expired while we were processing, fall back to a full list at
   118  			// the requested ResourceVersion.
   119  			options.Limit = 0
   120  			options.Continue = ""
   121  			options.ResourceVersion = requestedResourceVersion
   122  			options.ResourceVersionMatch = requestedResourceVersionMatch
   123  			result, err := p.PageFn(ctx, options)
   124  			return result, paginatedResult, err
   125  		}
   126  		m, err := meta.ListAccessor(obj)
   127  		if err != nil {
   128  			return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
   129  		}
   130  
   131  		// exit early and return the object we got if we haven't processed any pages
   132  		if len(m.GetContinue()) == 0 && list == nil {
   133  			return obj, paginatedResult, nil
   134  		}
   135  
   136  		// initialize the list and fill its contents
   137  		if list == nil {
   138  			list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
   139  			list.ResourceVersion = m.GetResourceVersion()
   140  			list.SelfLink = m.GetSelfLink()
   141  		}
   142  		eachListItemFunc := meta.EachListItem
   143  		if allocNew {
   144  			eachListItemFunc = meta.EachListItemWithAlloc
   145  		}
   146  		if err := eachListItemFunc(obj, func(obj runtime.Object) error {
   147  			list.Items = append(list.Items, obj)
   148  			return nil
   149  		}); err != nil {
   150  			return nil, paginatedResult, err
   151  		}
   152  
   153  		// if we have no more items, return the list
   154  		if len(m.GetContinue()) == 0 {
   155  			return list, paginatedResult, nil
   156  		}
   157  
   158  		// set the next loop up
   159  		options.Continue = m.GetContinue()
   160  		// Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
   161  		// `specifying resource version is not allowed when using continue` error.
   162  		// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
   163  		options.ResourceVersion = ""
   164  		options.ResourceVersionMatch = ""
   165  		// At this point, result is already paginated.
   166  		paginatedResult = true
   167  	}
   168  }
   169  
   170  // EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
   171  // fn returns an error, processing stops and that error is returned. If fn does not return an error,
   172  // any error encountered while retrieving the list from the server is returned. If the context
   173  // cancels or times out, the context error is returned. Since the list is retrieved in paginated
   174  // chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
   175  // requests exceed the expiration limit of the apiserver being called.
   176  //
   177  // Items are retrieved in chunks from the server to reduce the impact on the server with up to
   178  // ListPager.PageBufferSize chunks buffered concurrently in the background.
   179  //
   180  // If items passed to fn are retained for different durations, and you want to avoid
   181  // retaining the whole slice returned by p.PageFn as long as any item is referenced,
   182  // use EachListItemWithAlloc instead.
   183  func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
   184  	return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
   185  		return meta.EachListItem(obj, fn)
   186  	})
   187  }
   188  
   189  // EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
   190  // It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
   191  //
   192  // If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
   193  func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
   194  	return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
   195  		return meta.EachListItemWithAlloc(obj, fn)
   196  	})
   197  }
   198  
// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk.  If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is
// returned. If the context cancels or times out, the context error is returned. Since the list is
// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
// the pagination list requests exceed the expiration limit of the apiserver being called.
//
// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
//
// Chunks are fetched by a background goroutine running eachListChunk and handed to the calling
// goroutine over a channel; fn always runs on the calling goroutine. An error is returned without
// fetching anything if ListPager.PageBufferSize is negative.
func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
	// PageBufferSize becomes the channel capacity below, which must not be negative.
	if p.PageBufferSize < 0 {
		return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
	}

	// Ensure background goroutine is stopped if this call exits before all list items are
	// processed. Cancelation error from this deferred cancel call is never returned to caller;
	// either the list result has already been sent to bgResultC or the fn error is returned and
	// the cancelation error is discarded.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// chunkC carries fetched chunks from the background goroutine to the loop below.
	chunkC := make(chan runtime.Object, p.PageBufferSize)
	// bgResultC has room for one value so the goroutine's final send cannot block,
	// even when this function has already returned and nobody receives from it.
	bgResultC := make(chan error, 1)
	go func() {
		// Deferred first, so it runs last: after chunkC is closed and the result is sent.
		defer utilruntime.HandleCrash()

		var err error
		defer func() {
			// Closing chunkC ends the range loop in the caller; the result is
			// published afterwards for the caller to pick up.
			close(chunkC)
			bgResultC <- err
		}()
		err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
			select {
			case chunkC <- chunk: // buffer the chunk, this can block
			case <-ctx.Done():
				// The caller went away or the context expired; stop fetching.
				return ctx.Err()
			}
			return nil
		})
	}()

	for o := range chunkC {
		// Do not hand further chunks to fn once the context is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		err := fn(o)
		if err != nil {
			return err // any fn error should be returned immediately
		}
	}
	// promote the results of our background goroutine to the foreground
	return <-bgResultC
}
   253  
   254  // eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
   255  // chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
   256  // an error, any error encountered while retrieving the list from the server is returned. If the
   257  // context cancels or times out, the context error is returned. Since the list is retrieved in
   258  // paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
   259  // pagination list requests exceed the expiration limit of the apiserver being called.
   260  func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
   261  	if options.Limit == 0 {
   262  		options.Limit = p.PageSize
   263  	}
   264  	for {
   265  		select {
   266  		case <-ctx.Done():
   267  			return ctx.Err()
   268  		default:
   269  		}
   270  
   271  		obj, err := p.PageFn(ctx, options)
   272  		if err != nil {
   273  			return err
   274  		}
   275  		m, err := meta.ListAccessor(obj)
   276  		if err != nil {
   277  			return fmt.Errorf("returned object must be a list: %v", err)
   278  		}
   279  		if err := fn(obj); err != nil {
   280  			return err
   281  		}
   282  		// if we have no more items, return.
   283  		if len(m.GetContinue()) == 0 {
   284  			return nil
   285  		}
   286  		// set the next loop up
   287  		options.Continue = m.GetContinue()
   288  	}
   289  }
   290  

View as plain text