1
16
17 package flowcontrol
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "math/rand"
24 "sync"
25 "testing"
26 "time"
27
28 flowcontrol "k8s.io/api/flowcontrol/v1"
29 utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
30 fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
31 "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
32 "k8s.io/client-go/informers"
33 clientset "k8s.io/client-go/kubernetes"
34 "k8s.io/client-go/rest"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/utils/clock"
37 testclocks "k8s.io/utils/clock/testing"
38 )
39
40
// fightTest holds the shared state of one run of the config-consumer fight:
// the controllers created by createController, the stop channel that
// bounds their lifetime, and the per-FlowSchema write counts gathered by
// the informer that createMainInformer starts.
type fightTest struct {
	// t is the test through which failures and logs are reported.
	t *testing.T
	// ctx is set to context.Background() by newFightTest.
	ctx context.Context
	// loopbackConfig is the base REST config; every client built by this
	// test works on a copy of it with its own user agent.
	loopbackConfig *rest.Config
	// teamSize is the number of controllers created for each value of the
	// invert flag.
	teamSize int
	// stopCh is handed to every informer and controller started by the
	// test; TestConfigConsumerFight closes it at the end.
	stopCh chan struct{}
	// now is the wall-clock time captured by newFightTest; clk starts there.
	now time.Time
	// clk is a fake clock initialized to now.
	// NOTE(review): nothing visible in this file reads clk — the
	// controllers are configured with clock.RealClock{}; confirm whether
	// it is still needed.
	clk *testclocks.FakeClock
	// ctlrs holds the controllers, indexed first by the invert flag they
	// were created with and then by their position within that team.
	ctlrs map[bool][]utilfc.Interface

	// countsMutex guards writeCounts.
	countsMutex sync.Mutex

	// writeCounts maps a FlowSchema's name to the number of add and update
	// notifications observed for it.
	writeCounts map[string]int
}
74
75 func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
76 now := time.Now()
77 ft := &fightTest{
78 t: t,
79 ctx: context.Background(),
80 loopbackConfig: loopbackConfig,
81 teamSize: teamSize,
82 stopCh: make(chan struct{}),
83 now: now,
84 clk: testclocks.NewFakeClock(now),
85 ctlrs: map[bool][]utilfc.Interface{
86 false: make([]utilfc.Interface, teamSize),
87 true: make([]utilfc.Interface, teamSize)},
88 writeCounts: map[string]int{},
89 }
90 return ft
91 }
92
93 func (ft *fightTest) createMainInformer() {
94 myConfig := rest.CopyConfig(ft.loopbackConfig)
95 myConfig = rest.AddUserAgent(myConfig, "audience")
96 myClientset := clientset.NewForConfigOrDie(myConfig)
97 informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
98 inf := informerFactory.Flowcontrol().V1().FlowSchemas().Informer()
99 inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
100 AddFunc: func(obj interface{}) {
101 fs := obj.(*flowcontrol.FlowSchema)
102 ft.countWrite(fs)
103 },
104 UpdateFunc: func(oldObj, newObj interface{}) {
105 fs := newObj.(*flowcontrol.FlowSchema)
106 ft.countWrite(fs)
107 },
108 })
109 go inf.Run(ft.stopCh)
110 if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
111 ft.t.Errorf("Failed to sync main informer cache")
112 }
113 }
114
115 func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
116 ft.countsMutex.Lock()
117 defer ft.countsMutex.Unlock()
118 ft.writeCounts[fs.Name]++
119 }
120
121 func (ft *fightTest) createController(invert bool, i int) {
122 fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
123 myConfig := rest.CopyConfig(ft.loopbackConfig)
124 myConfig = rest.AddUserAgent(myConfig, fieldMgr)
125 myClientset := clientset.NewForConfigOrDie(myConfig)
126 fcIfc := myClientset.FlowcontrolV1()
127 informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
128 foundToDangling := func(found bool) bool { return !found }
129 if invert {
130 foundToDangling = func(found bool) bool { return found }
131 }
132 ctlr := utilfc.NewTestable(utilfc.TestableConfig{
133 Name: fieldMgr,
134 FoundToDangling: foundToDangling,
135 Clock: clock.RealClock{},
136 AsFieldManager: fieldMgr,
137 InformerFactory: informerFactory,
138 FlowcontrolClient: fcIfc,
139 ServerConcurrencyLimit: 200,
140 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
141 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
142 QueueSetFactory: fqtesting.NewNoRestraintFactory(),
143 })
144 ft.ctlrs[invert][i] = ctlr
145 informerFactory.Start(ft.stopCh)
146 go ctlr.Run(ft.stopCh)
147 }
148
149 func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
150 tBeforeLock := time.Now()
151 ft.countsMutex.Lock()
152 defer ft.countsMutex.Unlock()
153 tAfterLock := time.Now()
154 minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
155 maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
156 minTotalWrites := int(minFightSecs / 10)
157 maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
158 maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
159 for flowSchemaName, writeCount := range ft.writeCounts {
160 if writeCount < minTotalWrites {
161 ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
162 } else if writeCount > maxTotalWrites {
163 ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
164 } else {
165 ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
166 }
167 }
168 }
169 func TestConfigConsumerFight(t *testing.T) {
170 _, kubeConfig, closeFn := setup(t, 100, 100)
171 defer closeFn()
172 const teamSize = 3
173 ft := newFightTest(t, kubeConfig, teamSize)
174 tBeforeCreate := time.Now()
175 ft.createMainInformer()
176 ft.foreach(ft.createController)
177 tAfterCreate := time.Now()
178 time.Sleep(110 * time.Second)
179 ft.evaluate(tBeforeCreate, tAfterCreate)
180 close(ft.stopCh)
181 }
182
183 func (ft *fightTest) foreach(visit func(invert bool, i int)) {
184 for i := 0; i < ft.teamSize; i++ {
185
186
187 invert := rand.Intn(2) == 0
188 visit(invert, i)
189 visit(!invert, i)
190 }
191 }
192
View as plain text