1
16
17 package etcd
18
19 import (
20 "context"
21 "encoding/json"
22 "testing"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/api/meta"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29 "k8s.io/apimachinery/pkg/runtime/schema"
30 "k8s.io/apimachinery/pkg/util/sets"
31 "k8s.io/apimachinery/pkg/watch"
32 "k8s.io/apiserver/pkg/storage"
33 "k8s.io/client-go/dynamic"
34 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
35 )
36
37
38 func TestCrossGroupStorage(t *testing.T) {
39 apiServer := StartRealAPIServerOrDie(t, func(opts *options.ServerRunOptions) {
40
41 })
42 defer apiServer.Cleanup()
43
44 etcdStorageData := GetEtcdStorageData()
45
46 crossGroupResources := map[schema.GroupVersionKind][]Resource{}
47
48 apiServer.Client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{})
49
50
51 for _, resourceToPersist := range apiServer.Resources {
52 gvk := resourceToPersist.Mapping.GroupVersionKind
53 data, exists := etcdStorageData[resourceToPersist.Mapping.Resource]
54 if !exists {
55 continue
56 }
57 storageGVK := gvk
58 if data.ExpectedGVK != nil {
59 storageGVK = *data.ExpectedGVK
60 }
61 crossGroupResources[storageGVK] = append(crossGroupResources[storageGVK], resourceToPersist)
62 }
63
64
65 for gvk, resources := range crossGroupResources {
66 groups := sets.NewString()
67 for _, resource := range resources {
68 groups.Insert(resource.Mapping.GroupVersionKind.Group)
69 }
70 if len(groups) < 2 {
71 delete(crossGroupResources, gvk)
72 }
73 }
74
75 if len(crossGroupResources) == 0 {
76
77 t.Fatal("no cross-group resources found")
78 }
79
80
81 for gvk, resources := range crossGroupResources {
82 t.Run(gvk.String(), func(t *testing.T) {
83
84 resource := resources[0]
85
86
87 ns := ""
88 if resource.Mapping.Scope.Name() == meta.RESTScopeNameNamespace {
89 ns = testNamespace
90 }
91
92 data := etcdStorageData[resource.Mapping.Resource]
93
94 resourceClient, obj, err := JSONToUnstructured(data.Stub, ns, resource.Mapping, apiServer.Dynamic)
95 if err != nil {
96 t.Fatal(err)
97 }
98 actual, err := resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{})
99 if err != nil {
100 t.Fatal(err)
101 }
102 name := actual.GetName()
103
104
105 var (
106 clients = map[schema.GroupVersionResource]dynamic.ResourceInterface{}
107 versionedData = map[schema.GroupVersionResource]*unstructured.Unstructured{}
108 watches = map[schema.GroupVersionResource]watch.Interface{}
109 )
110 for _, resource := range resources {
111 clients[resource.Mapping.Resource] = apiServer.Dynamic.Resource(resource.Mapping.Resource).Namespace(ns)
112 versionedData[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Get(context.TODO(), name, metav1.GetOptions{})
113 if err != nil {
114 t.Fatalf("error finding resource via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err)
115 }
116 watches[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Watch(context.TODO(), metav1.ListOptions{ResourceVersion: actual.GetResourceVersion()})
117 if err != nil {
118 t.Fatalf("error opening watch via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err)
119 }
120 }
121
122 versioner := storage.APIObjectVersioner{}
123 for _, resource := range resources {
124
125 versioned := versionedData[resource.Mapping.Resource]
126 if err := versioner.PrepareObjectForStorage(versioned); err != nil {
127 t.Error(err)
128 continue
129 }
130 versionedJSON, err := versioned.MarshalJSON()
131 if err != nil {
132 t.Error(err)
133 continue
134 }
135
136
137 if _, err := apiServer.KV.Put(context.Background(), data.ExpectedEtcdPath, string(versionedJSON)); err != nil {
138 t.Error(err)
139 continue
140 }
141 t.Logf("wrote %s to etcd", resource.Mapping.Resource.GroupVersion().String())
142
143
144 for watchResource, watcher := range watches {
145 select {
146 case event, ok := <-watcher.ResultChan():
147 if !ok {
148 t.Fatalf("watch of %s closed in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
149 }
150 if event.Type != watch.Modified {
151 eventJSON, _ := json.Marshal(event)
152 t.Errorf("unexpected watch event sent to watch of %s in response to persisting %s: %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), string(eventJSON))
153 continue
154 }
155 if event.Object.GetObjectKind().GroupVersionKind().GroupVersion() != watchResource.GroupVersion() {
156 t.Errorf("unexpected group version object sent to watch of %s in response to persisting %s: %#v", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), event.Object)
157 continue
158 }
159 t.Logf(" received event for %s", watchResource.GroupVersion().String())
160 case <-time.After(30 * time.Second):
161 t.Errorf("timed out waiting for watch event for %s in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
162 continue
163 }
164 }
165
166
167 for clientResource, client := range clients {
168 obj, err := client.Get(context.TODO(), name, metav1.GetOptions{})
169 if err != nil {
170 t.Errorf("error looking up %s after persisting %s", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
171 continue
172 }
173 if obj.GetObjectKind().GroupVersionKind().GroupVersion() != clientResource.GroupVersion() {
174 t.Errorf("unexpected group version retrieved from %s after persisting %s: %#v", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), obj)
175 continue
176 }
177 t.Logf(" fetched object for %s", clientResource.GroupVersion().String())
178 }
179 }
180 })
181 }
182 }
183
View as plain text