...

Source file src/k8s.io/apiextensions-apiserver/test/integration/storage/objectreader.go

Documentation: k8s.io/apiextensions-apiserver/test/integration/storage

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"path"
    24  	"time"
    25  
    26  	"go.etcd.io/etcd/client/pkg/v3/transport"
    27  	clientv3 "go.etcd.io/etcd/client/v3"
    28  	"google.golang.org/grpc"
    29  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    30  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    31  	"k8s.io/apiserver/pkg/registry/generic"
    32  	"k8s.io/apiserver/pkg/storage/storagebackend"
    33  )
    34  
    35  // EtcdObjectReader provides direct access to custom resource objects stored in etcd.
    36  type EtcdObjectReader struct {
    37  	etcdClient    *clientv3.Client
    38  	storagePrefix string
    39  	crd           *apiextensionsv1.CustomResourceDefinition
    40  }
    41  
    42  // NewEtcdObjectReader creates a reader for accessing custom resource objects directly from etcd.
    43  func NewEtcdObjectReader(etcdClient *clientv3.Client, restOptions *generic.RESTOptions, crd *apiextensionsv1.CustomResourceDefinition) *EtcdObjectReader {
    44  	return &EtcdObjectReader{etcdClient, restOptions.StorageConfig.Prefix, crd}
    45  }
    46  
    47  // WaitForStorageVersion calls the updateObjFn periodically and waits for the version of the custom resource stored in etcd to be set to the provided version.
    48  // Typically updateObjFn should perform a noop update to the object so that when stored version of a CRD changes, the object is written at the updated storage version.
    49  // If the timeout is exceeded a error is returned.
    50  // This is useful when updating the stored version of an existing CRD because the update does not take effect immediately.
    51  func (s *EtcdObjectReader) WaitForStorageVersion(version string, ns, name string, timeout time.Duration, updateObjFn func()) error {
    52  	waitCh := time.After(timeout)
    53  	for {
    54  		storage, err := s.GetStoredCustomResource(ns, name)
    55  		if err != nil {
    56  			return err
    57  		}
    58  		if storage.GetObjectKind().GroupVersionKind().Version == version {
    59  			return nil
    60  		}
    61  		select {
    62  		case <-waitCh:
    63  			return fmt.Errorf("timed out after %v waiting for storage version to be %s for object (namespace:%s name:%s)", timeout, version, ns, name)
    64  		case <-time.After(10 * time.Millisecond):
    65  			updateObjFn()
    66  		}
    67  	}
    68  }
    69  
    70  // GetStoredCustomResource gets the storage representation of a custom resource from etcd.
    71  func (s *EtcdObjectReader) GetStoredCustomResource(ns, name string) (*unstructured.Unstructured, error) {
    72  	key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name)
    73  	resp, err := s.etcdClient.KV.Get(context.Background(), key)
    74  	if err != nil {
    75  		return nil, fmt.Errorf("error getting storage object %s, %s from etcd at key %s: %v", ns, name, key, err)
    76  	}
    77  	if len(resp.Kvs) == 0 {
    78  		return nil, fmt.Errorf("no storage object found for %s, %s in etcd for key %s", ns, name, key)
    79  	}
    80  	raw := resp.Kvs[0].Value
    81  	u := &unstructured.Unstructured{Object: map[string]interface{}{}}
    82  	if err := json.Unmarshal(raw, u); err != nil {
    83  		return nil, fmt.Errorf("error deserializing object %s: %v", string(raw), err)
    84  	}
    85  	return u, nil
    86  }
    87  
    88  // SetStoredCustomResource writes the storage representation of a custom resource to etcd.
    89  func (s *EtcdObjectReader) SetStoredCustomResource(ns, name string, obj *unstructured.Unstructured) error {
    90  	bs, err := obj.MarshalJSON()
    91  	if err != nil {
    92  		return err
    93  	}
    94  
    95  	key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name)
    96  	if _, err := s.etcdClient.KV.Put(context.Background(), key, string(bs)); err != nil {
    97  		return fmt.Errorf("error setting storage object %s, %s from etcd at key %s: %v", ns, name, key, err)
    98  	}
    99  	return nil
   100  }
   101  
   102  // GetEtcdClients returns an initialized  clientv3.Client and clientv3.KV.
   103  func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
   104  	tlsInfo := transport.TLSInfo{
   105  		CertFile:      config.CertFile,
   106  		KeyFile:       config.KeyFile,
   107  		TrustedCAFile: config.TrustedCAFile,
   108  	}
   109  
   110  	tlsConfig, err := tlsInfo.ClientConfig()
   111  	if err != nil {
   112  		return nil, nil, err
   113  	}
   114  
   115  	cfg := clientv3.Config{
   116  		Endpoints:   config.ServerList,
   117  		DialTimeout: 20 * time.Second,
   118  		DialOptions: []grpc.DialOption{
   119  			grpc.WithBlock(), // block until the underlying connection is up
   120  		},
   121  		TLS: tlsConfig,
   122  	}
   123  
   124  	c, err := clientv3.New(cfg)
   125  	if err != nil {
   126  		return nil, nil, err
   127  	}
   128  
   129  	return c, clientv3.NewKV(c), nil
   130  }
   131  

View as plain text