1
16
17 package netpol
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "github.com/onsi/ginkgo/v2"
25 networkingv1 "k8s.io/api/networking/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/util/wait"
28 "k8s.io/kubernetes/test/e2e/framework"
29 "sigs.k8s.io/yaml"
30 )
31
32 const (
33 waitInterval = 1 * time.Second
34 waitTimeout = 30 * time.Second
35 )
36
37
38 func prettyPrint(policy *networkingv1.NetworkPolicy) string {
39 raw, err := yaml.Marshal(policy)
40 framework.ExpectNoError(err, "marshal network policy to yaml")
41 return string(raw)
42 }
43
44
45 func CreatePolicy(ctx context.Context, k8s *kubeManager, policy *networkingv1.NetworkPolicy, namespace string) {
46 if isVerbose {
47 framework.Logf("****************************************************************")
48 framework.Logf("Network Policy creating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy))
49 framework.Logf("****************************************************************")
50 }
51
52 _, err := k8s.createNetworkPolicy(ctx, namespace, policy)
53 framework.ExpectNoError(err, "Unable to create netpol %s/%s", namespace, policy.Name)
54 }
55
56
57 func UpdatePolicy(ctx context.Context, k8s *kubeManager, policy *networkingv1.NetworkPolicy, namespace string) {
58 if isVerbose {
59 framework.Logf("****************************************************************")
60 framework.Logf("Network Policy updating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy))
61 framework.Logf("****************************************************************")
62 }
63
64 _, err := k8s.updateNetworkPolicy(ctx, namespace, policy)
65 framework.ExpectNoError(err, "Unable to update netpol %s/%s", namespace, policy.Name)
66 }
67
68
69 func waitForHTTPServers(k *kubeManager, model *Model) error {
70 const maxTries = 10
71 framework.Logf("waiting for HTTP servers (ports 80 and/or 81) to become ready")
72
73 testCases := map[string]*TestCase{}
74 for _, port := range model.Ports {
75
76 for _, protocol := range model.Protocols {
77 fromPort := 81
78 desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol)
79 testCases[desc] = &TestCase{ToPort: int(port), Protocol: protocol}
80 }
81 }
82 notReady := map[string]bool{}
83 for caseName := range testCases {
84 notReady[caseName] = true
85 }
86
87 for i := 0; i < maxTries; i++ {
88 for caseName, testCase := range testCases {
89 if notReady[caseName] {
90 reachability := NewReachability(k.AllPodStrings(), true)
91 testCase.Reachability = reachability
92 ProbePodToPodConnectivity(k, k.AllPods(), k.DNSDomain(), testCase)
93 _, wrong, _, _ := reachability.Summary(ignoreLoopback)
94 if wrong == 0 {
95 framework.Logf("server %s is ready", caseName)
96 delete(notReady, caseName)
97 } else {
98 framework.Logf("server %s is not ready", caseName)
99 }
100 }
101 }
102 if len(notReady) == 0 {
103 return nil
104 }
105 time.Sleep(waitInterval)
106 }
107 return fmt.Errorf("after %d tries, %d HTTP servers are not ready", maxTries, len(notReady))
108 }
109
110
111 func ValidateOrFail(k8s *kubeManager, testCase *TestCase) {
112 ginkgo.By("Validating reachability matrix...")
113
114
115
116 ginkgo.By("Validating reachability matrix... (FIRST TRY)")
117 ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
118
119
120
121 if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
122 framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong)
123 ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
124 }
125
126
127 if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
128 testCase.Reachability.PrintSummary(true, true, true)
129 framework.Failf("Had %d wrong results in reachability matrix", wrong)
130 }
131 if isVerbose {
132 testCase.Reachability.PrintSummary(true, true, true)
133 }
134 framework.Logf("VALIDATION SUCCESSFUL")
135 }
136
137
138 func AddNamespaceLabel(ctx context.Context, k8s *kubeManager, name string, key string, val string) {
139 ns, err := k8s.getNamespace(ctx, name)
140 framework.ExpectNoError(err, "Unable to get namespace %s", name)
141 ns.Labels[key] = val
142 _, err = k8s.clientSet.CoreV1().Namespaces().Update(ctx, ns, metav1.UpdateOptions{})
143 framework.ExpectNoError(err, "Unable to update namespace %s", name)
144 }
145
146
147 func DeleteNamespaceLabel(ctx context.Context, k8s *kubeManager, name string, key string) {
148 ns, err := k8s.getNamespace(ctx, name)
149 framework.ExpectNoError(err, "Unable to get namespace %s", name)
150 if _, ok := ns.Labels[key]; !ok {
151
152 return
153 }
154 delete(ns.Labels, key)
155 _, err = k8s.clientSet.CoreV1().Namespaces().Update(ctx, ns, metav1.UpdateOptions{})
156 framework.ExpectNoError(err, "Unable to update namespace %s", name)
157 }
158
159
160 func AddPodLabels(ctx context.Context, k8s *kubeManager, namespace string, name string, newPodLabels map[string]string) {
161 kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
162 framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
163 if kubePod.Labels == nil {
164 kubePod.Labels = map[string]string{}
165 }
166 for key, val := range newPodLabels {
167 kubePod.Labels[key] = val
168 }
169 _, err = k8s.clientSet.CoreV1().Pods(namespace).Update(ctx, kubePod, metav1.UpdateOptions{})
170 framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
171
172 err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
173 waitForPod, err := k8s.getPod(ctx, namespace, name)
174 if err != nil {
175 return false, err
176 }
177 for key, expected := range newPodLabels {
178 if actual, ok := waitForPod.Labels[key]; !ok || (expected != actual) {
179 return false, nil
180 }
181 }
182 return true, nil
183 })
184 framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
185 }
186
187
188 func ResetPodLabels(ctx context.Context, k8s *kubeManager, namespace string, name string) {
189 kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
190 framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
191 labels := map[string]string{
192 podNameLabelKey(): name,
193 }
194 kubePod.Labels = labels
195 _, err = k8s.clientSet.CoreV1().Pods(namespace).Update(ctx, kubePod, metav1.UpdateOptions{})
196 framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
197
198 err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
199 waitForPod, err := k8s.getPod(ctx, namespace, name)
200 if err != nil {
201 return false, nil
202 }
203 for key, expected := range labels {
204 if actual, ok := waitForPod.Labels[key]; !ok || (expected != actual) {
205 return false, nil
206 }
207 }
208 return true, nil
209 })
210 framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
211 }
212
View as plain text