...

Source file src/k8s.io/kubernetes/test/integration/ipamperf/ipam_test.go

Documentation: k8s.io/kubernetes/test/integration/ipamperf

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2018 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipamperf
    21  
    22  import (
    23  	"context"
    24  	"encoding/json"
    25  	"fmt"
    26  	"net"
    27  	"os"
    28  	"testing"
    29  	"time"
    30  
    31  	"k8s.io/klog/v2"
    32  	"k8s.io/klog/v2/ktesting"
    33  	netutils "k8s.io/utils/net"
    34  
    35  	"k8s.io/client-go/informers"
    36  	clientset "k8s.io/client-go/kubernetes"
    37  	restclient "k8s.io/client-go/rest"
    38  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    39  	"k8s.io/kubernetes/pkg/controller/nodeipam"
    40  	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
    41  	"k8s.io/kubernetes/test/integration/framework"
    42  	"k8s.io/kubernetes/test/integration/util"
    43  )
    44  
    45  func setupAllocator(ctx context.Context, kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) {
    46  	controllerStopChan := make(chan struct{})
    47  	shutdownFunc := func() {
    48  		close(controllerStopChan)
    49  	}
    50  
    51  	clientConfig := restclient.CopyConfig(kubeConfig)
    52  	clientConfig.QPS = float32(config.KubeQPS)
    53  	clientConfig.Burst = config.KubeQPS
    54  	clientSet := clientset.NewForConfigOrDie(clientConfig)
    55  
    56  	sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
    57  	ipamController, err := nodeipam.NewNodeIpamController(
    58  		ctx,
    59  		sharedInformer.Core().V1().Nodes(),
    60  		config.Cloud, clientSet, []*net.IPNet{clusterCIDR}, serviceCIDR, nil,
    61  		[]int{subnetMaskSize}, config.AllocatorType,
    62  	)
    63  	if err != nil {
    64  		return nil, shutdownFunc, err
    65  	}
    66  	go ipamController.Run(ctx)
    67  	sharedInformer.Start(controllerStopChan)
    68  
    69  	return clientSet, shutdownFunc, nil
    70  }
    71  
    72  func runTest(t *testing.T, kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) {
    73  	t.Helper()
    74  	klog.Infof("Running test %s", t.Name())
    75  
    76  	nodeClientConfig := restclient.CopyConfig(kubeConfig)
    77  	nodeClientConfig.QPS = float32(config.CreateQPS)
    78  	nodeClientConfig.Burst = config.CreateQPS
    79  	nodeClient := clientset.NewForConfigOrDie(nodeClientConfig)
    80  
    81  	defer deleteNodes(nodeClient) // cleanup nodes on after controller shutdown
    82  	_, ctx := ktesting.NewTestContext(t)
    83  	clientSet, shutdownFunc, err := setupAllocator(ctx, kubeConfig, config, clusterCIDR, serviceCIDR, subnetMaskSize)
    84  	if err != nil {
    85  		t.Fatalf("Error starting IPAM allocator: %v", err)
    86  	}
    87  	defer shutdownFunc()
    88  
    89  	o := NewObserver(clientSet, config.NumNodes)
    90  	if err := o.StartObserving(); err != nil {
    91  		t.Fatalf("Could not start test observer: %v", err)
    92  	}
    93  
    94  	if err := createNodes(nodeClient, config); err != nil {
    95  		t.Fatalf("Could not create nodes: %v", err)
    96  	}
    97  
    98  	results := o.Results(t.Name(), config)
    99  	klog.Infof("Results: %s", results)
   100  	if !results.Succeeded {
   101  		t.Errorf("%s: Not allocations succeeded", t.Name())
   102  	}
   103  	return results, nil
   104  }
   105  
   106  func logResults(allResults []*Results) {
   107  	jStr, err := json.MarshalIndent(allResults, "", "  ")
   108  	if err != nil {
   109  		klog.Errorf("Error formatting results: %v", err)
   110  		return
   111  	}
   112  	if resultsLogFile != "" {
   113  		klog.Infof("Logging results to %s", resultsLogFile)
   114  		if err := os.WriteFile(resultsLogFile, jStr, os.FileMode(0644)); err != nil {
   115  			klog.Errorf("Error logging results to %s: %v", resultsLogFile, err)
   116  		}
   117  	}
   118  	klog.Infof("AllResults:\n%s", string(jStr))
   119  }
   120  
   121  func TestPerformance(t *testing.T) {
   122  	// TODO (#93112) skip test until appropriate timeout established
   123  	if testing.Short() || true {
   124  		// TODO (#61854) find why flakiness is caused by etcd connectivity before enabling always
   125  		t.Skip("Skipping because we want to run short tests")
   126  	}
   127  
   128  	_, ctx := ktesting.NewTestContext(t)
   129  	ctx, cancel := context.WithCancel(ctx)
   130  	defer cancel()
   131  
   132  	_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
   133  		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
   134  			// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   135  			opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"}
   136  		},
   137  	})
   138  	defer tearDownFn()
   139  
   140  	_, clusterCIDR, _ := netutils.ParseCIDRSloppy("10.96.0.0/11") // allows up to 8K nodes
   141  	_, serviceCIDR, _ := netutils.ParseCIDRSloppy("10.94.0.0/24") // does not matter for test - pick upto  250 services
   142  	subnetMaskSize := 24
   143  
   144  	var (
   145  		allResults []*Results
   146  		tests      []*Config
   147  	)
   148  
   149  	if isCustom {
   150  		tests = append(tests, customConfig)
   151  	} else {
   152  		for _, numNodes := range []int{10, 100} {
   153  			for _, alloc := range []ipam.CIDRAllocatorType{ipam.RangeAllocatorType, ipam.CloudAllocatorType, ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType} {
   154  				tests = append(tests, &Config{AllocatorType: alloc, NumNodes: numNodes, CreateQPS: numNodes, KubeQPS: 10, CloudQPS: 10})
   155  			}
   156  		}
   157  	}
   158  
   159  	for _, test := range tests {
   160  		testName := fmt.Sprintf("%s-KubeQPS%d-Nodes%d", test.AllocatorType, test.KubeQPS, test.NumNodes)
   161  		t.Run(testName, func(t *testing.T) {
   162  			allocateCIDR := false
   163  			if test.AllocatorType == ipam.IPAMFromCloudAllocatorType || test.AllocatorType == ipam.CloudAllocatorType {
   164  				allocateCIDR = true
   165  			}
   166  			bil := newBaseInstanceList(allocateCIDR, clusterCIDR, subnetMaskSize)
   167  			cloud, err := util.NewMockGCECloud(bil.newMockCloud())
   168  			if err != nil {
   169  				t.Fatalf("Unable to create mock cloud: %v", err)
   170  			}
   171  			test.Cloud = cloud
   172  			if results, err := runTest(t, kubeConfig, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil {
   173  				allResults = append(allResults, results)
   174  			}
   175  		})
   176  	}
   177  
   178  	logResults(allResults)
   179  }
   180  

View as plain text