...

Source file src/k8s.io/kubernetes/test/integration/apiserver/flowcontrol/fight_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/flowcontrol

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package flowcontrol
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"math/rand"
    24  	"sync"
    25  	"testing"
    26  	"time"
    27  
    28  	flowcontrol "k8s.io/api/flowcontrol/v1"
    29  	utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
    30  	fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
    31  	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
    32  	"k8s.io/client-go/informers"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	"k8s.io/client-go/rest"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/utils/clock"
    37  	testclocks "k8s.io/utils/clock/testing"
    38  )
    39  
    40  /*
    41  fightTest configures a test of how API Priority and Fairness config
    42  
    43  	controllers fight when they disagree on how to set FlowSchemaStatus.
    44  	In particular, they set the condition that indicates integrity of
    45  	the reference to the PriorityLevelConfiguration.  The scenario tested is
    46  	two teams of controllers, where the controllers in one team set the
    47  	condition normally and the controllers in the other team set the condition
    48  	to the opposite value.
    49  
    50  	This is a behavioral test: it instantiates these controllers and runs them
    51  	almost normally.  The test aims to run the controllers for a little under
    52  	2 minutes.  The test takes clock readings to get upper and lower bounds on
    53  	how long each controller ran, and calculates consequent bounds on the number
    54  	of writes that should happen to each FlowSchemaStatus.  The test creates
    55  	an informer to observe the writes.  The calculated lower bound on the
    56  	number of writes is very lax, assuming only that one write can be done
    57  	every 10 seconds.
    58  */
    59  type fightTest struct {
    60  	t              *testing.T
    61  	ctx            context.Context
    62  	loopbackConfig *rest.Config
    63  	teamSize       int
    64  	stopCh         chan struct{}
    65  	now            time.Time
    66  	clk            *testclocks.FakeClock
    67  	ctlrs          map[bool][]utilfc.Interface
    68  
    69  	countsMutex sync.Mutex
    70  
    71  	// writeCounts maps FlowSchema.Name to number of writes
    72  	writeCounts map[string]int
    73  }
    74  
    75  func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
    76  	now := time.Now()
    77  	ft := &fightTest{
    78  		t:              t,
    79  		ctx:            context.Background(),
    80  		loopbackConfig: loopbackConfig,
    81  		teamSize:       teamSize,
    82  		stopCh:         make(chan struct{}),
    83  		now:            now,
    84  		clk:            testclocks.NewFakeClock(now),
    85  		ctlrs: map[bool][]utilfc.Interface{
    86  			false: make([]utilfc.Interface, teamSize),
    87  			true:  make([]utilfc.Interface, teamSize)},
    88  		writeCounts: map[string]int{},
    89  	}
    90  	return ft
    91  }
    92  
    93  func (ft *fightTest) createMainInformer() {
    94  	myConfig := rest.CopyConfig(ft.loopbackConfig)
    95  	myConfig = rest.AddUserAgent(myConfig, "audience")
    96  	myClientset := clientset.NewForConfigOrDie(myConfig)
    97  	informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
    98  	inf := informerFactory.Flowcontrol().V1().FlowSchemas().Informer()
    99  	inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
   100  		AddFunc: func(obj interface{}) {
   101  			fs := obj.(*flowcontrol.FlowSchema)
   102  			ft.countWrite(fs)
   103  		},
   104  		UpdateFunc: func(oldObj, newObj interface{}) {
   105  			fs := newObj.(*flowcontrol.FlowSchema)
   106  			ft.countWrite(fs)
   107  		},
   108  	})
   109  	go inf.Run(ft.stopCh)
   110  	if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
   111  		ft.t.Errorf("Failed to sync main informer cache")
   112  	}
   113  }
   114  
   115  func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
   116  	ft.countsMutex.Lock()
   117  	defer ft.countsMutex.Unlock()
   118  	ft.writeCounts[fs.Name]++
   119  }
   120  
   121  func (ft *fightTest) createController(invert bool, i int) {
   122  	fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
   123  	myConfig := rest.CopyConfig(ft.loopbackConfig)
   124  	myConfig = rest.AddUserAgent(myConfig, fieldMgr)
   125  	myClientset := clientset.NewForConfigOrDie(myConfig)
   126  	fcIfc := myClientset.FlowcontrolV1()
   127  	informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
   128  	foundToDangling := func(found bool) bool { return !found }
   129  	if invert {
   130  		foundToDangling = func(found bool) bool { return found }
   131  	}
   132  	ctlr := utilfc.NewTestable(utilfc.TestableConfig{
   133  		Name:                   fieldMgr,
   134  		FoundToDangling:        foundToDangling,
   135  		Clock:                  clock.RealClock{},
   136  		AsFieldManager:         fieldMgr,
   137  		InformerFactory:        informerFactory,
   138  		FlowcontrolClient:      fcIfc,
   139  		ServerConcurrencyLimit: 200, // server concurrency limit
   140  		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
   141  		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
   142  		QueueSetFactory:        fqtesting.NewNoRestraintFactory(),
   143  	})
   144  	ft.ctlrs[invert][i] = ctlr
   145  	informerFactory.Start(ft.stopCh)
   146  	go ctlr.Run(ft.stopCh)
   147  }
   148  
   149  func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
   150  	tBeforeLock := time.Now()
   151  	ft.countsMutex.Lock()
   152  	defer ft.countsMutex.Unlock()
   153  	tAfterLock := time.Now()
   154  	minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
   155  	maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
   156  	minTotalWrites := int(minFightSecs / 10)
   157  	maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
   158  	maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
   159  	for flowSchemaName, writeCount := range ft.writeCounts {
   160  		if writeCount < minTotalWrites {
   161  			ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
   162  		} else if writeCount > maxTotalWrites {
   163  			ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
   164  		} else {
   165  			ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
   166  		}
   167  	}
   168  }
   169  func TestConfigConsumerFight(t *testing.T) {
   170  	_, kubeConfig, closeFn := setup(t, 100, 100)
   171  	defer closeFn()
   172  	const teamSize = 3
   173  	ft := newFightTest(t, kubeConfig, teamSize)
   174  	tBeforeCreate := time.Now()
   175  	ft.createMainInformer()
   176  	ft.foreach(ft.createController)
   177  	tAfterCreate := time.Now()
   178  	time.Sleep(110 * time.Second)
   179  	ft.evaluate(tBeforeCreate, tAfterCreate)
   180  	close(ft.stopCh)
   181  }
   182  
   183  func (ft *fightTest) foreach(visit func(invert bool, i int)) {
   184  	for i := 0; i < ft.teamSize; i++ {
   185  		// The order of the following enumeration is not deterministic,
   186  		// and that is good.
   187  		invert := rand.Intn(2) == 0
   188  		visit(invert, i)
   189  		visit(!invert, i)
   190  	}
   191  }
   192  

View as plain text