1
16
17 package etcd
18
19 import (
20 "context"
21 "encoding/json"
22 "net"
23 "net/http"
24 "os"
25 "strings"
26 "testing"
27 "time"
28
29 utiltesting "k8s.io/client-go/util/testing"
30
31 clientv3 "go.etcd.io/etcd/client/v3"
32
33 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
34 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
35 "k8s.io/apimachinery/pkg/api/meta"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 utilerrors "k8s.io/apimachinery/pkg/util/errors"
41 "k8s.io/apimachinery/pkg/util/wait"
42 genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
43 cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
44 "k8s.io/client-go/dynamic"
45 clientset "k8s.io/client-go/kubernetes"
46 restclient "k8s.io/client-go/rest"
47 "k8s.io/client-go/restmapper"
48 "k8s.io/kubernetes/cmd/kube-apiserver/app"
49 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
50 "k8s.io/kubernetes/test/integration"
51 "k8s.io/kubernetes/test/integration/framework"
52 netutils "k8s.io/utils/net"
53
54
55 _ "k8s.io/kubernetes/pkg/controlplane"
56 )
57
58
// ecdsaPrivateKey is a PEM-encoded EC private key embedded in the test source.
// StartRealAPIServerOrDie writes it to a temporary file and points both the
// service account signing key file and the service account key files at it.
// Because the key is checked in, it is insecure and only suitable for tests.
const ecdsaPrivateKey = `-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIEZmTmUhuanLjPA2CLquXivuwBDHTt5XYwgIr/kA1LtRoAoGCCqGSM49
AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
/IR3qCXyThP/dbCiHrF3v1cuhBOHY8CLVg==
-----END EC PRIVATE KEY-----`
64
65
66 func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *APIServer {
67 certDir, err := os.MkdirTemp("", t.Name())
68 if err != nil {
69 t.Fatal(err)
70 }
71
72 _, defaultServiceClusterIPRange, err := netutils.ParseCIDRSloppy("10.0.0.0/24")
73 if err != nil {
74 t.Fatal(err)
75 }
76
77 listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
78 if err != nil {
79 t.Fatal(err)
80 }
81
82 saSigningKeyFile, err := os.CreateTemp("/tmp", "insecure_test_key")
83 if err != nil {
84 t.Fatalf("create temp file failed: %v", err)
85 }
86 defer utiltesting.CloseAndRemove(t, saSigningKeyFile)
87 if err = os.WriteFile(saSigningKeyFile.Name(), []byte(ecdsaPrivateKey), 0666); err != nil {
88 t.Fatalf("write file %s failed: %v", saSigningKeyFile.Name(), err)
89 }
90
91 opts := options.NewServerRunOptions()
92 opts.Options.SecureServing.Listener = listener
93 opts.Options.SecureServing.ServerCert.CertDirectory = certDir
94 opts.Options.ServiceAccountSigningKeyFile = saSigningKeyFile.Name()
95 opts.Options.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
96 opts.Options.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON
97 opts.ServiceClusterIPRanges = defaultServiceClusterIPRange.String()
98 opts.Options.Authentication.APIAudiences = []string{"https://foo.bar.example.com"}
99 opts.Options.Authentication.ServiceAccounts.Issuers = []string{"https://foo.bar.example.com"}
100 opts.Options.Authentication.ServiceAccounts.KeyFiles = []string{saSigningKeyFile.Name()}
101 opts.Options.Authorization.Modes = []string{"RBAC"}
102 opts.Options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
103 opts.Options.APIEnablement.RuntimeConfig["api/all"] = "true"
104 for _, f := range configFuncs {
105 f(opts)
106 }
107 completedOptions, err := opts.Complete()
108 if err != nil {
109 t.Fatal(err)
110 }
111
112 if errs := completedOptions.Validate(); len(errs) != 0 {
113 t.Fatalf("failed to validate ServerRunOptions: %v", utilerrors.NewAggregate(errs))
114 }
115
116
117 rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport)
118 if err != nil {
119 t.Fatal(err)
120 }
121
122
123 if _, err := kvClient.Delete(context.Background(), "/registry/", clientv3.WithPrefix()); err != nil {
124 t.Fatal(err)
125 }
126
127 config, err := app.NewConfig(completedOptions)
128 if err != nil {
129 t.Fatal(err)
130 }
131 completed, err := config.Complete()
132 if err != nil {
133 t.Fatal(err)
134 }
135 kubeAPIServer, err := app.CreateServerChain(completed)
136 if err != nil {
137 t.Fatal(err)
138 }
139
140 kubeClientConfig := restclient.CopyConfig(kubeAPIServer.GenericAPIServer.LoopbackClientConfig)
141
142
143 kubeClientConfig.QPS = 99999
144 kubeClientConfig.Burst = 9999
145
146
147 restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
148
149 kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
150
151 stopCh := make(chan struct{})
152 errCh := make(chan error)
153 go func() {
154
155 defer func() {
156 if err := recover(); err != nil {
157 t.Errorf("Unexpected panic trying to start API server: %#v", err)
158 }
159 }()
160 defer close(errCh)
161
162 prepared, err := kubeAPIServer.PrepareRun()
163 if err != nil {
164 errCh <- err
165 return
166 }
167 if err := prepared.Run(stopCh); err != nil {
168 errCh <- err
169 t.Error(err)
170 return
171 }
172 }()
173
174 lastHealth := ""
175 attempt := 0
176 if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
177 select {
178 case err := <-errCh:
179 return false, err
180 default:
181 }
182
183
184 result := kubeClient.RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
185 content, _ := result.Raw()
186 lastHealth = string(content)
187 if errResult := result.Error(); errResult != nil {
188 attempt++
189 if attempt < 10 {
190 t.Log("waiting for server to be healthy")
191 } else {
192 t.Log(errResult)
193 }
194 return false, nil
195 }
196 var status int
197 result.StatusCode(&status)
198 return status == http.StatusOK, nil
199 }); err != nil {
200 t.Log(lastHealth)
201 t.Fatal(err)
202 }
203
204
205 CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(kubeClientConfig), false, GetCustomResourceDefinitionData()...)
206
207
208 discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
209 restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
210 restMapper.Reset()
211
212 _, serverResources, err := kubeClient.Discovery().ServerGroupsAndResources()
213 if err != nil {
214 t.Fatal(err)
215 }
216
217 cleanup := func() {
218
219
220 close(stopCh)
221
222
223
224 err, ok := <-errCh
225 if ok && err != nil {
226 t.Error(err)
227 }
228 rawClient.Close()
229 if err := os.RemoveAll(certDir); err != nil {
230 t.Log(err)
231 }
232 }
233
234 return &APIServer{
235 Client: kubeClient,
236 Dynamic: dynamic.NewForConfigOrDie(kubeClientConfig),
237 Config: kubeClientConfig,
238 KV: kvClient,
239 Mapper: restMapper,
240 Resources: GetResources(t, serverResources),
241 Cleanup: cleanup,
242 }
243 }
244
245
246
// APIServer bundles the clients and metadata for a test API server, as
// returned by StartRealAPIServerOrDie.
type APIServer struct {
	// Client is a typed Kubernetes clientset built from Config.
	Client clientset.Interface
	// Dynamic is a dynamic client built from Config.
	Dynamic dynamic.Interface
	// Config is the REST client configuration used to build Client and Dynamic.
	Config *restclient.Config
	// KV is an etcd key-value client for the server's storage backend.
	KV clientv3.KV
	// Mapper maps between resources and kinds using server discovery.
	Mapper meta.RESTMapper
	// Resources lists the resources found via discovery (see GetResources).
	Resources []Resource
	// Cleanup stops the server and releases what StartRealAPIServerOrDie set up.
	Cleanup func()
}
256
257
// Resource describes one API resource discovered on the server.
type Resource struct {
	// Mapping holds the resource, its group/version/kind and its scope.
	Mapping *meta.RESTMapping
	// HasDeleteCollection is true when discovery lists the "deletecollection"
	// verb for the resource.
	HasDeleteCollection bool
}
262
263
264 func GetResources(t *testing.T, serverResources []*metav1.APIResourceList) []Resource {
265 var resources []Resource
266
267 for _, discoveryGroup := range serverResources {
268 for _, discoveryResource := range discoveryGroup.APIResources {
269
270 if strings.Contains(discoveryResource.Name, "/") {
271 continue
272 }
273 hasCreate := false
274 hasGet := false
275 hasDeleteCollection := false
276 for _, verb := range discoveryResource.Verbs {
277 if verb == "get" {
278 hasGet = true
279 }
280 if verb == "create" {
281 hasCreate = true
282 }
283 if verb == "deletecollection" {
284 hasDeleteCollection = true
285 }
286 }
287 if !(hasCreate && hasGet) {
288 continue
289 }
290
291 resourceGV, err := schema.ParseGroupVersion(discoveryGroup.GroupVersion)
292 if err != nil {
293 t.Fatal(err)
294 }
295 gvk := resourceGV.WithKind(discoveryResource.Kind)
296 if len(discoveryResource.Group) > 0 || len(discoveryResource.Version) > 0 {
297 gvk = schema.GroupVersionKind{
298 Group: discoveryResource.Group,
299 Version: discoveryResource.Version,
300 Kind: discoveryResource.Kind,
301 }
302 }
303 gvr := resourceGV.WithResource(discoveryResource.Name)
304
305 resources = append(resources, Resource{
306 Mapping: &meta.RESTMapping{
307 Resource: gvr,
308 GroupVersionKind: gvk,
309 Scope: scope(discoveryResource.Namespaced),
310 },
311 HasDeleteCollection: hasDeleteCollection,
312 })
313 }
314 }
315
316 return resources
317 }
318
319 func scope(namespaced bool) meta.RESTScope {
320 if namespaced {
321 return meta.RESTScopeNamespace
322 }
323 return meta.RESTScopeRoot
324 }
325
326
327
328 func JSONToUnstructured(stub, namespace string, mapping *meta.RESTMapping, dynamicClient dynamic.Interface) (dynamic.ResourceInterface, *unstructured.Unstructured, error) {
329 typeMetaAdder := map[string]interface{}{}
330 if err := json.Unmarshal([]byte(stub), &typeMetaAdder); err != nil {
331 return nil, nil, err
332 }
333
334
335 typeMetaAdder["apiVersion"] = mapping.GroupVersionKind.GroupVersion().String()
336 typeMetaAdder["kind"] = mapping.GroupVersionKind.Kind
337
338 if mapping.Scope == meta.RESTScopeRoot {
339 namespace = ""
340 }
341
342 return dynamicClient.Resource(mapping.Resource).Namespace(namespace), &unstructured.Unstructured{Object: typeMetaAdder}, nil
343 }
344
345
346
347
348 func CreateTestCRDs(t *testing.T, client apiextensionsclientset.Interface, skipCrdExistsInDiscovery bool, crds ...*apiextensionsv1.CustomResourceDefinition) {
349 for _, crd := range crds {
350 createTestCRD(t, client, skipCrdExistsInDiscovery, crd)
351 }
352 }
353
354 func createTestCRD(t *testing.T, client apiextensionsclientset.Interface, skipCrdExistsInDiscovery bool, crd *apiextensionsv1.CustomResourceDefinition) {
355 if _, err := client.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); err != nil {
356 t.Fatalf("Failed to create %s CRD; %v", crd.Name, err)
357 }
358 if skipCrdExistsInDiscovery {
359 if err := waitForEstablishedCRD(client, crd.Name); err != nil {
360 t.Fatalf("Failed to establish %s CRD; %v", crd.Name, err)
361 }
362 return
363 }
364 if err := wait.PollImmediate(500*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
365 return CrdExistsInDiscovery(client, crd), nil
366 }); err != nil {
367 t.Fatalf("Failed to see %s in discovery: %v", crd.Name, err)
368 }
369 }
370
371 func waitForEstablishedCRD(client apiextensionsclientset.Interface, name string) error {
372 return wait.PollImmediate(500*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
373 crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), name, metav1.GetOptions{})
374 if err != nil {
375 return false, err
376 }
377 for _, cond := range crd.Status.Conditions {
378 switch cond.Type {
379 case apiextensionsv1.Established:
380 if cond.Status == apiextensionsv1.ConditionTrue {
381 return true, nil
382 }
383 }
384 }
385 return false, nil
386 })
387 }
388
389
390 func CrdExistsInDiscovery(client apiextensionsclientset.Interface, crd *apiextensionsv1.CustomResourceDefinition) bool {
391 var versions []string
392 for _, v := range crd.Spec.Versions {
393 if v.Served {
394 versions = append(versions, v.Name)
395 }
396 }
397 for _, v := range versions {
398 if !crdVersionExistsInDiscovery(client, crd, v) {
399 return false
400 }
401 }
402 return true
403 }
404
405 func crdVersionExistsInDiscovery(client apiextensionsclientset.Interface, crd *apiextensionsv1.CustomResourceDefinition, version string) bool {
406 resourceList, err := client.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version)
407 if err != nil {
408 return false
409 }
410 for _, resource := range resourceList.APIResources {
411 if resource.Name == crd.Spec.Names.Plural {
412 return true
413 }
414 }
415 return false
416 }
417
View as plain text