1
16
17 package dualstack
18
19 import (
20 "fmt"
21 "testing"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 discovery "k8s.io/api/discovery/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/intstr"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/client-go/informers"
31 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
32 "k8s.io/kubernetes/pkg/controller/endpoint"
33 "k8s.io/kubernetes/pkg/controller/endpointslice"
34 "k8s.io/kubernetes/test/integration/framework"
35 "k8s.io/kubernetes/test/utils/ktesting"
36 )
37
38 func TestDualStackEndpoints(t *testing.T) {
39
40 serviceCIDR := "10.0.0.0/16"
41 secondaryServiceCIDR := "2001:db8:1::/112"
42 labelMap := func() map[string]string {
43 return map[string]string{"foo": "bar"}
44 }
45
46 tCtx := ktesting.Init(t)
47
48 client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
49 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
50 opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
51
52 opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
53 },
54 })
55 defer tearDownFn()
56
57
58 if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
59 _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(tCtx, "kubernetes", metav1.GetOptions{})
60 if err != nil && !apierrors.IsNotFound(err) {
61 return false, err
62 }
63 return !apierrors.IsNotFound(err), nil
64 }); err != nil {
65 t.Fatalf("Creating kubernetes service timed out")
66 }
67
68 resyncPeriod := 0 * time.Hour
69 informers := informers.NewSharedInformerFactory(client, resyncPeriod)
70
71
72 testNode := &v1.Node{
73 ObjectMeta: metav1.ObjectMeta{
74 Name: "fakenode",
75 },
76 Spec: v1.NodeSpec{Unschedulable: false},
77 Status: v1.NodeStatus{
78 Conditions: []v1.NodeCondition{
79 {
80 Type: v1.NodeReady,
81 Status: v1.ConditionTrue,
82 Reason: fmt.Sprintf("schedulable condition"),
83 LastHeartbeatTime: metav1.Time{Time: time.Now()},
84 },
85 },
86 },
87 }
88 if _, err := client.CoreV1().Nodes().Create(tCtx, testNode, metav1.CreateOptions{}); err != nil {
89 t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
90 }
91
92 epController := endpoint.NewEndpointController(
93 tCtx,
94 informers.Core().V1().Pods(),
95 informers.Core().V1().Services(),
96 informers.Core().V1().Endpoints(),
97 client,
98 1*time.Second)
99
100 epsController := endpointslice.NewController(
101 tCtx,
102 informers.Core().V1().Pods(),
103 informers.Core().V1().Services(),
104 informers.Core().V1().Nodes(),
105 informers.Discovery().V1().EndpointSlices(),
106 int32(100),
107 client,
108 1*time.Second)
109
110
111 informers.Start(tCtx.Done())
112
113 go epController.Run(tCtx, 1)
114 go epsController.Run(tCtx, 1)
115
116 var testcases = []struct {
117 name string
118 serviceType v1.ServiceType
119 ipFamilies []v1.IPFamily
120 ipFamilyPolicy v1.IPFamilyPolicy
121 }{
122 {
123 name: "Service IPv4 Only",
124 serviceType: v1.ServiceTypeClusterIP,
125 ipFamilies: []v1.IPFamily{v1.IPv4Protocol},
126 ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
127 },
128 {
129 name: "Service IPv6 Only",
130 serviceType: v1.ServiceTypeClusterIP,
131 ipFamilies: []v1.IPFamily{v1.IPv6Protocol},
132 ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
133 },
134 {
135 name: "Service IPv6 IPv4 Dual Stack",
136 serviceType: v1.ServiceTypeClusterIP,
137 ipFamilies: []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol},
138 ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
139 },
140 {
141 name: "Service IPv4 IPv6 Dual Stack",
142 serviceType: v1.ServiceTypeClusterIP,
143 ipFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
144 ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
145 },
146 }
147
148 for i, tc := range testcases {
149 t.Run(tc.name, func(t *testing.T) {
150 ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-dualstack-%d", i), t)
151 defer framework.DeleteNamespaceOrDie(client, ns, t)
152
153
154 pod := &v1.Pod{
155 ObjectMeta: metav1.ObjectMeta{
156 Name: "test-pod",
157 Namespace: ns.Name,
158 Labels: labelMap(),
159 },
160 Spec: v1.PodSpec{
161 NodeName: "fakenode",
162 Containers: []v1.Container{
163 {
164 Name: "fake-name",
165 Image: "fakeimage",
166 },
167 },
168 },
169 }
170
171 createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
172 if err != nil {
173 t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
174 }
175
176
177 podIPbyFamily := map[v1.IPFamily]string{v1.IPv4Protocol: "1.1.1.1", v1.IPv6Protocol: "2001:db2::65"}
178 createdPod.Status = v1.PodStatus{
179 Phase: v1.PodRunning,
180 PodIPs: []v1.PodIP{{IP: podIPbyFamily[v1.IPv4Protocol]}, {IP: podIPbyFamily[v1.IPv6Protocol]}},
181 }
182 _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
183 if err != nil {
184 t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
185 }
186
187 svc := &v1.Service{
188 ObjectMeta: metav1.ObjectMeta{
189 Name: fmt.Sprintf("svc-test-%d", i),
190 Namespace: ns.Name,
191 Labels: labelMap(),
192 },
193 Spec: v1.ServiceSpec{
194 Type: v1.ServiceTypeClusterIP,
195 IPFamilies: tc.ipFamilies,
196 IPFamilyPolicy: &tc.ipFamilyPolicy,
197 Selector: labelMap(),
198 Ports: []v1.ServicePort{
199 {
200 Name: fmt.Sprintf("port-test-%d", i),
201 Port: 443,
202 TargetPort: intstr.IntOrString{IntVal: 443},
203 Protocol: "TCP",
204 },
205 },
206 },
207 }
208
209
210 _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
211 if err != nil {
212 t.Fatalf("Error creating service: %v", err)
213 }
214
215
216
217
218 if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
219 e, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
220 if err != nil {
221 t.Logf("Error fetching endpoints: %v", err)
222 return false, nil
223 }
224
225
226 if len(e.Subsets) > 0 && len(e.Subsets[0].NotReadyAddresses) > 0 {
227 if e.Subsets[0].NotReadyAddresses[0].IP == podIPbyFamily[tc.ipFamilies[0]] {
228 return true, nil
229 }
230 t.Logf("Endpoint address %s does not match PodIP %s ", e.Subsets[0].Addresses[0].IP, podIPbyFamily[tc.ipFamilies[0]])
231 }
232 t.Logf("Endpoint does not contain addresses: %s", e.Name)
233 return false, nil
234 }); err != nil {
235 t.Fatalf("Endpoints not found: %v", err)
236 }
237
238
239 err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
240 lSelector := discovery.LabelServiceName + "=" + svc.Name
241 esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
242 if err != nil {
243 t.Logf("Error listing EndpointSlices: %v", err)
244 return false, nil
245 }
246
247 if len(esList.Items) != len(tc.ipFamilies) {
248 t.Logf("Waiting for EndpointSlice to be created %v", esList)
249 return false, nil
250 }
251
252 for _, ipFamily := range tc.ipFamilies {
253 found := false
254 for _, slice := range esList.Items {
255
256 if len(slice.Endpoints) > 0 && len(slice.Endpoints[0].Addresses) > 0 {
257 if string(ipFamily) == string(slice.AddressType) &&
258 slice.Endpoints[0].Addresses[0] == podIPbyFamily[ipFamily] {
259 found = true
260 break
261 }
262 }
263 t.Logf("Waiting endpoint slice to contain addresses")
264 }
265 if !found {
266 t.Logf("Endpoint slices does not contain PodIP %s", podIPbyFamily[ipFamily])
267 return false, nil
268 }
269 }
270 return true, nil
271 })
272 if err != nil {
273 t.Fatalf("Error waiting for endpoint slices: %v", err)
274 }
275 })
276 }
277 }
278
View as plain text