1 package cmd
2
3 import (
4 "bytes"
5 "errors"
6 "fmt"
7 "os"
8 "path"
9 "strings"
10
11 "github.com/linkerd/linkerd2/multicluster/static"
12 multicluster "github.com/linkerd/linkerd2/multicluster/values"
13 "github.com/linkerd/linkerd2/pkg/charts"
14 partials "github.com/linkerd/linkerd2/pkg/charts/static"
15 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
16 "github.com/linkerd/linkerd2/pkg/flags"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 mc "github.com/linkerd/linkerd2/pkg/multicluster"
19 "github.com/linkerd/linkerd2/pkg/version"
20 log "github.com/sirupsen/logrus"
21 "github.com/spf13/cobra"
22 chartloader "helm.sh/helm/v3/pkg/chart/loader"
23 "helm.sh/helm/v3/pkg/chartutil"
24 valuespkg "helm.sh/helm/v3/pkg/cli/values"
25 "helm.sh/helm/v3/pkg/engine"
26 corev1 "k8s.io/api/core/v1"
27 kerrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/client-go/tools/clientcmd"
30 "k8s.io/client-go/tools/clientcmd/api"
31 "sigs.k8s.io/yaml"
32 )
33
// Metadata keys attached to the credentials secret that the link command
// generates for the control plane namespace.
const (
	// clusterNameLabel is the label key whose value is the linked cluster's name.
	clusterNameLabel = "multicluster.linkerd.io/cluster-name"
	// trustDomainAnnotation is the annotation key whose value is the identity
	// trust domain read from the Linkerd config map.
	trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
	// clusterDomainAnnotation is the annotation key whose value is the cluster
	// domain read from the Linkerd config map.
	clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
)
39
// linkOptions collects the values bound to the flags of the `link` command.
type linkOptions struct {
	// namespace is the namespace of the service account (--namespace).
	namespace string
	// clusterName is the name given to the linked cluster (--cluster-name).
	clusterName string
	// apiServerAddress, when set, replaces the API server address written
	// into the generated kubeconfig (--api-server-address).
	apiServerAddress string
	// serviceAccountName is the service account whose token is embedded in
	// the generated credentials (--service-account-name).
	serviceAccountName string
	// gatewayName is the name of the gateway service (--gateway-name).
	gatewayName string
	// gatewayNamespace is the namespace of the gateway service
	// (--gateway-namespace).
	gatewayNamespace string
	// serviceMirrorRetryLimit is the number of times a failed update from the
	// target cluster may be retried (--service-mirror-retry-limit).
	serviceMirrorRetryLimit uint32
	// logLevel is the log level for the multicluster components (--log-level).
	logLevel string
	// logFormat is the log format for the multicluster components
	// (--log-format).
	logFormat string
	// controlPlaneVersion is the tag used for the service mirror controller
	// image (--control-plane-version).
	controlPlaneVersion string
	// dockerRegistry is the registry the controller image is pulled from
	// (--registry).
	dockerRegistry string
	// selector is the label query choosing which services to mirror
	// (--selector).
	selector string
	// remoteDiscoverySelector is the label query choosing which services to
	// mirror in remote discovery mode (--remote-discovery-selector).
	remoteDiscoverySelector string
	// gatewayAddresses, when set, overrides the discovered gateway addresses
	// as a comma separated list (--gateway-addresses).
	gatewayAddresses string
	// gatewayPort, when non-zero, overrides the discovered gateway port
	// (--gateway-port).
	gatewayPort uint32
	// ha enables the HA configuration of the service-mirror deployment (--ha).
	ha bool
	// enableGateway controls whether the link requires a gateway service
	// (--gateway).
	enableGateway bool
}
61
62 func newLinkCommand() *cobra.Command {
63 opts, err := newLinkOptionsWithDefault()
64
65
66
67 if registry := os.Getenv(flags.EnvOverrideDockerRegistry); registry != "" {
68 opts.dockerRegistry = registry
69 }
70
71 var valuesOptions valuespkg.Options
72
73 if err != nil {
74 fmt.Fprintln(os.Stderr, err)
75 os.Exit(1)
76 }
77
78 cmd := &cobra.Command{
79 Use: "link",
80 Short: "Outputs resources that allow another cluster to mirror services from this one",
81 Long: `Outputs resources that allow another cluster to mirror services from this one.
82
83 Note that the Link resource applies only in one direction. In order for two
84 clusters to mirror each other, a Link resource will have to be generated for
85 each cluster and applied to the other.`,
86 Args: cobra.NoArgs,
87 Example: ` # To link the west cluster to east
88 linkerd --context=east multicluster link --cluster-name east | kubectl --context=west apply -f -
89
90 The command can be configured by using the --set, --values, --set-string and --set-file flags.
91 A full list of configurable values can be found at https://github.com/linkerd/linkerd2/blob/main/multicluster/charts/linkerd-multicluster-link/README.md
92 `,
93 RunE: func(cmd *cobra.Command, args []string) error {
94
95 if opts.clusterName == "" {
96 return errors.New("You need to specify cluster name")
97 }
98
99 configMap, err := getLinkerdConfigMap(cmd.Context())
100 if err != nil {
101 if kerrors.IsNotFound(err) {
102 return errors.New("you need Linkerd to be installed on a cluster in order to get its credentials")
103 }
104 return err
105 }
106
107 rules := clientcmd.NewDefaultClientConfigLoadingRules()
108 rules.ExplicitPath = kubeconfigPath
109 loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
110 config, err := loader.RawConfig()
111 if err != nil {
112 return err
113 }
114
115 if kubeContext != "" {
116 config.CurrentContext = kubeContext
117 }
118
119 k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
120 if err != nil {
121 return err
122 }
123
124 sa, err := k.CoreV1().ServiceAccounts(opts.namespace).Get(cmd.Context(), opts.serviceAccountName, metav1.GetOptions{})
125 if err != nil {
126 return err
127 }
128
129 listOpts := metav1.ListOptions{
130 FieldSelector: fmt.Sprintf("type=%s", corev1.SecretTypeServiceAccountToken),
131 }
132 secrets, err := k.CoreV1().Secrets(opts.namespace).List(cmd.Context(), listOpts)
133 if err != nil {
134 return err
135 }
136
137 token, err := extractSAToken(secrets.Items, sa.Name)
138 if err != nil {
139 return err
140 }
141
142 context, ok := config.Contexts[config.CurrentContext]
143 if !ok {
144 return fmt.Errorf("could not extract current context from config")
145 }
146
147 context.AuthInfo = opts.serviceAccountName
148 config.Contexts = map[string]*api.Context{
149 config.CurrentContext: context,
150 }
151 config.AuthInfos = map[string]*api.AuthInfo{
152 opts.serviceAccountName: {
153 Token: token,
154 },
155 }
156
157 cluster := config.Clusters[context.Cluster]
158
159 if opts.apiServerAddress != "" {
160 cluster.Server = opts.apiServerAddress
161 }
162
163 config.Clusters = map[string]*api.Cluster{
164 context.Cluster: cluster,
165 }
166
167 kubeconfig, err := clientcmd.Write(config)
168 if err != nil {
169 return err
170 }
171
172 creds := corev1.Secret{
173 Type: k8s.MirrorSecretType,
174 TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
175 ObjectMeta: metav1.ObjectMeta{
176 Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
177 Namespace: opts.namespace,
178 },
179 Data: map[string][]byte{
180 k8s.ConfigKeyName: kubeconfig,
181 },
182 }
183
184 credsOut, err := yaml.Marshal(creds)
185 if err != nil {
186 return err
187 }
188
189 destinationCreds := corev1.Secret{
190 Type: k8s.MirrorSecretType,
191 TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
192 ObjectMeta: metav1.ObjectMeta{
193 Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
194 Namespace: controlPlaneNamespace,
195 Labels: map[string]string{
196 clusterNameLabel: opts.clusterName,
197 },
198 Annotations: map[string]string{
199 trustDomainAnnotation: configMap.IdentityTrustDomain,
200 clusterDomainAnnotation: configMap.ClusterDomain,
201 },
202 },
203 Data: map[string][]byte{
204 k8s.ConfigKeyName: kubeconfig,
205 },
206 }
207 destinationCredsOut, err := yaml.Marshal(destinationCreds)
208 if err != nil {
209 return err
210 }
211
212 remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector)
213 if err != nil {
214 return err
215 }
216
217 link := mc.Link{
218 Name: opts.clusterName,
219 Namespace: opts.namespace,
220 TargetClusterName: opts.clusterName,
221 TargetClusterDomain: configMap.ClusterDomain,
222 TargetClusterLinkerdNamespace: controlPlaneNamespace,
223 ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
224 RemoteDiscoverySelector: *remoteDiscoverySelector,
225 }
226
227
228
229 if opts.enableGateway {
230 gateway, err := k.CoreV1().Services(opts.gatewayNamespace).Get(cmd.Context(), opts.gatewayName, metav1.GetOptions{})
231 if err != nil {
232 return err
233 }
234
235 gwAddresses := []string{}
236 for _, ingress := range gateway.Status.LoadBalancer.Ingress {
237 addr := ingress.IP
238 if addr == "" {
239 addr = ingress.Hostname
240 }
241 if addr == "" {
242 continue
243 }
244 gwAddresses = append(gwAddresses, addr)
245 }
246
247 if opts.gatewayAddresses != "" {
248 link.GatewayAddress = opts.gatewayAddresses
249 } else if len(gwAddresses) > 0 {
250 link.GatewayAddress = strings.Join(gwAddresses, ",")
251 } else {
252 return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
253 }
254
255 gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
256 if !ok || gatewayIdentity == "" {
257 return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
258 }
259 link.GatewayIdentity = gatewayIdentity
260
261 probeSpec, err := mc.ExtractProbeSpec(gateway)
262 if err != nil {
263 return err
264 }
265 link.ProbeSpec = probeSpec
266
267 gatewayPort, err := extractGatewayPort(gateway)
268 if err != nil {
269 return err
270 }
271
272
273 if opts.gatewayPort != 0 {
274 gatewayPort = opts.gatewayPort
275 }
276 link.GatewayPort = gatewayPort
277
278 selector, err := metav1.ParseToLabelSelector(opts.selector)
279 if err != nil {
280 return err
281 }
282
283 link.Selector = *selector
284 }
285
286 obj, err := link.ToUnstructured()
287 if err != nil {
288 return err
289 }
290 linkOut, err := yaml.Marshal(obj.Object)
291 if err != nil {
292 return err
293 }
294
295 values, err := buildServiceMirrorValues(opts)
296 if err != nil {
297 return err
298 }
299
300
301 valuesOverrides, err := valuesOptions.MergeValues(nil)
302 if err != nil {
303 return err
304 }
305
306 if opts.ha {
307 if valuesOverrides, err = charts.OverrideFromFile(
308 valuesOverrides,
309 static.Templates,
310 helmMulticlusterLinkDefaultChartName,
311 "values-ha.yaml",
312 ); err != nil {
313 return err
314 }
315 }
316 serviceMirrorOut, err := renderServiceMirror(values, valuesOverrides, opts.namespace)
317 if err != nil {
318 return err
319 }
320
321 stdout.Write(credsOut)
322 stdout.Write([]byte("---\n"))
323 stdout.Write(destinationCredsOut)
324 stdout.Write([]byte("---\n"))
325 stdout.Write(linkOut)
326 stdout.Write([]byte("---\n"))
327 stdout.Write(serviceMirrorOut)
328 stdout.Write([]byte("---\n"))
329
330 return nil
331 },
332 }
333
334 flags.AddValueOptionsFlags(cmd.Flags(), &valuesOptions)
335 cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
336 cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
337 cmd.Flags().StringVar(&opts.apiServerAddress, "api-server-address", "", "The api server address of the target cluster")
338 cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", defaultServiceAccountName, "The name of the service account associated with the credentials")
339 cmd.Flags().StringVar(&opts.controlPlaneVersion, "control-plane-version", opts.controlPlaneVersion, "(Development) Tag to be used for the service mirror controller image")
340 cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", defaultGatewayName, "The name of the gateway service")
341 cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "The namespace of the gateway service")
342 cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
343 cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
344 cmd.Flags().StringVar(&opts.logFormat, "log-format", opts.logFormat, "Log format for the Multicluster components")
345 cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry,
346 fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry))
347 cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
348 cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode")
349 cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
350 cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer")
351 cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)")
352 cmd.Flags().BoolVar(&opts.enableGateway, "gateway", opts.enableGateway, "If false, allows a link to be created against a cluster that does not have a gateway service")
353
354 pkgcmd.ConfigureNamespaceFlagCompletion(
355 cmd, []string{"namespace", "gateway-namespace"},
356 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
357 return cmd
358 }
359
360 func renderServiceMirror(values *multicluster.Values, valuesOverrides map[string]interface{}, namespace string) ([]byte, error) {
361 files := []*chartloader.BufferedFile{
362 {Name: chartutil.ChartfileName},
363 {Name: "templates/service-mirror.yaml"},
364 {Name: "templates/psp.yaml"},
365 {Name: "templates/gateway-mirror.yaml"},
366 }
367
368 var partialFiles []*chartloader.BufferedFile
369 for _, template := range charts.L5dPartials {
370 partialFiles = append(partialFiles,
371 &chartloader.BufferedFile{Name: template},
372 )
373 }
374
375
376 if err := charts.FilesReader(static.Templates, helmMulticlusterLinkDefaultChartName+"/", files); err != nil {
377 return nil, err
378 }
379
380
381 if err := charts.FilesReader(partials.Templates, "", partialFiles); err != nil {
382 return nil, err
383 }
384
385
386 chart, err := chartloader.LoadFiles(append(files, partialFiles...))
387 if err != nil {
388 return nil, err
389 }
390
391
392 rawValues, err := yaml.Marshal(values)
393 if err != nil {
394 return nil, err
395 }
396
397
398 err = yaml.Unmarshal(rawValues, &chart.Values)
399 if err != nil {
400 return nil, err
401 }
402
403 vals, err := chartutil.CoalesceValues(chart, valuesOverrides)
404 if err != nil {
405 return nil, err
406 }
407
408 fullValues := map[string]interface{}{
409 "Values": vals,
410 "Release": map[string]interface{}{
411 "Namespace": namespace,
412 "Service": "CLI",
413 },
414 }
415
416
417 renderedTemplates, err := engine.Render(chart, fullValues)
418 if err != nil {
419 return nil, err
420 }
421
422
423 var out bytes.Buffer
424 for _, tmpl := range chart.Templates {
425 t := path.Join(chart.Metadata.Name, tmpl.Name)
426 if _, err := out.WriteString(renderedTemplates[t]); err != nil {
427 return nil, err
428 }
429 }
430
431 return out.Bytes(), nil
432 }
433
434 func newLinkOptionsWithDefault() (*linkOptions, error) {
435 defaults, err := multicluster.NewLinkValues()
436 if err != nil {
437 return nil, err
438 }
439
440 return &linkOptions{
441 controlPlaneVersion: version.Version,
442 namespace: defaultMulticlusterNamespace,
443 dockerRegistry: pkgcmd.DefaultDockerRegistry,
444 serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
445 logLevel: defaults.LogLevel,
446 logFormat: defaults.LogFormat,
447 selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"),
448 remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"),
449 gatewayAddresses: "",
450 gatewayPort: 0,
451 ha: false,
452 enableGateway: true,
453 }, nil
454 }
455
456 func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
457
458 if !alphaNumDashDot.MatchString(opts.controlPlaneVersion) {
459 return nil, fmt.Errorf("%s is not a valid version", opts.controlPlaneVersion)
460 }
461
462 if opts.namespace == "" {
463 return nil, errors.New("you need to specify a namespace")
464 }
465
466 if _, err := log.ParseLevel(opts.logLevel); err != nil {
467 return nil, fmt.Errorf("--log-level must be one of: panic, fatal, error, warn, info, debug, trace")
468 }
469
470 if opts.logFormat != "plain" && opts.logFormat != "json" {
471 return nil, fmt.Errorf("--log-format must be one of: plain, json")
472 }
473
474 if opts.selector != "" && opts.selector != fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true") {
475 if !opts.enableGateway {
476 return nil, fmt.Errorf("--selector and --gateway=false are mutually exclusive")
477 }
478 }
479
480 if opts.gatewayAddresses != "" && !opts.enableGateway {
481 return nil, fmt.Errorf("--gateway-addresses and --gateway=false are mutually exclusive")
482 }
483
484 if opts.gatewayPort != 0 && !opts.enableGateway {
485 return nil, fmt.Errorf("--gateway-port and --gateway=false are mutually exclusive")
486 }
487
488 defaults, err := multicluster.NewLinkValues()
489 if err != nil {
490 return nil, err
491 }
492
493 defaults.Gateway.Enabled = opts.enableGateway
494 defaults.TargetClusterName = opts.clusterName
495 defaults.ServiceMirrorRetryLimit = opts.serviceMirrorRetryLimit
496 defaults.LogLevel = opts.logLevel
497 defaults.LogFormat = opts.logFormat
498 defaults.ControllerImageVersion = opts.controlPlaneVersion
499 defaults.ControllerImage = fmt.Sprintf("%s/controller", opts.dockerRegistry)
500
501 return defaults, nil
502 }
503
504 func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
505 for _, port := range gateway.Spec.Ports {
506 if port.Name == k8s.GatewayPortName {
507 if gateway.Spec.Type == "NodePort" {
508 return uint32(port.NodePort), nil
509 }
510 return uint32(port.Port), nil
511 }
512 }
513 return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
514 }
515
516 func extractSAToken(secrets []corev1.Secret, saName string) (string, error) {
517 for _, secret := range secrets {
518 boundSA := secret.Annotations[saNameAnnotationKey]
519 if saName == boundSA {
520 token, ok := secret.Data[tokenKey]
521 if !ok {
522 return "", fmt.Errorf("could not find the token data in service account secret %s", secret.Name)
523 }
524
525 return string(token), nil
526 }
527 }
528
529 return "", fmt.Errorf("could not find service account token secret for %s", saName)
530 }
531
View as plain text