...

Source file src/k8s.io/client-go/tools/cache/mutation_detector.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"reflect"
    23  	"strconv"
    24  	"sync"
    25  	"time"
    26  
    27  	"k8s.io/klog/v2"
    28  
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/util/diff"
    31  )
    32  
    33  var mutationDetectionEnabled = false
    34  
    35  func init() {
    36  	mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
    37  }
    38  
    39  // MutationDetector is able to monitor objects for mutation within a limited window of time
    40  type MutationDetector interface {
    41  	// AddObject adds the given object to the set being monitored for a while from now
    42  	AddObject(obj interface{})
    43  
    44  	// Run starts the monitoring and does not return until the monitoring is stopped.
    45  	Run(stopCh <-chan struct{})
    46  }
    47  
    48  // NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
    49  func NewCacheMutationDetector(name string) MutationDetector {
    50  	if !mutationDetectionEnabled {
    51  		return dummyMutationDetector{}
    52  	}
    53  	klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
    54  	return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
    55  }
    56  
    57  type dummyMutationDetector struct{}
    58  
    59  func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
    60  }
    61  func (dummyMutationDetector) AddObject(obj interface{}) {
    62  }
    63  
    64  // defaultCacheMutationDetector gives a way to detect if a cached object has been mutated
    65  // It has a list of cached objects and their copies.  I haven't thought of a way
    66  // to see WHO is mutating it, just that it's getting mutated.
    67  type defaultCacheMutationDetector struct {
    68  	name   string
    69  	period time.Duration
    70  
    71  	// compareLock ensures only a single call to CompareObjects runs at a time
    72  	compareObjectsLock sync.Mutex
    73  
    74  	// addLock guards addedObjs between AddObject and CompareObjects
    75  	addedObjsLock sync.Mutex
    76  	addedObjs     []cacheObj
    77  
    78  	cachedObjs []cacheObj
    79  
    80  	retainDuration     time.Duration
    81  	lastRotated        time.Time
    82  	retainedCachedObjs []cacheObj
    83  
    84  	// failureFunc is injectable for unit testing.  If you don't have it, the process will panic.
    85  	// This panic is intentional, since turning on this detection indicates you want a strong
    86  	// failure signal.  This failure is effectively a p0 bug and you can't trust process results
    87  	// after a mutation anyway.
    88  	failureFunc func(message string)
    89  }
    90  
    91  // cacheObj holds the actual object and a copy
    92  type cacheObj struct {
    93  	cached interface{}
    94  	copied interface{}
    95  }
    96  
    97  func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
    98  	// we DON'T want protection from panics.  If we're running this code, we want to die
    99  	for {
   100  		if d.lastRotated.IsZero() {
   101  			d.lastRotated = time.Now()
   102  		} else if time.Since(d.lastRotated) > d.retainDuration {
   103  			d.retainedCachedObjs = d.cachedObjs
   104  			d.cachedObjs = nil
   105  			d.lastRotated = time.Now()
   106  		}
   107  
   108  		d.CompareObjects()
   109  
   110  		select {
   111  		case <-stopCh:
   112  			return
   113  		case <-time.After(d.period):
   114  		}
   115  	}
   116  }
   117  
   118  // AddObject makes a deep copy of the object for later comparison.  It only works on runtime.Object
   119  // but that covers the vast majority of our cached objects
   120  func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
   121  	if _, ok := obj.(DeletedFinalStateUnknown); ok {
   122  		return
   123  	}
   124  	if obj, ok := obj.(runtime.Object); ok {
   125  		copiedObj := obj.DeepCopyObject()
   126  
   127  		d.addedObjsLock.Lock()
   128  		defer d.addedObjsLock.Unlock()
   129  		d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
   130  	}
   131  }
   132  
   133  func (d *defaultCacheMutationDetector) CompareObjects() {
   134  	d.compareObjectsLock.Lock()
   135  	defer d.compareObjectsLock.Unlock()
   136  
   137  	// move addedObjs into cachedObjs under lock
   138  	// this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
   139  	d.addedObjsLock.Lock()
   140  	d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
   141  	d.addedObjs = nil
   142  	d.addedObjsLock.Unlock()
   143  
   144  	altered := false
   145  	for i, obj := range d.cachedObjs {
   146  		if !reflect.DeepEqual(obj.cached, obj.copied) {
   147  			fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
   148  			altered = true
   149  		}
   150  	}
   151  	for i, obj := range d.retainedCachedObjs {
   152  		if !reflect.DeepEqual(obj.cached, obj.copied) {
   153  			fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
   154  			altered = true
   155  		}
   156  	}
   157  
   158  	if altered {
   159  		msg := fmt.Sprintf("cache %s modified", d.name)
   160  		if d.failureFunc != nil {
   161  			d.failureFunc(msg)
   162  			return
   163  		}
   164  		panic(msg)
   165  	}
   166  }
   167  

View as plain text