1
16
17 package prober
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "net/http"
24 "os"
25 "sync"
26 "sync/atomic"
27 "testing"
28 "time"
29
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/intstr"
34 "k8s.io/client-go/kubernetes/fake"
35 "k8s.io/client-go/tools/record"
36 kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
37 "k8s.io/kubernetes/pkg/kubelet/prober/results"
38 "k8s.io/kubernetes/pkg/kubelet/status"
39 statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
40 kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
41 utilpointer "k8s.io/utils/pointer"
42 )
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 func TestTCPPortExhaustion(t *testing.T) {
64
65
66
67
68 t.Skip("skipping TCP port exhaustion tests")
69
70 const (
71 numTestPods = 1
72 numContainers = 600
73 )
74
75 tests := []struct {
76 name string
77 http bool
78 }{
79 {"TCP", false},
80 {"HTTP", true},
81 }
82 for _, tt := range tests {
83 t.Run(fmt.Sprintf(tt.name), func(t *testing.T) {
84 testRootDir := ""
85 if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
86 t.Fatalf("can't make a temp rootdir: %v", err)
87 } else {
88 testRootDir = tempDir
89 }
90 podManager := kubepod.NewBasicPodManager()
91 podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
92 m := NewManager(
93 status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
94 results.NewManager(),
95 results.NewManager(),
96 results.NewManager(),
97 nil,
98 &record.FakeRecorder{},
99 ).(*manager)
100 defer cleanup(t, m)
101
102 now := time.Now()
103 fakePods := make([]*fakePod, numTestPods)
104 for i := 0; i < numTestPods; i++ {
105 fake, err := newFakePod(tt.http)
106 if err != nil {
107 t.Fatalf("unexpected error creating fake pod: %v", err)
108 }
109 defer fake.stop()
110 handler := fake.probeHandler()
111 fakePods[i] = fake
112
113 pod := v1.Pod{
114 ObjectMeta: metav1.ObjectMeta{
115 UID: types.UID(fmt.Sprintf("pod%d", i)),
116 Name: fmt.Sprintf("pod%d", i),
117 Namespace: "test",
118 },
119 Spec: v1.PodSpec{},
120 Status: v1.PodStatus{
121 Phase: v1.PodPhase(v1.PodReady),
122 PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
123 },
124 }
125 for j := 0; j < numContainers; j++ {
126
127 pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
128 Name: fmt.Sprintf("container%d", j),
129 LivenessProbe: newProbe(handler),
130 })
131 pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
132 Name: fmt.Sprintf("container%d", j),
133 ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
134 State: v1.ContainerState{
135 Running: &v1.ContainerStateRunning{
136 StartedAt: metav1.Now(),
137 },
138 },
139 Started: utilpointer.Bool(true),
140 })
141 }
142 podManager.AddPod(&pod)
143 m.statusManager.SetPodStatus(&pod, pod.Status)
144 m.AddPod(&pod)
145 }
146 t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))
147
148 ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
149 defer cancel()
150 var wg sync.WaitGroup
151
152 wg.Add(1)
153 go func() {
154 defer wg.Done()
155 for {
156 var result results.Update
157 var probeType string
158 select {
159 case result = <-m.startupManager.Updates():
160 probeType = "startup"
161 case result = <-m.livenessManager.Updates():
162 probeType = "liveness"
163 case result = <-m.readinessManager.Updates():
164 probeType = "readiness"
165 case <-ctx.Done():
166 return
167 }
168 switch result.Result.String() {
169
170 case "Failure":
171 t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
172 case "UNKNOWN":
173 t.Logf("UNKNOWN state for %v", result)
174 default:
175 }
176 }
177 }()
178 wg.Wait()
179
180
181 for _, pod := range fakePods {
182 n := pod.connections()
183 t.Logf("Number of connections %d", n)
184 }
185
186 })
187 }
188
189 }
190
191 func newProbe(handler v1.ProbeHandler) *v1.Probe {
192 return &v1.Probe{
193 ProbeHandler: handler,
194 TimeoutSeconds: 1,
195 PeriodSeconds: 1,
196 SuccessThreshold: 1,
197 FailureThreshold: 3,
198 }
199 }
200
201
202 func newFakePod(httpServer bool) (*fakePod, error) {
203 ln, err := net.Listen("tcp", "127.0.0.1:0")
204 if err != nil {
205 return nil, fmt.Errorf("failed to bind: %v", err)
206 }
207 f := &fakePod{ln: ln, http: httpServer}
208
209
210 if httpServer {
211 var mu sync.Mutex
212 visitors := map[string]struct{}{}
213 go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
214 mu.Lock()
215 defer mu.Unlock()
216 if _, ok := visitors[r.RemoteAddr]; !ok {
217 atomic.AddInt64(&f.numConnection, 1)
218 visitors[r.RemoteAddr] = struct{}{}
219 }
220 }))
221 } else {
222 go func() {
223 for {
224 conn, err := ln.Accept()
225 if err != nil {
226
227 return
228 }
229 atomic.AddInt64(&f.numConnection, 1)
230
231 go func(c net.Conn) {
232 defer c.Close()
233
234 buffer := make([]byte, 1024)
235 c.Read(buffer)
236
237 conn.Write([]byte("Hi back!\n"))
238 }(conn)
239
240 }
241 }()
242 }
243 return f, nil
244
245 }
246
247 type fakePod struct {
248 ln net.Listener
249 numConnection int64
250 http bool
251 }
252
253 func (f *fakePod) probeHandler() v1.ProbeHandler {
254 port := f.ln.Addr().(*net.TCPAddr).Port
255 var handler v1.ProbeHandler
256 if f.http {
257 handler = v1.ProbeHandler{
258 HTTPGet: &v1.HTTPGetAction{
259 Host: "127.0.0.1",
260 Port: intstr.FromInt32(int32(port)),
261 },
262 }
263 } else {
264 handler = v1.ProbeHandler{
265 TCPSocket: &v1.TCPSocketAction{
266 Host: "127.0.0.1",
267 Port: intstr.FromInt32(int32(port)),
268 },
269 }
270 }
271 return handler
272 }
273
274 func (f *fakePod) stop() {
275 f.ln.Close()
276 }
277
278 func (f *fakePod) connections() int {
279 return int(atomic.LoadInt64(&f.numConnection))
280 }
281
View as plain text