1
16
17 package ipamperf
18
19 import (
20 "bytes"
21 "fmt"
22 "sort"
23 "sync"
24 "time"
25
26 "k8s.io/api/core/v1"
27 "k8s.io/client-go/informers"
28 clientset "k8s.io/client-go/kubernetes"
29 "k8s.io/client-go/tools/cache"
30 cloudprovider "k8s.io/cloud-provider"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
33 controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
34 )
35
36
// Config describes the parameters of a single performance-test run. A pointer
// to it is stored in Results, and Results.String prints every field except
// Cloud.
//
// NOTE(review): the three QPS fields look like rate limits for node creation,
// the Kubernetes API client and the cloud provider respectively — confirm
// against the test harness that fills this struct in.
type Config struct {
	CreateQPS int
	KubeQPS   int
	CloudQPS  int
	// NumNodes is reported as "NumNodes" in the printed results.
	NumNodes int
	// AllocatorType is reported as "Allocator" in the printed results.
	AllocatorType ipam.CIDRAllocatorType
	// Cloud is not read anywhere in this file.
	Cloud cloudprovider.Interface
}
45
// nodeTime holds the timestamps the observer records for one node.
type nodeTime struct {
	// added is set when the add handler first sees the node without a PodCIDR.
	added time.Time
	// allocated is set when the update handler first sees a non-empty PodCIDR.
	allocated time.Time
	// podCIDR is the Spec.PodCIDR value seen at that update; while it is empty
	// the node is still treated as unallocated.
	podCIDR string
}
51
52
53
// Observer watches node add and update events and records, per node, when the
// node was first seen and when it was first seen with a pod CIDR.
type Observer struct {
	// numAdded counts nodes recorded by the add handler (nodes that arrived
	// without a PodCIDR).
	numAdded int
	// numAllocated counts recorded nodes later seen with a non-empty PodCIDR.
	numAllocated int
	// timing maps a node name to the timestamps recorded for that node.
	timing map[string]*nodeTime
	// numNodes is the count both counters must reach for the run to succeed.
	numNodes int
	// stopChan is passed to the informer factory's Start and closed by Results.
	stopChan chan struct{}
	// wg is incremented once by monitor and released by the update handler
	// when both counters reach numNodes; Results waits on it.
	wg sync.WaitGroup
	// clientSet is used to build the shared informer factory.
	clientSet *clientset.Clientset
}
63
64
// JSONDuration is a time.Duration with its own JSON methods: MarshalJSON
// writes the quoted time.Duration string form, and UnmarshalJSON reads that
// form back with time.ParseDuration.
type JSONDuration time.Duration
66
67
// NodeDuration is the per-node entry of a Results report.
type NodeDuration struct {
	// Name is the node name (the key under which the observer tracked it).
	Name string
	// PodCIDR is the pod CIDR the observer saw assigned to the node.
	PodCIDR string
	// Duration is the time between the node being recorded as added and being
	// recorded as allocated.
	Duration JSONDuration
}
73
74
// Results is the summary of one observed run, as built by Observer.Results.
type Results struct {
	// Name and Config are the values passed to Observer.Results, stored as-is.
	Name   string
	Config *Config
	// Succeeded is true when both the added and the allocated node counts
	// equal the expected number of nodes.
	Succeeded bool
	// MaxAllocTime is the largest per-node Duration in NodeAllocTime.
	MaxAllocTime JSONDuration
	// TotalAllocTime is the span from the earliest recorded add to the latest
	// recorded allocation.
	TotalAllocTime JSONDuration
	// NodeAllocTime lists the nodes that have both timestamps, sorted by
	// Duration, longest first.
	NodeAllocTime []NodeDuration
}
83
84
85 func NewObserver(clientSet *clientset.Clientset, numNodes int) *Observer {
86 o := &Observer{
87 timing: map[string]*nodeTime{},
88 numNodes: numNodes,
89 clientSet: clientSet,
90 stopChan: make(chan struct{}),
91 }
92 return o
93 }
94
95
96
// StartObserving registers the node event handlers and starts the informer
// (see monitor), then logs that the observer is running. It always returns
// nil.
func (o *Observer) StartObserving() error {
	o.monitor()
	klog.Infof("Test observer started")
	return nil
}
102
103
104
105 func (o *Observer) Results(name string, config *Config) *Results {
106 var (
107 firstAdd time.Time
108 lastAssignment time.Time
109 )
110 o.wg.Wait()
111 close(o.stopChan)
112
113 results := &Results{
114 Name: name,
115 Config: config,
116 Succeeded: o.numAdded == o.numNodes && o.numAllocated == o.numNodes,
117 MaxAllocTime: 0,
118 NodeAllocTime: []NodeDuration{},
119 }
120 for name, nTime := range o.timing {
121 addFound := !nTime.added.IsZero()
122 if addFound && (firstAdd.IsZero() || nTime.added.Before(firstAdd)) {
123 firstAdd = nTime.added
124 }
125 cidrFound := !nTime.allocated.IsZero()
126 if cidrFound && nTime.allocated.After(lastAssignment) {
127 lastAssignment = nTime.allocated
128 }
129 if addFound && cidrFound {
130 allocTime := nTime.allocated.Sub(nTime.added)
131 if allocTime > time.Duration(results.MaxAllocTime) {
132 results.MaxAllocTime = JSONDuration(allocTime)
133 }
134 results.NodeAllocTime = append(results.NodeAllocTime, NodeDuration{
135 Name: name, PodCIDR: nTime.podCIDR, Duration: JSONDuration(allocTime),
136 })
137 }
138 }
139 results.TotalAllocTime = JSONDuration(lastAssignment.Sub(firstAdd))
140 sort.Slice(results.NodeAllocTime, func(i, j int) bool {
141 return results.NodeAllocTime[i].Duration > results.NodeAllocTime[j].Duration
142 })
143 return results
144 }
145
146 func (o *Observer) monitor() {
147 o.wg.Add(1)
148
149 sharedInformer := informers.NewSharedInformerFactory(o.clientSet, 1*time.Second)
150 nodeInformer := sharedInformer.Core().V1().Nodes().Informer()
151
152 nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
153 AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) (err error) {
154 name := node.GetName()
155 if node.Spec.PodCIDR != "" {
156
157 return
158 }
159 nTime := &nodeTime{}
160 o.timing[name] = nTime
161 nTime.added = time.Now()
162 o.numAdded = o.numAdded + 1
163 return
164 }),
165 UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) (err error) {
166 name := newNode.GetName()
167 nTime, found := o.timing[name]
168 if !found {
169 return
170 }
171
172 if newNode.Spec.PodCIDR != "" && nTime.podCIDR == "" {
173 nTime.allocated = time.Now()
174 nTime.podCIDR = newNode.Spec.PodCIDR
175 o.numAllocated++
176 if o.numAllocated%10 == 0 {
177 klog.Infof("progress: %d/%d - %.2d%%", o.numAllocated, o.numNodes, (o.numAllocated * 100.0 / o.numNodes))
178 }
179
180
181 if o.numAdded == o.numNodes && o.numAllocated == o.numNodes {
182 klog.Info("All nodes assigned podCIDR")
183 o.wg.Done()
184 }
185 }
186 return
187 }),
188 })
189 sharedInformer.Start(o.stopChan)
190 }
191
192
193
194 func (results *Results) String() string {
195 var b bytes.Buffer
196 fmt.Fprintf(&b, "\n TestName: %s", results.Name)
197 fmt.Fprintf(&b, "\n NumNodes: %d, CreateQPS: %d, KubeQPS: %d, CloudQPS: %d, Allocator: %v",
198 results.Config.NumNodes, results.Config.CreateQPS, results.Config.KubeQPS,
199 results.Config.CloudQPS, results.Config.AllocatorType)
200 fmt.Fprintf(&b, "\n Succeeded: %v, TotalAllocTime: %v, MaxAllocTime: %v",
201 results.Succeeded, time.Duration(results.TotalAllocTime), time.Duration(results.MaxAllocTime))
202 fmt.Fprintf(&b, "\n %5s %-20s %-20s %s", "Num", "Node", "PodCIDR", "Duration (s)")
203 for i, d := range results.NodeAllocTime {
204 fmt.Fprintf(&b, "\n %5d %-20s %-20s %10.3f", i+1, d.Name, d.PodCIDR, time.Duration(d.Duration).Seconds())
205 }
206 return b.String()
207 }
208
209
210 func (jDuration *JSONDuration) MarshalJSON() ([]byte, error) {
211 return []byte(fmt.Sprintf("\"%s\"", time.Duration(*jDuration).String())), nil
212 }
213
214
215 func (jDuration *JSONDuration) UnmarshalJSON(b []byte) (err error) {
216 var d time.Duration
217 if d, err = time.ParseDuration(string(b[1 : len(b)-1])); err == nil {
218 *jDuration = JSONDuration(d)
219 }
220 return
221 }
222
View as plain text