1
16
17 package netpol
18
19 import (
20 "context"
21 "fmt"
22 v1 "k8s.io/api/core/v1"
23 networkingv1 "k8s.io/api/networking/v1"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/util/wait"
26 clientset "k8s.io/client-go/kubernetes"
27 "k8s.io/kubernetes/test/e2e/framework"
28 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
29 netutils "k8s.io/utils/net"
30 "net"
31 "strconv"
32 "strings"
33 "time"
34 )
35
36
// Polling defaults, expressed in whole seconds.
const (
	// defaultPollIntervalSeconds is the value returned by getPollIntervalSeconds.
	defaultPollIntervalSeconds = 1

	// defaultPollTimeoutSeconds is the base value returned by
	// getPollTimeoutSeconds, which doubles it on windows.
	defaultPollTimeoutSeconds = 10
)
41
42
43
44
// TestPod records a pod that initializeClusterFromModel created, together
// with the runtime details captured at creation time.
type TestPod struct {
	// Namespace and Name are copied from the pod object returned by the
	// create call.
	Namespace, Name string
	// ContainerName is the name of the first container of the model pod.
	ContainerName string
	// ServiceIP is the ClusterIP of the service created alongside the pod.
	ServiceIP string
}
51
52 func (pod TestPod) PodString() PodString {
53 return NewPodString(pod.Namespace, pod.Name)
54 }
55
56
57
58
59
60 type kubeManager struct {
61 framework *framework.Framework
62 clientSet clientset.Interface
63 namespaceNames []string
64 allPods []TestPod
65 allPodStrings []PodString
66 dnsDomain string
67 }
68
69
70 func newKubeManager(framework *framework.Framework, dnsDomain string) *kubeManager {
71 return &kubeManager{
72 framework: framework,
73 clientSet: framework.ClientSet,
74 dnsDomain: dnsDomain,
75 }
76 }
77
78
79 func (k *kubeManager) initializeClusterFromModel(ctx context.Context, model *Model) error {
80 var createdPods []*v1.Pod
81 for _, ns := range model.Namespaces {
82
83 namespace, err := k.framework.CreateNamespace(ctx, ns.BaseName, nil)
84 if err != nil {
85 return err
86 }
87 namespaceName := namespace.Name
88 k.namespaceNames = append(k.namespaceNames, namespaceName)
89
90 for _, pod := range ns.Pods {
91 framework.Logf("creating pod %s/%s with matching service", namespaceName, pod.Name)
92
93
94
95 kubePod, err := k.createPod(ctx, pod.KubePod(namespaceName))
96 if err != nil {
97 return err
98 }
99
100 createdPods = append(createdPods, kubePod)
101 svc, err := k.createService(ctx, pod.Service(namespaceName))
102 if err != nil {
103 return err
104 }
105 if netutils.ParseIPSloppy(svc.Spec.ClusterIP) == nil {
106 return fmt.Errorf("empty IP address found for service %s/%s", svc.Namespace, svc.Name)
107 }
108
109 k.allPods = append(k.allPods, TestPod{
110 Namespace: kubePod.Namespace,
111 Name: kubePod.Name,
112 ContainerName: pod.Containers[0].Name(),
113 ServiceIP: svc.Spec.ClusterIP,
114 })
115 k.allPodStrings = append(k.allPodStrings, NewPodString(kubePod.Namespace, kubePod.Name))
116 }
117 }
118
119 for _, createdPod := range createdPods {
120 err := e2epod.WaitForPodRunningInNamespace(ctx, k.clientSet, createdPod)
121 if err != nil {
122 return fmt.Errorf("unable to wait for pod %s/%s: %w", createdPod.Namespace, createdPod.Name, err)
123 }
124 }
125
126 return nil
127 }
128
129 func (k *kubeManager) AllPods() []TestPod {
130 return k.allPods
131 }
132
133 func (k *kubeManager) AllPodStrings() []PodString {
134 return k.allPodStrings
135 }
136
137 func (k *kubeManager) DNSDomain() string {
138 return k.dnsDomain
139 }
140
141 func (k *kubeManager) NamespaceNames() []string {
142 return k.namespaceNames
143 }
144
145
146 func (k *kubeManager) getPod(ctx context.Context, ns string, name string) (*v1.Pod, error) {
147 kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
148 if err != nil {
149 return nil, fmt.Errorf("unable to get pod %s/%s: %w", ns, name, err)
150 }
151 return kubePod, nil
152 }
153
154
155
156 func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, string, error) {
157 port := strconv.Itoa(args.toPort)
158 if args.addrTo == "" {
159 return false, "no IP provided", fmt.Errorf("empty addrTo field")
160 }
161 framework.Logf("Starting probe from pod %v to %v", args.podFrom, args.addrTo)
162 var cmd []string
163 timeout := fmt.Sprintf("--timeout=%vs", args.timeoutSeconds)
164
165 switch args.protocol {
166 case v1.ProtocolSCTP:
167 cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=sctp"}
168 case v1.ProtocolTCP:
169 cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=tcp"}
170 case v1.ProtocolUDP:
171 cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=udp"}
172 if framework.NodeOSDistroIs("windows") {
173 framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version")
174 }
175 default:
176 framework.Failf("protocol %s not supported", args.protocol)
177 }
178
179 commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", args.podFrom, args.containerFrom, args.nsFrom, strings.Join(cmd, " "))
180
181 attempt := 0
182
183
184
185
186 var probeError error
187 var stderr string
188
189
190
191
192 conditionFunc := func() (bool, error) {
193 _, stderr, probeError = k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
194
195 if args.expectConnectivity {
196 if probeError != nil {
197
198
199
200 framework.Logf("probe #%d :: connectivity expected :: %s/%s -> %s :: stderr - %s",
201 attempt+1, args.nsFrom, args.podFrom, args.addrTo, stderr,
202 )
203 attempt++
204 return false, nil
205 } else {
206
207 return true, nil
208 }
209 } else {
210 if probeError != nil {
211
212 return true, nil
213 } else {
214
215
216
217 framework.Logf(" probe #%d :: connectivity not expected :: %s/%s -> %s",
218 attempt+1, args.nsFrom, args.podFrom, args.addrTo,
219 )
220 attempt++
221 return false, nil
222 }
223 }
224 }
225
226
227 _ = wait.PollImmediate(
228 time.Duration(args.pollIntervalSeconds)*time.Second,
229 time.Duration(args.pollTimeoutSeconds)*time.Second,
230 conditionFunc,
231 )
232
233 if probeError != nil {
234 return false, commandDebugString, nil
235 }
236 return true, commandDebugString, nil
237 }
238
239
240 func (k *kubeManager) executeRemoteCommand(namespace string, pod string, containerName string, command []string) (string, string, error) {
241 return e2epod.ExecWithOptions(k.framework, e2epod.ExecOptions{
242 Command: command,
243 Namespace: namespace,
244 PodName: pod,
245 ContainerName: containerName,
246 Stdin: nil,
247 CaptureStdout: true,
248 CaptureStderr: true,
249 PreserveWhitespace: false,
250 })
251 }
252
253
254 func (k *kubeManager) createService(ctx context.Context, service *v1.Service) (*v1.Service, error) {
255 ns := service.Namespace
256 name := service.Name
257
258 createdService, err := k.clientSet.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
259 if err != nil {
260 return nil, fmt.Errorf("unable to create service %s/%s: %w", ns, name, err)
261 }
262 return createdService, nil
263 }
264
265
266 func (k *kubeManager) createPod(ctx context.Context, pod *v1.Pod) (*v1.Pod, error) {
267 ns := pod.Namespace
268 framework.Logf("creating pod %s/%s", ns, pod.Name)
269
270 createdPod, err := k.clientSet.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
271 if err != nil {
272 return nil, fmt.Errorf("unable to create pod %s/%s: %w", ns, pod.Name, err)
273 }
274 return createdPod, nil
275 }
276
277
278 func (k *kubeManager) cleanNetworkPolicies(ctx context.Context) error {
279 for _, ns := range k.namespaceNames {
280 framework.Logf("deleting policies in %s ..........", ns)
281 l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(ctx, metav1.ListOptions{})
282 if err != nil {
283 return fmt.Errorf("unable to list network policies in ns %s: %w", ns, err)
284 }
285 for _, np := range l.Items {
286 framework.Logf("deleting network policy %s/%s", ns, np.Name)
287 err = k.clientSet.NetworkingV1().NetworkPolicies(ns).Delete(ctx, np.Name, metav1.DeleteOptions{})
288 if err != nil {
289 return fmt.Errorf("unable to delete network policy %s/%s: %w", ns, np.Name, err)
290 }
291 }
292 }
293 return nil
294 }
295
296
297 func (k *kubeManager) createNetworkPolicy(ctx context.Context, ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
298 framework.Logf("creating network policy %s/%s", ns, netpol.Name)
299 netpol.ObjectMeta.Namespace = ns
300 np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Create(ctx, netpol, metav1.CreateOptions{})
301 if err != nil {
302 return nil, fmt.Errorf("unable to create network policy %s/%s: %w", ns, netpol.Name, err)
303 }
304 return np, nil
305 }
306
307
308 func (k *kubeManager) updateNetworkPolicy(ctx context.Context, ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
309 framework.Logf("updating network policy %s/%s", ns, netpol.Name)
310 netpol.ObjectMeta.Namespace = ns
311 np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Update(ctx, netpol, metav1.UpdateOptions{})
312 if err != nil {
313 return np, fmt.Errorf("unable to update network policy %s/%s: %w", ns, netpol.Name, err)
314 }
315 return np, nil
316 }
317
318
319 func (k *kubeManager) getNamespace(ctx context.Context, ns string) (*v1.Namespace, error) {
320 selectedNameSpace, err := k.clientSet.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{})
321 if err != nil {
322 return nil, fmt.Errorf("unable to get namespace %s: %w", ns, err)
323 }
324 return selectedNameSpace, nil
325 }
326
327
328 func getProbeTimeoutSeconds() int {
329 timeoutSeconds := 1
330 if framework.NodeOSDistroIs("windows") {
331 timeoutSeconds = 3
332 }
333 return timeoutSeconds
334 }
335
336
// getWorkers returns the fixed worker count, 3.
func getWorkers() int {
	const workers = 3
	return workers
}
340
341
342 func getPollIntervalSeconds() int {
343 return defaultPollIntervalSeconds
344 }
345
346
347 func getPollTimeoutSeconds() int {
348 if framework.NodeOSDistroIs("windows") {
349 return defaultPollTimeoutSeconds * 2
350 }
351 return defaultPollTimeoutSeconds
352 }
353
View as plain text