...

Source file src/k8s.io/kubernetes/pkg/kubelet/prober/scale_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package prober
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"net/http"
    24  	"os"
    25  	"sync"
    26  	"sync/atomic"
    27  	"testing"
    28  	"time"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/intstr"
    34  	"k8s.io/client-go/kubernetes/fake"
    35  	"k8s.io/client-go/tools/record"
    36  	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
    37  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    38  	"k8s.io/kubernetes/pkg/kubelet/status"
    39  	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
    40  	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
    41  	utilpointer "k8s.io/utils/pointer"
    42  )
    43  
    44  // TCP sockets go through a TIME-WAIT state (default 60 sec) before being freed,
    45  // causing conntrack entries and ephemeral ports to be held for 60 seconds
    46  // even though the probe may have finished in less than 1 second.
    47  // If the rate of probes is higher than the rate at which the OS recycles the ports used,
    48  // it can consume a considerable number of ephemeral ports or conntrack entries.
    49  // These tests verify that the probes keep working after a certain period; if the probes
    50  // don't close the sockets fast enough, they will start to fail.
    51  // The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
    52  // containers and runs one probe for each of these containers (all the probes come
    53  // from the same process, same as in the Kubelet, and target the same IP:port) to verify
    54  // that the ephemeral ports are not exhausted.
    55  
    56  // The default port range on a normal Linux system has 28232 free ephemeral ports per
    57  // tuple srcIP,srcPort:dstIP:dstPort:Proto: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
    58  // 1 pod x 600 containers/pod x 1 probe/container x 1 req/sec = 600 req/sec
    59  // 600 req/sec x 59 sec = 35400
    60  // The test should run out of ephemeral ports in less than one minute and start failing connections
    61  // Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322
    62  
    63  func TestTCPPortExhaustion(t *testing.T) {
    64  	// This test creates a considereable number of connections in a short time
    65  	// and flakes on constrained environments, thus it is skipped by default.
    66  	// The test is left for manual verification or experimentation with new
    67  	// changes on the probes.
    68  	t.Skip("skipping TCP port exhaustion tests")
    69  
    70  	const (
    71  		numTestPods   = 1
    72  		numContainers = 600
    73  	)
    74  
    75  	tests := []struct {
    76  		name string
    77  		http bool // it can be tcp or http
    78  	}{
    79  		{"TCP", false},
    80  		{"HTTP", true},
    81  	}
    82  	for _, tt := range tests {
    83  		t.Run(fmt.Sprintf(tt.name), func(t *testing.T) {
    84  			testRootDir := ""
    85  			if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
    86  				t.Fatalf("can't make a temp rootdir: %v", err)
    87  			} else {
    88  				testRootDir = tempDir
    89  			}
    90  			podManager := kubepod.NewBasicPodManager()
    91  			podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
    92  			m := NewManager(
    93  				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
    94  				results.NewManager(),
    95  				results.NewManager(),
    96  				results.NewManager(),
    97  				nil, // runner
    98  				&record.FakeRecorder{},
    99  			).(*manager)
   100  			defer cleanup(t, m)
   101  
   102  			now := time.Now()
   103  			fakePods := make([]*fakePod, numTestPods)
   104  			for i := 0; i < numTestPods; i++ {
   105  				fake, err := newFakePod(tt.http)
   106  				if err != nil {
   107  					t.Fatalf("unexpected error creating fake pod: %v", err)
   108  				}
   109  				defer fake.stop()
   110  				handler := fake.probeHandler()
   111  				fakePods[i] = fake
   112  
   113  				pod := v1.Pod{
   114  					ObjectMeta: metav1.ObjectMeta{
   115  						UID:       types.UID(fmt.Sprintf("pod%d", i)),
   116  						Name:      fmt.Sprintf("pod%d", i),
   117  						Namespace: "test",
   118  					},
   119  					Spec: v1.PodSpec{},
   120  					Status: v1.PodStatus{
   121  						Phase:  v1.PodPhase(v1.PodReady),
   122  						PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
   123  					},
   124  				}
   125  				for j := 0; j < numContainers; j++ {
   126  					// use only liveness probes for simplicity, initial state is success for them
   127  					pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
   128  						Name:          fmt.Sprintf("container%d", j),
   129  						LivenessProbe: newProbe(handler),
   130  					})
   131  					pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
   132  						Name:        fmt.Sprintf("container%d", j),
   133  						ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
   134  						State: v1.ContainerState{
   135  							Running: &v1.ContainerStateRunning{
   136  								StartedAt: metav1.Now(),
   137  							},
   138  						},
   139  						Started: utilpointer.Bool(true),
   140  					})
   141  				}
   142  				podManager.AddPod(&pod)
   143  				m.statusManager.SetPodStatus(&pod, pod.Status)
   144  				m.AddPod(&pod)
   145  			}
   146  			t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))
   147  
   148  			ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
   149  			defer cancel()
   150  			var wg sync.WaitGroup
   151  
   152  			wg.Add(1)
   153  			go func() {
   154  				defer wg.Done()
   155  				for {
   156  					var result results.Update
   157  					var probeType string
   158  					select {
   159  					case result = <-m.startupManager.Updates():
   160  						probeType = "startup"
   161  					case result = <-m.livenessManager.Updates():
   162  						probeType = "liveness"
   163  					case result = <-m.readinessManager.Updates():
   164  						probeType = "readiness"
   165  					case <-ctx.Done():
   166  						return
   167  					}
   168  					switch result.Result.String() {
   169  					// The test will fail if any of the probes fails
   170  					case "Failure":
   171  						t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
   172  					case "UNKNOWN": // startup probes
   173  						t.Logf("UNKNOWN state for %v", result)
   174  					default:
   175  					}
   176  				}
   177  			}()
   178  			wg.Wait()
   179  
   180  			// log the number of connections received in each pod for debugging test failures.
   181  			for _, pod := range fakePods {
   182  				n := pod.connections()
   183  				t.Logf("Number of connections %d", n)
   184  			}
   185  
   186  		})
   187  	}
   188  
   189  }
   190  
   191  func newProbe(handler v1.ProbeHandler) *v1.Probe {
   192  	return &v1.Probe{
   193  		ProbeHandler:     handler,
   194  		TimeoutSeconds:   1,
   195  		PeriodSeconds:    1,
   196  		SuccessThreshold: 1,
   197  		FailureThreshold: 3,
   198  	}
   199  }
   200  
   201  // newFakePod runs a server (TCP or HTTP) in a random port
   202  func newFakePod(httpServer bool) (*fakePod, error) {
   203  	ln, err := net.Listen("tcp", "127.0.0.1:0")
   204  	if err != nil {
   205  		return nil, fmt.Errorf("failed to bind: %v", err)
   206  	}
   207  	f := &fakePod{ln: ln, http: httpServer}
   208  
   209  	// spawn an http server or a TCP server that counts the number of connections received
   210  	if httpServer {
   211  		var mu sync.Mutex
   212  		visitors := map[string]struct{}{}
   213  		go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   214  			mu.Lock()
   215  			defer mu.Unlock()
   216  			if _, ok := visitors[r.RemoteAddr]; !ok {
   217  				atomic.AddInt64(&f.numConnection, 1)
   218  				visitors[r.RemoteAddr] = struct{}{}
   219  			}
   220  		}))
   221  	} else {
   222  		go func() {
   223  			for {
   224  				conn, err := ln.Accept()
   225  				if err != nil {
   226  					// exit when the listener is closed
   227  					return
   228  				}
   229  				atomic.AddInt64(&f.numConnection, 1)
   230  				// handle request but not block
   231  				go func(c net.Conn) {
   232  					defer c.Close()
   233  					// read but swallow the errors since the probe doesn't send data
   234  					buffer := make([]byte, 1024)
   235  					c.Read(buffer)
   236  					// respond
   237  					conn.Write([]byte("Hi back!\n"))
   238  				}(conn)
   239  
   240  			}
   241  		}()
   242  	}
   243  	return f, nil
   244  
   245  }
   246  
   247  type fakePod struct {
   248  	ln            net.Listener
   249  	numConnection int64
   250  	http          bool
   251  }
   252  
   253  func (f *fakePod) probeHandler() v1.ProbeHandler {
   254  	port := f.ln.Addr().(*net.TCPAddr).Port
   255  	var handler v1.ProbeHandler
   256  	if f.http {
   257  		handler = v1.ProbeHandler{
   258  			HTTPGet: &v1.HTTPGetAction{
   259  				Host: "127.0.0.1",
   260  				Port: intstr.FromInt32(int32(port)),
   261  			},
   262  		}
   263  	} else {
   264  		handler = v1.ProbeHandler{
   265  			TCPSocket: &v1.TCPSocketAction{
   266  				Host: "127.0.0.1",
   267  				Port: intstr.FromInt32(int32(port)),
   268  			},
   269  		}
   270  	}
   271  	return handler
   272  }
   273  
   274  func (f *fakePod) stop() {
   275  	f.ln.Close()
   276  }
   277  
   278  func (f *fakePod) connections() int {
   279  	return int(atomic.LoadInt64(&f.numConnection))
   280  }
   281  

View as plain text