1
16
17 package peerproxy
18
19 import (
20 "context"
21 "fmt"
22 "testing"
23 "time"
24
25 "github.com/stretchr/testify/assert"
26 "github.com/stretchr/testify/require"
27 v1 "k8s.io/api/batch/v1"
28 corev1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/apiserver/pkg/features"
32 "k8s.io/apiserver/pkg/server"
33 utilfeature "k8s.io/apiserver/pkg/util/feature"
34 "k8s.io/client-go/informers"
35 "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/transport"
37 "k8s.io/client-go/util/cert"
38 featuregatetesting "k8s.io/component-base/featuregate/testing"
39 "k8s.io/klog/v2"
40 kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
41 "k8s.io/kubernetes/pkg/controller/storageversiongc"
42 "k8s.io/kubernetes/pkg/controlplane"
43 kubefeatures "k8s.io/kubernetes/pkg/features"
44
45 "k8s.io/kubernetes/test/integration/framework"
46 testutil "k8s.io/kubernetes/test/utils"
47 "k8s.io/kubernetes/test/utils/ktesting"
48 )
49
50 func TestPeerProxiedRequest(t *testing.T) {
51
52 ktesting.SetDefaultVerbosity(1)
53 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
54 t.Cleanup(cancel)
55
56
57 transport.DialerStopCh = ctx.Done()
58
59
60 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
61 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
62 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
63
64
65 etcd := framework.SharedEtcd()
66
67
68 proxyCA, err := createProxyCertContent()
69 require.NoError(t, err)
70
71
72
73 server.SetHostnameFuncForTests("test-server-a")
74 serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
75 EnableCertAuth: true,
76 ProxyCA: &proxyCA},
77 []string{}, etcd)
78 defer serverA.TearDownFn()
79
80
81
82 server.SetHostnameFuncForTests("test-server-b")
83 serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
84 EnableCertAuth: true,
85 ProxyCA: &proxyCA},
86 []string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd)
87 defer serverB.TearDownFn()
88
89 kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
90 require.NoError(t, err)
91
92 kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
93 require.NoError(t, err)
94
95
96 job := createJobResource()
97 _, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
98 require.NoError(t, err)
99
100 klog.Infof("\nServerA has created jobs\n")
101
102
103
104 jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
105 klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
106 require.NoError(t, err)
107 assert.NotEmpty(t, jobsB)
108 assert.Equal(t, job.Name, jobsB.Items[0].Name)
109 }
110
111 func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
112
113 ktesting.SetDefaultVerbosity(1)
114 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
115 t.Cleanup(cancel)
116
117
118 transport.DialerStopCh = ctx.Done()
119
120
121 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
122 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
123 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
124
125
126 etcd := framework.SharedEtcd()
127
128
129 proxyCA, err := createProxyCertContent()
130 require.NoError(t, err)
131
132
133
134 controlplane.IdentityLeaseDurationSeconds = 10
135 controlplane.IdentityLeaseGCPeriod = time.Second
136 controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second
137
138
139
140 server.SetHostnameFuncForTests("test-server-a")
141 serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
142 kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
143 require.NoError(t, err)
144
145 informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
146 setupStorageVersionGC(ctx, kubeClientSetA, informersA)
147
148
149 controlplane.IdentityLeaseDurationSeconds = 3600
150
151
152
153 server.SetHostnameFuncForTests("test-server-b")
154 serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{
155 fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd)
156 defer serverB.TearDownFn()
157 kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
158 require.NoError(t, err)
159
160 informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
161 setupStorageVersionGC(ctx, kubeClientSetB, informersB)
162
163
164
165 server.SetHostnameFuncForTests("test-server-c")
166 serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
167 defer serverC.TearDownFn()
168
169
170 job := createJobResource()
171 _, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
172 require.NoError(t, err)
173 klog.Infof("\nServerA has created jobs\n")
174
175
176 serverA.TearDownFn()
177
178 var jobsB *v1.JobList
179
180 err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
181 jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
182 if err != nil {
183 return false, nil
184 }
185 if jobsB != nil {
186 return true, nil
187 }
188 return false, nil
189 })
190 klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
191 require.NoError(t, err)
192 assert.NotEmpty(t, jobsB)
193 assert.Equal(t, job.Name, jobsB.Items[0].Name)
194 }
195
196 func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
197 leaseInformer := informers.Coordination().V1().Leases()
198 storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
199 go leaseInformer.Informer().Run(ctx.Done())
200 go storageVersionInformer.Informer().Run(ctx.Done())
201
202 controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
203 go controller.Run(ctx)
204 }
205
206 func createProxyCertContent() (kastesting.ProxyCA, error) {
207 result := kastesting.ProxyCA{}
208 proxySigningKey, err := testutil.NewPrivateKey()
209 if err != nil {
210 return result, err
211 }
212 proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
213 if err != nil {
214 return result, err
215 }
216
217 result = kastesting.ProxyCA{
218 ProxySigningCert: proxySigningCert,
219 ProxySigningKey: proxySigningKey,
220 }
221 return result, nil
222 }
223
224 func createJobResource() *v1.Job {
225 return &v1.Job{
226 ObjectMeta: metav1.ObjectMeta{
227 Name: "test-job",
228 Namespace: "default",
229 },
230 Spec: v1.JobSpec{
231 Template: corev1.PodTemplateSpec{
232 Spec: corev1.PodSpec{
233 Containers: []corev1.Container{
234 {
235 Name: "test",
236 Image: "test",
237 },
238 },
239 RestartPolicy: corev1.RestartPolicyNever,
240 },
241 },
242 },
243 }
244 }
245
View as plain text