...

Source file src/k8s.io/apiextensions-apiserver/test/integration/change_test.go

Documentation: k8s.io/apiextensions-apiserver/test/integration

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package integration
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    27  	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    28  	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/apimachinery/pkg/watch"
    33  	"k8s.io/client-go/dynamic"
    34  )
    35  
    36  func TestChangeCRD(t *testing.T) {
    37  	tearDown, config, _, err := fixtures.StartDefaultServer(t)
    38  	if err != nil {
    39  		t.Fatal(err)
    40  	}
    41  	defer tearDown()
    42  	config.QPS = 1000
    43  	config.Burst = 1000
    44  	apiExtensionsClient, err := clientset.NewForConfig(config)
    45  	if err != nil {
    46  		t.Fatal(err)
    47  	}
    48  	dynamicClient, err := dynamic.NewForConfig(config)
    49  	if err != nil {
    50  		t.Fatal(err)
    51  	}
    52  
    53  	noxuDefinition := fixtures.NewNoxuV1CustomResourceDefinition(apiextensionsv1.NamespaceScoped)
    54  	noxuDefinition, err = fixtures.CreateNewV1CustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
    55  	if err != nil {
    56  		t.Fatal(err)
    57  	}
    58  
    59  	ns := "default"
    60  	noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
    61  
    62  	stopChan := make(chan struct{})
    63  
    64  	wg := &sync.WaitGroup{}
    65  
    66  	// Set up loop to modify CRD in the background
    67  	wg.Add(1)
    68  	go func() {
    69  		defer wg.Done()
    70  		for {
    71  			select {
    72  			case <-stopChan:
    73  				return
    74  			default:
    75  			}
    76  
    77  			time.Sleep(10 * time.Millisecond)
    78  
    79  			noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
    80  			if err != nil {
    81  				t.Error(err)
    82  				continue
    83  			}
    84  			if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
    85  				v2 := noxuDefinitionToUpdate.Spec.Versions[0]
    86  				v2.Name = "v2"
    87  				v2.Served = true
    88  				v2.Storage = false
    89  				noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
    90  			} else {
    91  				noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
    92  			}
    93  			if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
    94  				t.Error(err)
    95  				continue
    96  			}
    97  		}
    98  	}()
    99  
   100  	// Set up 10 loops creating and reading and watching custom resources
   101  	for i := 0; i < 10; i++ {
   102  		wg.Add(1)
   103  		go func(i int) {
   104  			defer wg.Done()
   105  			noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
   106  			if _, err := noxuNamespacedResourceClient.Create(context.TODO(), noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
   107  				t.Error(err)
   108  				return
   109  			}
   110  			for {
   111  				time.Sleep(10 * time.Millisecond)
   112  				select {
   113  				case <-stopChan:
   114  					return
   115  				default:
   116  					if _, err := noxuNamespacedResourceClient.Get(context.TODO(), noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
   117  						t.Error(err)
   118  						continue
   119  					}
   120  				}
   121  			}
   122  		}(i)
   123  
   124  		wg.Add(1)
   125  		go func(i int) {
   126  			defer wg.Done()
   127  			for {
   128  				time.Sleep(10 * time.Millisecond)
   129  				select {
   130  				case <-stopChan:
   131  					return
   132  				default:
   133  					w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
   134  					if err != nil {
   135  						t.Errorf("unexpected error establishing watch: %v", err)
   136  						continue
   137  					}
   138  					for event := range w.ResultChan() {
   139  						switch event.Type {
   140  						case watch.Added, watch.Modified, watch.Deleted:
   141  							// all expected
   142  						default:
   143  							t.Errorf("unexpected watch event: %#v", event)
   144  						}
   145  					}
   146  				}
   147  			}
   148  		}(i)
   149  	}
   150  
   151  	// Let all the established get request loops soak
   152  	time.Sleep(5 * time.Second)
   153  
   154  	// Tear down
   155  	close(stopChan)
   156  
   157  	// Let loops drain
   158  	drained := make(chan struct{})
   159  	go func() {
   160  		defer close(drained)
   161  		wg.Wait()
   162  	}()
   163  
   164  	select {
   165  	case <-drained:
   166  	case <-time.After(wait.ForeverTestTimeout):
   167  		t.Error("timed out waiting for clients to complete")
   168  	}
   169  }
   170  

View as plain text