1
2
3
4
19
20 package ipamperf
21
22 import (
23 "context"
24 "encoding/json"
25 "fmt"
26 "net"
27 "os"
28 "testing"
29 "time"
30
31 "k8s.io/klog/v2"
32 "k8s.io/klog/v2/ktesting"
33 netutils "k8s.io/utils/net"
34
35 "k8s.io/client-go/informers"
36 clientset "k8s.io/client-go/kubernetes"
37 restclient "k8s.io/client-go/rest"
38 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
39 "k8s.io/kubernetes/pkg/controller/nodeipam"
40 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
41 "k8s.io/kubernetes/test/integration/framework"
42 "k8s.io/kubernetes/test/integration/util"
43 )
44
45 func setupAllocator(ctx context.Context, kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) {
46 controllerStopChan := make(chan struct{})
47 shutdownFunc := func() {
48 close(controllerStopChan)
49 }
50
51 clientConfig := restclient.CopyConfig(kubeConfig)
52 clientConfig.QPS = float32(config.KubeQPS)
53 clientConfig.Burst = config.KubeQPS
54 clientSet := clientset.NewForConfigOrDie(clientConfig)
55
56 sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
57 ipamController, err := nodeipam.NewNodeIpamController(
58 ctx,
59 sharedInformer.Core().V1().Nodes(),
60 config.Cloud, clientSet, []*net.IPNet{clusterCIDR}, serviceCIDR, nil,
61 []int{subnetMaskSize}, config.AllocatorType,
62 )
63 if err != nil {
64 return nil, shutdownFunc, err
65 }
66 go ipamController.Run(ctx)
67 sharedInformer.Start(controllerStopChan)
68
69 return clientSet, shutdownFunc, nil
70 }
71
72 func runTest(t *testing.T, kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) {
73 t.Helper()
74 klog.Infof("Running test %s", t.Name())
75
76 nodeClientConfig := restclient.CopyConfig(kubeConfig)
77 nodeClientConfig.QPS = float32(config.CreateQPS)
78 nodeClientConfig.Burst = config.CreateQPS
79 nodeClient := clientset.NewForConfigOrDie(nodeClientConfig)
80
81 defer deleteNodes(nodeClient)
82 _, ctx := ktesting.NewTestContext(t)
83 clientSet, shutdownFunc, err := setupAllocator(ctx, kubeConfig, config, clusterCIDR, serviceCIDR, subnetMaskSize)
84 if err != nil {
85 t.Fatalf("Error starting IPAM allocator: %v", err)
86 }
87 defer shutdownFunc()
88
89 o := NewObserver(clientSet, config.NumNodes)
90 if err := o.StartObserving(); err != nil {
91 t.Fatalf("Could not start test observer: %v", err)
92 }
93
94 if err := createNodes(nodeClient, config); err != nil {
95 t.Fatalf("Could not create nodes: %v", err)
96 }
97
98 results := o.Results(t.Name(), config)
99 klog.Infof("Results: %s", results)
100 if !results.Succeeded {
101 t.Errorf("%s: Not allocations succeeded", t.Name())
102 }
103 return results, nil
104 }
105
106 func logResults(allResults []*Results) {
107 jStr, err := json.MarshalIndent(allResults, "", " ")
108 if err != nil {
109 klog.Errorf("Error formatting results: %v", err)
110 return
111 }
112 if resultsLogFile != "" {
113 klog.Infof("Logging results to %s", resultsLogFile)
114 if err := os.WriteFile(resultsLogFile, jStr, os.FileMode(0644)); err != nil {
115 klog.Errorf("Error logging results to %s: %v", resultsLogFile, err)
116 }
117 }
118 klog.Infof("AllResults:\n%s", string(jStr))
119 }
120
121 func TestPerformance(t *testing.T) {
122
123 if testing.Short() || true {
124
125 t.Skip("Skipping because we want to run short tests")
126 }
127
128 _, ctx := ktesting.NewTestContext(t)
129 ctx, cancel := context.WithCancel(ctx)
130 defer cancel()
131
132 _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
133 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
134
135 opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"}
136 },
137 })
138 defer tearDownFn()
139
140 _, clusterCIDR, _ := netutils.ParseCIDRSloppy("10.96.0.0/11")
141 _, serviceCIDR, _ := netutils.ParseCIDRSloppy("10.94.0.0/24")
142 subnetMaskSize := 24
143
144 var (
145 allResults []*Results
146 tests []*Config
147 )
148
149 if isCustom {
150 tests = append(tests, customConfig)
151 } else {
152 for _, numNodes := range []int{10, 100} {
153 for _, alloc := range []ipam.CIDRAllocatorType{ipam.RangeAllocatorType, ipam.CloudAllocatorType, ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType} {
154 tests = append(tests, &Config{AllocatorType: alloc, NumNodes: numNodes, CreateQPS: numNodes, KubeQPS: 10, CloudQPS: 10})
155 }
156 }
157 }
158
159 for _, test := range tests {
160 testName := fmt.Sprintf("%s-KubeQPS%d-Nodes%d", test.AllocatorType, test.KubeQPS, test.NumNodes)
161 t.Run(testName, func(t *testing.T) {
162 allocateCIDR := false
163 if test.AllocatorType == ipam.IPAMFromCloudAllocatorType || test.AllocatorType == ipam.CloudAllocatorType {
164 allocateCIDR = true
165 }
166 bil := newBaseInstanceList(allocateCIDR, clusterCIDR, subnetMaskSize)
167 cloud, err := util.NewMockGCECloud(bil.newMockCloud())
168 if err != nil {
169 t.Fatalf("Unable to create mock cloud: %v", err)
170 }
171 test.Cloud = cloud
172 if results, err := runTest(t, kubeConfig, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil {
173 allResults = append(allResults, results)
174 }
175 })
176 }
177
178 logResults(allResults)
179 }
180
View as plain text