...

Source file src/k8s.io/client-go/tools/cache/synctrack/synctrack.go

Documentation: k8s.io/client-go/tools/cache/synctrack

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package synctrack contains utilities for helping controllers track whether
    18  // they are "synced" or not, that is, whether they have processed all items
    19  // from the informer's initial list.
    20  package synctrack
    21  
    22  import (
    23  	"sync"
    24  	"sync/atomic"
    25  
    26  	"k8s.io/apimachinery/pkg/util/sets"
    27  )
    28  
    29  // AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
    30  type AsyncTracker[T comparable] struct {
    31  	UpstreamHasSynced func() bool
    32  
    33  	lock    sync.Mutex
    34  	waiting sets.Set[T]
    35  }
    36  
    37  // Start should be called prior to processing each key which is part of the
    38  // initial list.
    39  func (t *AsyncTracker[T]) Start(key T) {
    40  	t.lock.Lock()
    41  	defer t.lock.Unlock()
    42  	if t.waiting == nil {
    43  		t.waiting = sets.New[T](key)
    44  	} else {
    45  		t.waiting.Insert(key)
    46  	}
    47  }
    48  
    49  // Finished should be called when finished processing a key which was part of
    50  // the initial list. Since keys are tracked individually, nothing bad happens
    51  // if you call Finished without a corresponding call to Start. This makes it
    52  // easier to use this in combination with e.g. queues which don't make it easy
    53  // to plumb through the isInInitialList boolean.
    54  func (t *AsyncTracker[T]) Finished(key T) {
    55  	t.lock.Lock()
    56  	defer t.lock.Unlock()
    57  	if t.waiting != nil {
    58  		t.waiting.Delete(key)
    59  	}
    60  }
    61  
    62  // HasSynced returns true if the source is synced and every key present in the
    63  // initial list has been processed. This relies on the source not considering
    64  // itself synced until *after* it has delivered the notification for the last
    65  // key, and that notification handler must have called Start.
    66  func (t *AsyncTracker[T]) HasSynced() bool {
    67  	// Call UpstreamHasSynced first: it might take a lock, which might take
    68  	// a significant amount of time, and we can't hold our lock while
    69  	// waiting on that or a user is likely to get a deadlock.
    70  	if !t.UpstreamHasSynced() {
    71  		return false
    72  	}
    73  	t.lock.Lock()
    74  	defer t.lock.Unlock()
    75  	return t.waiting.Len() == 0
    76  }
    77  
    78  // SingleFileTracker helps propagate HasSynced when events are processed in
    79  // order (i.e. via a queue).
    80  type SingleFileTracker struct {
    81  	// Important: count is used with atomic operations so it must be 64-bit
    82  	// aligned, otherwise atomic operations will panic. Having it at the top of
    83  	// the struct will guarantee that, even on 32-bit arches.
    84  	// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
    85  	count int64
    86  
    87  	UpstreamHasSynced func() bool
    88  }
    89  
    90  // Start should be called prior to processing each key which is part of the
    91  // initial list.
    92  func (t *SingleFileTracker) Start() {
    93  	atomic.AddInt64(&t.count, 1)
    94  }
    95  
    96  // Finished should be called when finished processing a key which was part of
    97  // the initial list. You must never call Finished() before (or without) its
    98  // corresponding Start(), that is a logic error that could cause HasSynced to
    99  // return a wrong value. To help you notice this should it happen, Finished()
   100  // will panic if the internal counter goes negative.
   101  func (t *SingleFileTracker) Finished() {
   102  	result := atomic.AddInt64(&t.count, -1)
   103  	if result < 0 {
   104  		panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
   105  	}
   106  }
   107  
   108  // HasSynced returns true if the source is synced and every key present in the
   109  // initial list has been processed. This relies on the source not considering
   110  // itself synced until *after* it has delivered the notification for the last
   111  // key, and that notification handler must have called Start.
   112  func (t *SingleFileTracker) HasSynced() bool {
   113  	// Call UpstreamHasSynced first: it might take a lock, which might take
   114  	// a significant amount of time, and we don't want to then act on a
   115  	// stale count value.
   116  	if !t.UpstreamHasSynced() {
   117  		return false
   118  	}
   119  	return atomic.LoadInt64(&t.count) <= 0
   120  }
   121  

View as plain text