...

Source file src/k8s.io/kubernetes/test/integration/etcd/etcd_storage_path_test.go

Documentation: k8s.io/kubernetes/test/integration/etcd

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package etcd
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"path/filepath"
    24  	"reflect"
    25  	"strings"
    26  	"testing"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	clientv3 "go.etcd.io/etcd/client/v3"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    33  	"k8s.io/apimachinery/pkg/api/meta"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    36  	"k8s.io/apimachinery/pkg/runtime/schema"
    37  	"k8s.io/apimachinery/pkg/util/sets"
    38  	"k8s.io/apiserver/pkg/util/feature"
    39  	"k8s.io/client-go/dynamic"
    40  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    41  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    42  )
    43  
    44  // Only add kinds to this list when this a virtual resource with get and create verbs that doesn't actually
    45  // store into it's kind.  We've used this downstream for mappings before.
    46  var kindAllowList = sets.NewString()
    47  
    48  // namespace used for all tests, do not change this
    49  const testNamespace = "etcdstoragepathtestnamespace"
    50  
    51  // allowMissingTestdataFixtures contains the kinds expected to be missing serialization fixtures API testdata directory.
    52  // this should only contain custom resources and built-in types with open issues tracking adding serialization fixtures.
    53  // Do not add new built-in types to this list, add them to k8s.io/api/roundtrip_test.go instead.
    54  var allowMissingTestdataFixtures = map[schema.GroupVersionKind]bool{
    55  	// TODO(https://github.com/kubernetes/kubernetes/issues/79027)
    56  	gvk("apiregistration.k8s.io", "v1", "APIService"):     true,
    57  	gvk("apiregistration.k8s.io", "v1beta", "APIService"): true,
    58  
    59  	// TODO(https://github.com/kubernetes/kubernetes/issues/79026)
    60  	gvk("apiextensions.k8s.io", "v1beta1", "CustomResourceDefinition"): true,
    61  	gvk("apiextensions.k8s.io", "v1", "CustomResourceDefinition"):      true,
    62  
    63  	// Custom resources are not expected to have serialization fixtures in k8s.io/api
    64  	gvk("awesome.bears.com", "v1", "Panda"):    true,
    65  	gvk("cr.bar.com", "v1", "Foo"):             true,
    66  	gvk("random.numbers.com", "v1", "Integer"): true,
    67  	gvk("custom.fancy.com", "v2", "Pant"):      true,
    68  }
    69  
    70  // TestEtcdStoragePath tests to make sure that all objects are stored in an expected location in etcd.
    71  // It will start failing when a new type is added to ensure that all future types are added to this test.
    72  // It will also fail when a type gets moved to a different location. Be very careful in this situation because
    73  // it essentially means that you will be break old clusters unless you create some migration path for the old data.
    74  func TestEtcdStoragePath(t *testing.T) {
    75  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, "AllAlpha", true)()
    76  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, "AllBeta", true)()
    77  	apiServer := StartRealAPIServerOrDie(t, func(opts *options.ServerRunOptions) {
    78  	})
    79  	defer apiServer.Cleanup()
    80  	defer dumpEtcdKVOnFailure(t, apiServer.KV)
    81  
    82  	client := &allClient{dynamicClient: apiServer.Dynamic}
    83  
    84  	if _, err := apiServer.Client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{}); err != nil {
    85  		t.Fatal(err)
    86  	}
    87  
    88  	etcdStorageData := GetEtcdStorageData()
    89  
    90  	kindSeen := sets.NewString()
    91  	pathSeen := map[string][]schema.GroupVersionResource{}
    92  	etcdSeen := map[schema.GroupVersionResource]empty{}
    93  	cohabitatingResources := map[string]map[schema.GroupVersionKind]empty{}
    94  
    95  	for _, resourceToPersist := range apiServer.Resources {
    96  		t.Run(resourceToPersist.Mapping.Resource.String(), func(t *testing.T) {
    97  			mapping := resourceToPersist.Mapping
    98  			gvk := resourceToPersist.Mapping.GroupVersionKind
    99  			gvResource := resourceToPersist.Mapping.Resource
   100  			kind := gvk.Kind
   101  
   102  			if kindAllowList.Has(kind) {
   103  				kindSeen.Insert(kind)
   104  				t.Skip("allowlisted")
   105  			}
   106  
   107  			etcdSeen[gvResource] = empty{}
   108  			testData, hasTest := etcdStorageData[gvResource]
   109  
   110  			if !hasTest {
   111  				t.Fatalf("no test data for %s.  Please add a test for your new type to GetEtcdStorageData().", gvResource)
   112  			}
   113  
   114  			if len(testData.ExpectedEtcdPath) == 0 {
   115  				t.Fatalf("empty test data for %s", gvResource)
   116  			}
   117  
   118  			shouldCreate := len(testData.Stub) != 0 // try to create only if we have a stub
   119  
   120  			var (
   121  				input *metaObject
   122  				err   error
   123  			)
   124  			if shouldCreate {
   125  				if input, err = jsonToMetaObject([]byte(testData.Stub)); err != nil || input.isEmpty() {
   126  					t.Fatalf("invalid test data for %s: %v", gvResource, err)
   127  				}
   128  				// unset type meta fields - we only set these in the CRD test data and it makes
   129  				// any CRD test with an expectedGVK override fail the DeepDerivative test
   130  				input.Kind = ""
   131  				input.APIVersion = ""
   132  			}
   133  
   134  			all := &[]cleanupData{}
   135  			defer func() {
   136  				if !t.Failed() { // do not cleanup if test has already failed since we may need things in the etcd dump
   137  					if err := client.cleanup(all); err != nil {
   138  						t.Fatalf("failed to clean up etcd: %#v", err)
   139  					}
   140  				}
   141  			}()
   142  
   143  			if err := client.createPrerequisites(apiServer.Mapper, testNamespace, testData.Prerequisites, all); err != nil {
   144  				t.Fatalf("failed to create prerequisites for %s: %#v", gvResource, err)
   145  			}
   146  
   147  			if shouldCreate { // do not try to create items with no stub
   148  				if err := client.create(testData.Stub, testNamespace, mapping, all); err != nil {
   149  					t.Fatalf("failed to create stub for %s: %#v", gvResource, err)
   150  				}
   151  			}
   152  
   153  			output, err := getFromEtcd(apiServer.KV, testData.ExpectedEtcdPath)
   154  			if err != nil {
   155  				t.Fatalf("failed to get from etcd for %s: %#v", gvResource, err)
   156  			}
   157  
   158  			expectedGVK := gvk
   159  			if testData.ExpectedGVK != nil {
   160  				if gvk == *testData.ExpectedGVK {
   161  					t.Errorf("GVK override %s for %s is unnecessary or something was changed incorrectly", testData.ExpectedGVK, gvk)
   162  				}
   163  				expectedGVK = *testData.ExpectedGVK
   164  			}
   165  
   166  			// if previous releases had a non-alpha version of this group/kind, make sure the storage version is understood by a previous release
   167  			fixtureFilenameGroup := expectedGVK.Group
   168  			if fixtureFilenameGroup == "" {
   169  				fixtureFilenameGroup = "core"
   170  			}
   171  			// find all versions of this group/kind in all versions of the serialization fixture testdata
   172  			releaseGroupKindFiles, err := filepath.Glob("../../../staging/src/k8s.io/api/testdata/*/" + fixtureFilenameGroup + ".*." + expectedGVK.Kind + ".yaml")
   173  			if err != nil {
   174  				t.Error(err)
   175  			}
   176  			if len(releaseGroupKindFiles) == 0 && !allowMissingTestdataFixtures[expectedGVK] {
   177  				// We should at least find the HEAD fixtures
   178  				t.Errorf("No testdata serialization files found for %#v, cannot determine if previous releases could read this group/kind. Add this group-version to k8s.io/api/roundtrip_test.go", expectedGVK)
   179  			}
   180  
   181  			// find non-alpha versions of this group/kind understood by current and previous releases
   182  			currentNonAlphaVersions := sets.NewString()
   183  			previousNonAlphaVersions := sets.NewString()
   184  			for _, previousReleaseGroupKindFile := range releaseGroupKindFiles {
   185  				parts := strings.Split(filepath.Base(previousReleaseGroupKindFile), ".")
   186  				version := parts[len(parts)-3]
   187  				if !strings.Contains(version, "alpha") {
   188  					if serverVersion := filepath.Base(filepath.Dir(previousReleaseGroupKindFile)); serverVersion == "HEAD" {
   189  						currentNonAlphaVersions.Insert(version)
   190  					} else {
   191  						previousNonAlphaVersions.Insert(version)
   192  					}
   193  				}
   194  			}
   195  			if len(currentNonAlphaVersions) > 0 && strings.Contains(expectedGVK.Version, "alpha") {
   196  				t.Errorf("Non-alpha versions %q exist, but the expected storage version is %q. Prefer beta or GA storage versions over alpha.",
   197  					currentNonAlphaVersions.List(),
   198  					expectedGVK.Version,
   199  				)
   200  			}
   201  			if !strings.Contains(expectedGVK.Version, "alpha") && len(previousNonAlphaVersions) > 0 && !previousNonAlphaVersions.Has(expectedGVK.Version) {
   202  				t.Errorf("Previous releases understand non-alpha versions %q, but do not understand the expected current non-alpha storage version %q. "+
   203  					"This means a current server will store data in etcd that is not understood by a previous version.",
   204  					previousNonAlphaVersions.List(),
   205  					expectedGVK.Version,
   206  				)
   207  			}
   208  
   209  			actualGVK := output.getGVK()
   210  			if actualGVK != expectedGVK {
   211  				t.Errorf("GVK for %s does not match, expected %s got %s", kind, expectedGVK, actualGVK)
   212  			}
   213  
   214  			if !apiequality.Semantic.DeepDerivative(input, output) {
   215  				t.Errorf("Test stub for %s does not match: %s", kind, cmp.Diff(input, output))
   216  			}
   217  
   218  			addGVKToEtcdBucket(cohabitatingResources, actualGVK, getEtcdBucket(testData.ExpectedEtcdPath))
   219  			pathSeen[testData.ExpectedEtcdPath] = append(pathSeen[testData.ExpectedEtcdPath], mapping.Resource)
   220  		})
   221  	}
   222  
   223  	if inEtcdData, inEtcdSeen := diffMaps(etcdStorageData, etcdSeen); len(inEtcdData) != 0 || len(inEtcdSeen) != 0 {
   224  		t.Errorf("etcd data does not match the types we saw:\nin etcd data but not seen:\n%s\nseen but not in etcd data:\n%s", inEtcdData, inEtcdSeen)
   225  	}
   226  	if inKindData, inKindSeen := diffMaps(kindAllowList, kindSeen); len(inKindData) != 0 || len(inKindSeen) != 0 {
   227  		t.Errorf("kind allowlist data does not match the types we saw:\nin kind allowlist but not seen:\n%s\nseen but not in kind allowlist:\n%s", inKindData, inKindSeen)
   228  	}
   229  
   230  	for bucket, gvks := range cohabitatingResources {
   231  		if len(gvks) != 1 {
   232  			gvkStrings := []string{}
   233  			for key := range gvks {
   234  				gvkStrings = append(gvkStrings, keyStringer(key))
   235  			}
   236  			t.Errorf("cohabitating resources in etcd bucket %s have inconsistent GVKs\nyou may need to use DefaultStorageFactory.AddCohabitatingResources to sync the GVK of these resources:\n%s", bucket, gvkStrings)
   237  		}
   238  	}
   239  
   240  	for path, gvrs := range pathSeen {
   241  		if len(gvrs) != 1 {
   242  			gvrStrings := []string{}
   243  			for _, key := range gvrs {
   244  				gvrStrings = append(gvrStrings, keyStringer(key))
   245  			}
   246  			t.Errorf("invalid test data, please ensure all expectedEtcdPath are unique, path %s has duplicate GVRs:\n%s", path, gvrStrings)
   247  		}
   248  	}
   249  }
   250  
   251  var debug = false
   252  
   253  func dumpEtcdKVOnFailure(t *testing.T, kvClient clientv3.KV) {
   254  	if t.Failed() && debug {
   255  		response, err := kvClient.Get(context.Background(), "/", clientv3.WithPrefix())
   256  		if err != nil {
   257  			t.Fatal(err)
   258  		}
   259  
   260  		for _, kv := range response.Kvs {
   261  			t.Error(string(kv.Key), "->", string(kv.Value))
   262  		}
   263  	}
   264  }
   265  
   266  func addGVKToEtcdBucket(cohabitatingResources map[string]map[schema.GroupVersionKind]empty, gvk schema.GroupVersionKind, bucket string) {
   267  	if cohabitatingResources[bucket] == nil {
   268  		cohabitatingResources[bucket] = map[schema.GroupVersionKind]empty{}
   269  	}
   270  	cohabitatingResources[bucket][gvk] = empty{}
   271  }
   272  
   273  // getEtcdBucket assumes the last segment of the given etcd path is the name of the object.
   274  // Thus it strips that segment to extract the object's storage "bucket" in etcd. We expect
   275  // all objects that share the a bucket (cohabitating resources) to be stored as the same GVK.
   276  func getEtcdBucket(path string) string {
   277  	idx := strings.LastIndex(path, "/")
   278  	if idx == -1 {
   279  		panic("path with no slashes " + path)
   280  	}
   281  	bucket := path[:idx]
   282  	if len(bucket) == 0 {
   283  		panic("invalid bucket for path " + path)
   284  	}
   285  	return bucket
   286  }
   287  
   288  // stable fields to compare as a sanity check
   289  type metaObject struct {
   290  	// all of type meta
   291  	Kind       string `json:"kind,omitempty"`
   292  	APIVersion string `json:"apiVersion,omitempty"`
   293  
   294  	// parts of object meta
   295  	Metadata struct {
   296  		Name      string `json:"name,omitempty"`
   297  		Namespace string `json:"namespace,omitempty"`
   298  	} `json:"metadata,omitempty"`
   299  }
   300  
   301  func (obj *metaObject) getGVK() schema.GroupVersionKind {
   302  	return schema.FromAPIVersionAndKind(obj.APIVersion, obj.Kind)
   303  }
   304  
   305  func (obj *metaObject) isEmpty() bool {
   306  	return obj == nil || *obj == metaObject{} // compare to zero value since all fields are strings
   307  }
   308  
   309  type empty struct{}
   310  
   311  type cleanupData struct {
   312  	obj      *unstructured.Unstructured
   313  	resource schema.GroupVersionResource
   314  }
   315  
   316  func jsonToMetaObject(stub []byte) (*metaObject, error) {
   317  	obj := &metaObject{}
   318  	if err := json.Unmarshal(stub, obj); err != nil {
   319  		return nil, err
   320  	}
   321  	return obj, nil
   322  }
   323  
   324  func keyStringer(i interface{}) string {
   325  	base := "\n\t"
   326  	switch key := i.(type) {
   327  	case string:
   328  		return base + key
   329  	case schema.GroupVersionResource:
   330  		return base + key.String()
   331  	case schema.GroupVersionKind:
   332  		return base + key.String()
   333  	default:
   334  		panic("unexpected type")
   335  	}
   336  }
   337  
   338  type allClient struct {
   339  	dynamicClient dynamic.Interface
   340  }
   341  
   342  func (c *allClient) create(stub, ns string, mapping *meta.RESTMapping, all *[]cleanupData) error {
   343  	resourceClient, obj, err := JSONToUnstructured(stub, ns, mapping, c.dynamicClient)
   344  	if err != nil {
   345  		return err
   346  	}
   347  
   348  	actual, err := resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{})
   349  	if err != nil {
   350  		return err
   351  	}
   352  
   353  	*all = append(*all, cleanupData{obj: actual, resource: mapping.Resource})
   354  
   355  	return nil
   356  }
   357  
   358  func (c *allClient) cleanup(all *[]cleanupData) error {
   359  	for i := len(*all) - 1; i >= 0; i-- { // delete in reverse order in case creation order mattered
   360  		obj := (*all)[i].obj
   361  		gvr := (*all)[i].resource
   362  
   363  		if err := c.dynamicClient.Resource(gvr).Namespace(obj.GetNamespace()).Delete(context.TODO(), obj.GetName(), metav1.DeleteOptions{}); err != nil {
   364  			return err
   365  		}
   366  	}
   367  	return nil
   368  }
   369  
   370  func (c *allClient) createPrerequisites(mapper meta.RESTMapper, ns string, prerequisites []Prerequisite, all *[]cleanupData) error {
   371  	for _, prerequisite := range prerequisites {
   372  		gvk, err := mapper.KindFor(prerequisite.GvrData)
   373  		if err != nil {
   374  			return err
   375  		}
   376  		mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
   377  		if err != nil {
   378  			return err
   379  		}
   380  		if err := c.create(prerequisite.Stub, ns, mapping, all); err != nil {
   381  			return err
   382  		}
   383  	}
   384  	return nil
   385  }
   386  
   387  func getFromEtcd(keys clientv3.KV, path string) (*metaObject, error) {
   388  	response, err := keys.Get(context.Background(), path)
   389  	if err != nil {
   390  		return nil, err
   391  	}
   392  	if response.More || response.Count != 1 || len(response.Kvs) != 1 {
   393  		return nil, fmt.Errorf("Invalid etcd response (not found == %v): %#v", response.Count == 0, response)
   394  	}
   395  	return jsonToMetaObject(response.Kvs[0].Value)
   396  }
   397  
   398  func diffMaps(a, b interface{}) ([]string, []string) {
   399  	inA := diffMapKeys(a, b, keyStringer)
   400  	inB := diffMapKeys(b, a, keyStringer)
   401  	return inA, inB
   402  }
   403  
   404  func diffMapKeys(a, b interface{}, stringer func(interface{}) string) []string {
   405  	av := reflect.ValueOf(a)
   406  	bv := reflect.ValueOf(b)
   407  	ret := []string{}
   408  
   409  	for _, ka := range av.MapKeys() {
   410  		kat := ka.Interface()
   411  		found := false
   412  		for _, kb := range bv.MapKeys() {
   413  			kbt := kb.Interface()
   414  			if kat == kbt {
   415  				found = true
   416  				break
   417  			}
   418  		}
   419  		if !found {
   420  			ret = append(ret, stringer(kat))
   421  		}
   422  	}
   423  
   424  	return ret
   425  }
   426  

View as plain text