1
16
17 package netpol
18
19 import (
20 "fmt"
21
22 "github.com/onsi/ginkgo/v2"
23 v1 "k8s.io/api/core/v1"
24 "k8s.io/kubernetes/test/e2e/framework"
25 netutils "k8s.io/utils/net"
26 )
27
28
29 type probeConnectivityArgs struct {
30 nsFrom string
31 podFrom string
32 containerFrom string
33 addrTo string
34 protocol v1.Protocol
35 toPort int
36 expectConnectivity bool
37 timeoutSeconds int
38 pollIntervalSeconds int
39 pollTimeoutSeconds int
40 }
41
42
// Prober abstracts the mechanism used to run one connectivity probe.
type Prober interface {
	// probeConnectivity runs the probe described by args. The results are used
	// by probeWorker as: whether the connection succeeded, the command string
	// associated with the probe (logged so a failure can be re-run), and any
	// error encountered.
	probeConnectivity(args *probeConnectivityArgs) (bool, string, error)
}
46
47
48 type ProbeJob struct {
49 PodFrom TestPod
50 PodTo TestPod
51 PodToServiceIP string
52 ToPort int
53 ToPodDNSDomain string
54 Protocol v1.Protocol
55 ExpectConnectivity bool
56 }
57
58
// ProbeJobResults carries the outcome of one ProbeJob.
type ProbeJobResults struct {
	// Job is the job this result belongs to.
	Job *ProbeJob
	// IsConnected reports whether the probe observed connectivity.
	IsConnected bool
	// Err is the error reported for the probe, if any.
	Err error
	// Command is the command string returned by the Prober; it is logged when
	// the observed connectivity differs from the expected one.
	Command string
}
65
66
67 func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain string, testCase *TestCase) {
68 size := len(allPods) * len(allPods)
69 jobs := make(chan *ProbeJob, size)
70 results := make(chan *ProbeJobResults, size)
71 for i := 0; i < getWorkers(); i++ {
72 go probeWorker(prober, jobs, results)
73 }
74 for _, podFrom := range allPods {
75 for _, podTo := range allPods {
76
77
78 expectConnectivity := testCase.Reachability.Expected.Get(podFrom.PodString().String(), podTo.PodString().String())
79
80 jobs <- &ProbeJob{
81 PodFrom: podFrom,
82 PodTo: podTo,
83 ToPort: testCase.ToPort,
84 ToPodDNSDomain: dnsDomain,
85 Protocol: testCase.Protocol,
86 ExpectConnectivity: expectConnectivity,
87 }
88 }
89 }
90 close(jobs)
91
92 for i := 0; i < size; i++ {
93 result := <-results
94 job := result.Job
95 if result.Err != nil {
96 framework.Logf("unable to perform probe %s -> %s: %v", job.PodFrom.PodString(), job.PodTo.PodString(), result.Err)
97 }
98 testCase.Reachability.Observe(job.PodFrom.PodString(), job.PodTo.PodString(), result.IsConnected)
99 expected := testCase.Reachability.Expected.Get(job.PodFrom.PodString().String(), job.PodTo.PodString().String())
100 if result.IsConnected != expected {
101 framework.Logf("Validation of %s -> %s FAILED !!!", job.PodFrom.PodString(), job.PodTo.PodString())
102 framework.Logf("error %v ", result.Err)
103 if expected {
104 framework.Logf("Expected allowed pod connection was instead BLOCKED --- run '%v'", result.Command)
105 } else {
106 framework.Logf("Expected blocked pod connection was instead ALLOWED --- run '%v'", result.Command)
107 }
108 }
109 }
110 }
111
112
113
114 func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
115 defer ginkgo.GinkgoRecover()
116 for job := range jobs {
117 podFrom := job.PodFrom
118
119 if netutils.ParseIPSloppy(job.PodTo.ServiceIP) == nil {
120 results <- &ProbeJobResults{
121 Job: job,
122 IsConnected: false,
123 Err: fmt.Errorf("empty service ip"),
124 }
125 }
126
127
128
129
130
131
132 connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
133 nsFrom: podFrom.Namespace,
134 podFrom: podFrom.Name,
135 containerFrom: podFrom.ContainerName,
136 addrTo: job.PodTo.ServiceIP,
137 protocol: job.Protocol,
138 toPort: job.ToPort,
139 expectConnectivity: job.ExpectConnectivity,
140 timeoutSeconds: getProbeTimeoutSeconds(),
141 pollIntervalSeconds: getPollIntervalSeconds(),
142 pollTimeoutSeconds: getPollTimeoutSeconds(),
143 })
144 result := &ProbeJobResults{
145 Job: job,
146 IsConnected: connected,
147 Err: err,
148 Command: command,
149 }
150 results <- result
151 }
152 }
153
View as plain text