1 package watcher
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8
9 "github.com/linkerd/linkerd2/controller/k8s"
10 pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
11 "github.com/prometheus/client_golang/prometheus"
12 "github.com/prometheus/client_golang/prometheus/promauto"
13 logging "github.com/sirupsen/logrus"
14 v1 "k8s.io/api/core/v1"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/tools/cache"
17 "k8s.io/client-go/tools/clientcmd"
18 )
19
20 type (
21
22
23
24
25
26 ClusterStore struct {
27
28 sync.RWMutex
29
30 api *k8s.API
31 store map[string]remoteCluster
32 enableEndpointSlices bool
33 log *logging.Entry
34
35
36
37 decodeFn configDecoder
38
39 size_gauge prometheus.GaugeFunc
40 }
41
42
43 remoteCluster struct {
44 watcher *EndpointsWatcher
45 config clusterConfig
46
47
48 stopCh chan<- struct{}
49 }
50
51
52 clusterConfig struct {
53 TrustDomain string
54 ClusterDomain string
55 }
56
57
58
59
60 configDecoder = func(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error)
61 )
62
63 const (
64 clusterNameLabel = "multicluster.linkerd.io/cluster-name"
65 trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
66 clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
67 )
68
69
70
71
72
73
74
75 func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error) {
76 return NewClusterStoreWithDecoder(client, namespace, enableEndpointSlices, decodeK8sConfigFromSecret)
77 }
78
79 func (cs *ClusterStore) Sync(stopCh <-chan struct{}) {
80 cs.api.Sync(stopCh)
81 }
82
83 func (cs *ClusterStore) UnregisterGauges() {
84 prometheus.Unregister(cs.size_gauge)
85 }
86
87
88
89 func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) {
90 api := k8s.NewNamespacedAPI(client, nil, nil, namespace, "local", k8s.Secret)
91
92 cs := &ClusterStore{
93 store: make(map[string]remoteCluster),
94 log: logging.WithFields(logging.Fields{
95 "component": "cluster-store",
96 }),
97 enableEndpointSlices: enableEndpointSlices,
98 api: api,
99 decodeFn: decodeFn,
100 }
101
102 cs.size_gauge = promauto.NewGaugeFunc(prometheus.GaugeOpts{
103 Name: "cluster_store_size",
104 Help: "The number of linked clusters in the remote discovery cluster store",
105 }, func() float64 { return (float64)(len(cs.store)) })
106
107 _, err := cs.api.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
108 AddFunc: func(obj interface{}) {
109 secret, ok := obj.(*v1.Secret)
110 if !ok {
111 cs.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret)
112 return
113 }
114
115 if secret.Type != pkgK8s.MirrorSecretType {
116 cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type)
117 return
118 }
119
120 clusterName, found := secret.GetLabels()[clusterNameLabel]
121 if !found {
122 cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
123 return
124 }
125
126 if err := cs.addCluster(clusterName, secret); err != nil {
127 cs.log.Errorf("Error adding cluster %s to store: %v", clusterName, err)
128 }
129 },
130 DeleteFunc: func(obj interface{}) {
131 secret, ok := obj.(*v1.Secret)
132 if !ok {
133
134
135
136
137 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
138 if !ok {
139 cs.log.Debugf("Unable to get object from DeletedFinalStateUnknown %#v", obj)
140 return
141 }
142
143 secret, ok = tombstone.Obj.(*v1.Secret)
144 if !ok {
145 cs.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj)
146 return
147 }
148 }
149
150 clusterName, found := secret.GetLabels()[clusterNameLabel]
151 if !found {
152 cs.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
153 return
154 }
155
156 cs.removeCluster(clusterName)
157
158 },
159 })
160
161 if err != nil {
162 return nil, err
163 }
164
165 return cs, nil
166 }
167
168
169 func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool) {
170 cs.RLock()
171 defer cs.RUnlock()
172 cw, found := cs.store[clusterName]
173 return cw.watcher, cw.config, found
174 }
175
176
177
178
179 func (cs *ClusterStore) removeCluster(clusterName string) {
180 cs.Lock()
181 defer cs.Unlock()
182 r, found := cs.store[clusterName]
183 if !found {
184 return
185 }
186 r.watcher.removeHandlers()
187 r.watcher.k8sAPI.UnregisterGauges()
188 r.watcher.metadataAPI.UnregisterGauges()
189 close(r.stopCh)
190 delete(cs.store, clusterName)
191 cs.log.Infof("Removed cluster %s from ClusterStore", clusterName)
192 }
193
194
195
196
197
198 func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error {
199 data, found := secret.Data[pkgK8s.ConfigKeyName]
200 if !found {
201 return errors.New("missing kubeconfig file")
202 }
203
204 clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation]
205 if !found {
206 return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation)
207 }
208
209 trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation]
210 if !found {
211 return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation)
212 }
213
214 remoteAPI, metadataAPI, err := cs.decodeFn(data, clusterName, cs.enableEndpointSlices)
215 if err != nil {
216 return err
217 }
218
219 stopCh := make(chan struct{}, 1)
220 watcher, err := NewEndpointsWatcher(
221 remoteAPI,
222 metadataAPI,
223 logging.WithFields(logging.Fields{
224 "remote-cluster": clusterName,
225 }),
226 cs.enableEndpointSlices,
227 clusterName,
228 )
229 if err != nil {
230 return err
231 }
232
233 cs.Lock()
234 defer cs.Unlock()
235 cs.store[clusterName] = remoteCluster{
236 watcher,
237 clusterConfig{
238 trustDomain,
239 clusterDomain,
240 },
241 stopCh,
242 }
243
244 go remoteAPI.Sync(stopCh)
245 go metadataAPI.Sync(stopCh)
246
247 cs.log.Infof("Added cluster %s to ClusterStore", clusterName)
248
249 return nil
250 }
251
252
253
254
255 func decodeK8sConfigFromSecret(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
256 cfg, err := clientcmd.RESTConfigFromKubeConfig(data)
257 if err != nil {
258 return nil, nil, err
259 }
260
261 ctx := context.Background()
262 var remoteAPI *k8s.API
263 if enableEndpointSlices {
264 remoteAPI, err = k8s.InitializeAPIForConfig(
265 ctx,
266 cfg,
267 true,
268 cluster,
269 k8s.ES, k8s.Pod, k8s.Svc, k8s.Srv,
270 )
271 } else {
272 remoteAPI, err = k8s.InitializeAPIForConfig(
273 ctx,
274 cfg,
275 true,
276 cluster,
277 k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.Srv,
278 )
279 }
280 if err != nil {
281 return nil, nil, err
282 }
283
284 metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, cluster, k8s.RS)
285 if err != nil {
286 return nil, nil, err
287 }
288
289 return remoteAPI, metadataAPI, nil
290 }
291
View as plain text