1
16
17 package storage
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "path"
24 "time"
25
26 "go.etcd.io/etcd/client/pkg/v3/transport"
27 clientv3 "go.etcd.io/etcd/client/v3"
28 "google.golang.org/grpc"
29 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
30 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31 "k8s.io/apiserver/pkg/registry/generic"
32 "k8s.io/apiserver/pkg/storage/storagebackend"
33 )
34
35
36 type EtcdObjectReader struct {
37 etcdClient *clientv3.Client
38 storagePrefix string
39 crd *apiextensionsv1.CustomResourceDefinition
40 }
41
42
43 func NewEtcdObjectReader(etcdClient *clientv3.Client, restOptions *generic.RESTOptions, crd *apiextensionsv1.CustomResourceDefinition) *EtcdObjectReader {
44 return &EtcdObjectReader{etcdClient, restOptions.StorageConfig.Prefix, crd}
45 }
46
47
48
49
50
51 func (s *EtcdObjectReader) WaitForStorageVersion(version string, ns, name string, timeout time.Duration, updateObjFn func()) error {
52 waitCh := time.After(timeout)
53 for {
54 storage, err := s.GetStoredCustomResource(ns, name)
55 if err != nil {
56 return err
57 }
58 if storage.GetObjectKind().GroupVersionKind().Version == version {
59 return nil
60 }
61 select {
62 case <-waitCh:
63 return fmt.Errorf("timed out after %v waiting for storage version to be %s for object (namespace:%s name:%s)", timeout, version, ns, name)
64 case <-time.After(10 * time.Millisecond):
65 updateObjFn()
66 }
67 }
68 }
69
70
71 func (s *EtcdObjectReader) GetStoredCustomResource(ns, name string) (*unstructured.Unstructured, error) {
72 key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name)
73 resp, err := s.etcdClient.KV.Get(context.Background(), key)
74 if err != nil {
75 return nil, fmt.Errorf("error getting storage object %s, %s from etcd at key %s: %v", ns, name, key, err)
76 }
77 if len(resp.Kvs) == 0 {
78 return nil, fmt.Errorf("no storage object found for %s, %s in etcd for key %s", ns, name, key)
79 }
80 raw := resp.Kvs[0].Value
81 u := &unstructured.Unstructured{Object: map[string]interface{}{}}
82 if err := json.Unmarshal(raw, u); err != nil {
83 return nil, fmt.Errorf("error deserializing object %s: %v", string(raw), err)
84 }
85 return u, nil
86 }
87
88
89 func (s *EtcdObjectReader) SetStoredCustomResource(ns, name string, obj *unstructured.Unstructured) error {
90 bs, err := obj.MarshalJSON()
91 if err != nil {
92 return err
93 }
94
95 key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name)
96 if _, err := s.etcdClient.KV.Put(context.Background(), key, string(bs)); err != nil {
97 return fmt.Errorf("error setting storage object %s, %s from etcd at key %s: %v", ns, name, key, err)
98 }
99 return nil
100 }
101
102
103 func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
104 tlsInfo := transport.TLSInfo{
105 CertFile: config.CertFile,
106 KeyFile: config.KeyFile,
107 TrustedCAFile: config.TrustedCAFile,
108 }
109
110 tlsConfig, err := tlsInfo.ClientConfig()
111 if err != nil {
112 return nil, nil, err
113 }
114
115 cfg := clientv3.Config{
116 Endpoints: config.ServerList,
117 DialTimeout: 20 * time.Second,
118 DialOptions: []grpc.DialOption{
119 grpc.WithBlock(),
120 },
121 TLS: tlsConfig,
122 }
123
124 c, err := clientv3.New(cfg)
125 if err != nil {
126 return nil, nil, err
127 }
128
129 return c, clientv3.NewKV(c), nil
130 }
131
View as plain text