1 package servicemirror
2
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "os"
8 "os/signal"
9 "syscall"
10 "time"
11
12 controllerK8s "github.com/linkerd/linkerd2/controller/k8s"
13 servicemirror "github.com/linkerd/linkerd2/multicluster/service-mirror"
14 "github.com/linkerd/linkerd2/pkg/admin"
15 "github.com/linkerd/linkerd2/pkg/flags"
16 "github.com/linkerd/linkerd2/pkg/k8s"
17 "github.com/linkerd/linkerd2/pkg/multicluster"
18 sm "github.com/linkerd/linkerd2/pkg/servicemirror"
19 log "github.com/sirupsen/logrus"
20 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21 dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22 "k8s.io/apimachinery/pkg/watch"
23 "k8s.io/client-go/tools/clientcmd"
24 "k8s.io/client-go/tools/leaderelection"
25 "k8s.io/client-go/tools/leaderelection/resourcelock"
26 )
27
// linkWatchRestartAfter is the pause Main takes after failing to (re)start
// the cluster watcher, before it stops the Link watch and opens a new one.
const linkWatchRestartAfter = 10 * time.Second

// Leader-election timings, passed to leaderelection.LeaderElectionConfig as
// LeaseDuration, RenewDeadline and RetryPeriod respectively.
//
// NOTE(review): MixedCaps would be the idiomatic spelling; the exported
// names are kept unchanged because other files may reference them.
const (
	LEASE_DURATION       = 30 * time.Second
	LEASE_RENEW_DEADLINE = 10 * time.Second
	LEASE_RETRY_PERIOD   = 2 * time.Second
)
40 var (
41 clusterWatcher *servicemirror.RemoteClusterServiceWatcher
42 probeWorker *servicemirror.ProbeWorker
43 )
44
45
46 func Main(args []string) {
47 cmd := flag.NewFlagSet("service-mirror", flag.ExitOnError)
48
49 kubeConfigPath := cmd.String("kubeconfig", "", "path to the local kube config")
50 requeueLimit := cmd.Int("event-requeue-limit", 3, "requeue limit for events")
51 metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on")
52 namespace := cmd.String("namespace", "", "namespace containing Link and credentials Secret")
53 repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution")
54 enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring")
55 enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
56
57 flags.ConfigureAndParse(cmd, args)
58 linkName := cmd.Arg(0)
59
60 ready := false
61 adminServer := admin.NewServer(*metricsAddr, *enablePprof, &ready)
62
63 go func() {
64 log.Infof("starting admin server on %s", *metricsAddr)
65 if err := adminServer.ListenAndServe(); err != nil {
66 log.Errorf("failed to start service mirror admin server: %s", err)
67 }
68 }()
69
70 rootCtx, cancel := context.WithCancel(context.Background())
71
72 stop := make(chan os.Signal, 1)
73 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
74 go func() {
75 <-stop
76 log.Info("Received shutdown signal")
77
78
79 cancel()
80 }()
81
82
83
84
85
86
87
88 k8sAPI, err := k8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
89
90 if err != nil {
91 log.Fatalf("Failed to initialize K8s API: %s", err)
92 }
93 controllerK8sAPI, err := controllerK8s.InitializeAPI(
94 rootCtx,
95 *kubeConfigPath,
96 false,
97 "local",
98 controllerK8s.NS,
99 controllerK8s.Svc,
100 controllerK8s.Endpoint,
101 )
102 if err != nil {
103 log.Fatalf("Failed to initialize K8s API: %s", err)
104 }
105
106 linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace)
107 metrics := servicemirror.NewProbeMetricVecs()
108 controllerK8sAPI.Sync(nil)
109
110 ready = true
111 run := func(ctx context.Context) {
112 main:
113 for {
114
115 linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
116 if err != nil {
117 log.Fatalf("Failed to watch Link %s: %s", linkName, err)
118 }
119 results := linkWatch.ResultChan()
120
121
122
123 for {
124 select {
125
126
127
128 case <-ctx.Done():
129
130
131
132
133 cleanupWorkers()
134 return
135 case event, ok := <-results:
136 if !ok {
137 log.Info("Link watch terminated; restarting watch")
138 continue main
139 }
140 switch obj := event.Object.(type) {
141 case *dynamic.Unstructured:
142 if obj.GetName() == linkName {
143 switch event.Type {
144 case watch.Added, watch.Modified:
145 link, err := multicluster.NewLink(*obj)
146 if err != nil {
147 log.Errorf("Failed to parse link %s: %s", linkName, err)
148 continue
149 }
150 log.Infof("Got updated link %s: %+v", linkName, link)
151 creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
152 if err != nil {
153 log.Errorf("Failed to load remote cluster credentials: %s", err)
154 }
155 err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
156 if err != nil {
157
158
159 log.Error(err)
160 time.Sleep(linkWatchRestartAfter)
161 linkWatch.Stop()
162 }
163 case watch.Deleted:
164 log.Infof("Link %s deleted", linkName)
165 cleanupWorkers()
166 default:
167 log.Infof("Ignoring event type %s", event.Type)
168 }
169 }
170 default:
171 log.Errorf("Unknown object type detected: %+v", obj)
172 }
173 }
174 }
175 }
176 }
177
178 hostname, found := os.LookupEnv("HOSTNAME")
179 if !found {
180 log.Fatal("Failed to fetch 'HOSTNAME' environment variable")
181 }
182
183 lock := &resourcelock.LeaseLock{
184 LeaseMeta: metav1.ObjectMeta{
185 Name: fmt.Sprintf("service-mirror-write-%s", linkName),
186 Namespace: *namespace,
187 },
188 Client: k8sAPI.CoordinationV1(),
189 LockConfig: resourcelock.ResourceLockConfig{
190 Identity: hostname,
191 },
192 }
193
194 election:
195 for {
196
197
198
199
200
201
202
203
204 leaderelection.RunOrDie(rootCtx, leaderelection.LeaderElectionConfig{
205
206
207 ReleaseOnCancel: true,
208 Lock: lock,
209 LeaseDuration: LEASE_DURATION,
210 RenewDeadline: LEASE_RENEW_DEADLINE,
211 RetryPeriod: LEASE_RETRY_PERIOD,
212 Callbacks: leaderelection.LeaderCallbacks{
213 OnStartedLeading: func(ctx context.Context) {
214
215
216
217
218 log.Info("Starting controller loop")
219 run(ctx)
220 },
221 OnStoppedLeading: func() {
222 log.Infof("%s released lease", hostname)
223 },
224 OnNewLeader: func(identity string) {
225 if identity == hostname {
226 log.Infof("%s acquired lease", hostname)
227 }
228 },
229 },
230 })
231
232 select {
233
234
235
236 case <-rootCtx.Done():
237 break election
238
239 default:
240
241 }
242 }
243 log.Info("Shutting down")
244 }
245
246
247
248
249
250 func cleanupWorkers() {
251 if clusterWatcher != nil {
252
253
254 clusterWatcher.Stop(false)
255 clusterWatcher = nil
256 }
257
258 if probeWorker != nil {
259 probeWorker.Stop()
260 probeWorker = nil
261 }
262 }
263
264 func loadCredentials(ctx context.Context, link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) {
265
266 secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(ctx, link.ClusterCredentialsSecret, metav1.GetOptions{})
267 if err != nil {
268 return nil, fmt.Errorf("failed to load credentials secret %s: %w", link.ClusterCredentialsSecret, err)
269 }
270 return sm.ParseRemoteClusterSecret(secret)
271 }
272
273 func restartClusterWatcher(
274 ctx context.Context,
275 link multicluster.Link,
276 namespace string,
277 creds []byte,
278 controllerK8sAPI *controllerK8s.API,
279 requeueLimit int,
280 repairPeriod time.Duration,
281 metrics servicemirror.ProbeMetricVecs,
282 enableHeadlessSvc bool,
283 ) error {
284
285 cleanupWorkers()
286
287 workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName)
288 if err != nil {
289 return fmt.Errorf("failed to create metrics for cluster watcher: %w", err)
290 }
291
292
293
294 var ch chan bool
295 if link.ProbeSpec.Path != "" {
296 probeWorker = servicemirror.NewProbeWorker(fmt.Sprintf("probe-gateway-%s", link.TargetClusterName), &link.ProbeSpec, workerMetrics, link.TargetClusterName)
297 probeWorker.Start()
298 ch = probeWorker.Liveness
299 }
300
301
302 cfg, err := clientcmd.RESTConfigFromKubeConfig(creds)
303 if err != nil {
304 return fmt.Errorf("unable to parse kube config: %w", err)
305 }
306 cw, err := servicemirror.NewRemoteClusterServiceWatcher(
307 ctx,
308 namespace,
309 controllerK8sAPI,
310 cfg,
311 &link,
312 requeueLimit,
313 repairPeriod,
314 ch,
315 enableHeadlessSvc,
316 )
317 if err != nil {
318 return fmt.Errorf("unable to create cluster watcher: %w", err)
319 }
320 clusterWatcher = cw
321 err = clusterWatcher.Start(ctx)
322 if err != nil {
323 return fmt.Errorf("failed to start cluster watcher: %w", err)
324 }
325
326 return nil
327 }
328
View as plain text