1
16
17 package openapi
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "net/http"
24 "testing"
25 "time"
26
27 "github.com/stretchr/testify/require"
28 apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/client-go/discovery"
32 "k8s.io/client-go/dynamic"
33 kubernetes "k8s.io/client-go/kubernetes"
34 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
35 aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
36 "k8s.io/kube-openapi/pkg/validation/spec"
37 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
38 testdiscovery "k8s.io/kubernetes/test/integration/apiserver/discovery"
39 "k8s.io/kubernetes/test/integration/framework"
40 )
41
42 func TestSlowAPIServiceOpenAPIDoesNotBlockHealthCheck(t *testing.T) {
43 ctx, cancelCtx := context.WithCancel(context.Background())
44 defer cancelCtx()
45
46 etcd := framework.SharedEtcd()
47 setupServer := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcd)
48 client := generateTestClient(t, setupServer)
49
50 service := testdiscovery.NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
51 if r.URL.Path != "/openapi/v2" {
52 return
53 }
54
55 <-ctx.Done()
56 openapi := &spec.Swagger{
57 SwaggerProps: spec.SwaggerProps{
58 Paths: &spec.Paths{
59 Paths: map[string]spec.PathItem{
60 "/apis/wardle.example.com/v1alpha1": {},
61 },
62 },
63 },
64 }
65 data, err := openapi.MarshalJSON()
66 if err != nil {
67 t.Error(err)
68 }
69 http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data))
70 }))
71 go func() {
72 require.NoError(t, service.Run(ctx))
73 }()
74 require.NoError(t, service.WaitForReady(ctx))
75
76 groupVersion := metav1.GroupVersion{
77 Group: "wardle.example.com",
78 Version: "v1alpha1",
79 }
80
81 require.NoError(t, registerAPIService(ctx, client, groupVersion, service))
82
83 setupServer.TearDownFn()
84 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcd)
85 t.Cleanup(server.TearDownFn)
86 client2 := generateTestClient(t, server)
87
88 err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 1*time.Second, true, func(context.Context) (bool, error) {
89 var statusCode int
90 client2.AdmissionregistrationV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&statusCode)
91 if statusCode == 200 {
92 return true, nil
93 }
94 return false, nil
95 })
96 require.NoError(t, err)
97 }
98
99 func TestFetchingOpenAPIBeforeReady(t *testing.T) {
100 ctx, cancelCtx := context.WithCancel(context.Background())
101 defer cancelCtx()
102
103 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
104 t.Cleanup(server.TearDownFn)
105 client := generateTestClient(t, server)
106
107 readyCh := make(chan bool)
108 defer close(readyCh)
109 go func() {
110 select {
111 case <-readyCh:
112 default:
113 _, _ = client.Discovery().RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO()).Raw()
114 }
115 }()
116
117 service := testdiscovery.NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
118 openapi := &spec.Swagger{
119 SwaggerProps: spec.SwaggerProps{
120 Paths: &spec.Paths{
121 Paths: map[string]spec.PathItem{
122 "/apis/wardle.example.com/v1alpha1/": {},
123 },
124 },
125 },
126 }
127 data, err := openapi.MarshalJSON()
128 if err != nil {
129 t.Error(err)
130 }
131 http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data))
132 }))
133 go func() {
134 require.NoError(t, service.Run(ctx))
135 }()
136 require.NoError(t, service.WaitForReady(ctx))
137
138 groupVersion := metav1.GroupVersion{
139 Group: "wardle.example.com",
140 Version: "v1alpha1",
141 }
142
143 require.NoError(t, registerAPIService(ctx, client, groupVersion, service))
144 defer func() {
145 require.NoError(t, unregisterAPIService(ctx, client, groupVersion))
146 }()
147
148 err := wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, func(context.Context) (bool, error) {
149 b, err := client.Discovery().RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO()).Raw()
150 require.NoError(t, err)
151 var openapi spec.Swagger
152 require.NoError(t, openapi.UnmarshalJSON(b))
153 if _, ok := openapi.Paths.Paths["/apis/wardle.example.com/v1alpha1/"]; ok {
154 return true, nil
155 }
156 return false, nil
157 })
158 require.NoError(t, err)
159
160 }
161
162
163
164 type kubeClientSet = kubernetes.Interface
165
166 type aggegatorClientSet = aggregator.Interface
167
168 type apiextensionsClientSet = apiextensions.Interface
169
170 type dynamicClientset = dynamic.Interface
171
172 type testClientSet struct {
173 kubeClientSet
174 aggegatorClientSet
175 apiextensionsClientSet
176 dynamicClientset
177 }
178
179 type testClient interface {
180 kubernetes.Interface
181 aggregator.Interface
182 apiextensions.Interface
183 dynamic.Interface
184 }
185
186 var _ testClient = testClientSet{}
187
188 func (t testClientSet) Discovery() discovery.DiscoveryInterface {
189 return t.kubeClientSet.Discovery()
190 }
191
192 func generateTestClient(t *testing.T, server *kubeapiservertesting.TestServer) testClient {
193 kubeClientSet, err := kubernetes.NewForConfig(server.ClientConfig)
194 require.NoError(t, err)
195
196 aggegatorClientSet, err := aggregator.NewForConfig(server.ClientConfig)
197 require.NoError(t, err)
198
199 apiextensionsClientSet, err := apiextensions.NewForConfig(server.ClientConfig)
200 require.NoError(t, err)
201
202 dynamicClientset, err := dynamic.NewForConfig(server.ClientConfig)
203 require.NoError(t, err)
204
205 client := testClientSet{
206 kubeClientSet: kubeClientSet,
207 aggegatorClientSet: aggegatorClientSet,
208 apiextensionsClientSet: apiextensionsClientSet,
209 dynamicClientset: dynamicClientset,
210 }
211 return client
212 }
213
214 func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service testdiscovery.FakeService) error {
215 port := service.Port()
216 if port == nil {
217 return errors.New("service not yet started")
218 }
219
220 patch := apiregistrationv1.APIService{
221 ObjectMeta: metav1.ObjectMeta{
222 Name: gv.Version + "." + gv.Group,
223 },
224 TypeMeta: metav1.TypeMeta{
225 Kind: "APIService",
226 APIVersion: "apiregistration.k8s.io/v1",
227 },
228 Spec: apiregistrationv1.APIServiceSpec{
229 Group: gv.Group,
230 Version: gv.Version,
231 InsecureSkipTLSVerify: true,
232 GroupPriorityMinimum: 1000,
233 VersionPriority: 15,
234 Service: &apiregistrationv1.ServiceReference{
235 Namespace: "default",
236 Name: service.Name(),
237 Port: port,
238 },
239 },
240 }
241
242 _, err := client.
243 ApiregistrationV1().
244 APIServices().
245 Create(context.TODO(), &patch, metav1.CreateOptions{FieldManager: "test-manager"})
246 return err
247 }
248
249 func unregisterAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion) error {
250 return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{})
251 }
252
View as plain text