...

Source file src/github.com/linkerd/linkerd2/multicluster/cmd/link.go

Documentation: github.com/linkerd/linkerd2/multicluster/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"path"
     9  	"strings"
    10  
    11  	"github.com/linkerd/linkerd2/multicluster/static"
    12  	multicluster "github.com/linkerd/linkerd2/multicluster/values"
    13  	"github.com/linkerd/linkerd2/pkg/charts"
    14  	partials "github.com/linkerd/linkerd2/pkg/charts/static"
    15  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    16  	"github.com/linkerd/linkerd2/pkg/flags"
    17  	"github.com/linkerd/linkerd2/pkg/k8s"
    18  	mc "github.com/linkerd/linkerd2/pkg/multicluster"
    19  	"github.com/linkerd/linkerd2/pkg/version"
    20  	log "github.com/sirupsen/logrus"
    21  	"github.com/spf13/cobra"
    22  	chartloader "helm.sh/helm/v3/pkg/chart/loader"
    23  	"helm.sh/helm/v3/pkg/chartutil"
    24  	valuespkg "helm.sh/helm/v3/pkg/cli/values"
    25  	"helm.sh/helm/v3/pkg/engine"
    26  	corev1 "k8s.io/api/core/v1"
    27  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/client-go/tools/clientcmd"
    30  	"k8s.io/client-go/tools/clientcmd/api"
    31  	"sigs.k8s.io/yaml"
    32  )
    33  
    34  const (
    35  	clusterNameLabel        = "multicluster.linkerd.io/cluster-name"
    36  	trustDomainAnnotation   = "multicluster.linkerd.io/trust-domain"
    37  	clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
    38  )
    39  
    40  type (
    41  	linkOptions struct {
    42  		namespace               string
    43  		clusterName             string
    44  		apiServerAddress        string
    45  		serviceAccountName      string
    46  		gatewayName             string
    47  		gatewayNamespace        string
    48  		serviceMirrorRetryLimit uint32
    49  		logLevel                string
    50  		logFormat               string
    51  		controlPlaneVersion     string
    52  		dockerRegistry          string
    53  		selector                string
    54  		remoteDiscoverySelector string
    55  		gatewayAddresses        string
    56  		gatewayPort             uint32
    57  		ha                      bool
    58  		enableGateway           bool
    59  	}
    60  )
    61  
    62  func newLinkCommand() *cobra.Command {
    63  	opts, err := newLinkOptionsWithDefault()
    64  
    65  	// Override the default value with env registry path.
    66  	// If cli cmd contains --registry flag, it will override env variable.
    67  	if registry := os.Getenv(flags.EnvOverrideDockerRegistry); registry != "" {
    68  		opts.dockerRegistry = registry
    69  	}
    70  
    71  	var valuesOptions valuespkg.Options
    72  
    73  	if err != nil {
    74  		fmt.Fprintln(os.Stderr, err)
    75  		os.Exit(1)
    76  	}
    77  
    78  	cmd := &cobra.Command{
    79  		Use:   "link",
    80  		Short: "Outputs resources that allow another cluster to mirror services from this one",
    81  		Long: `Outputs resources that allow another cluster to mirror services from this one.
    82  
    83  Note that the Link resource applies only in one direction. In order for two
    84  clusters to mirror each other, a Link resource will have to be generated for
    85  each cluster and applied to the other.`,
    86  		Args: cobra.NoArgs,
    87  		Example: `  # To link the west cluster to east
    88    linkerd --context=east multicluster link --cluster-name east | kubectl --context=west apply -f -
    89  
    90  The command can be configured by using the --set, --values, --set-string and --set-file flags.
    91  A full list of configurable values can be found at https://github.com/linkerd/linkerd2/blob/main/multicluster/charts/linkerd-multicluster-link/README.md
    92    `,
    93  		RunE: func(cmd *cobra.Command, args []string) error {
    94  
    95  			if opts.clusterName == "" {
    96  				return errors.New("You need to specify cluster name")
    97  			}
    98  
    99  			configMap, err := getLinkerdConfigMap(cmd.Context())
   100  			if err != nil {
   101  				if kerrors.IsNotFound(err) {
   102  					return errors.New("you need Linkerd to be installed on a cluster in order to get its credentials")
   103  				}
   104  				return err
   105  			}
   106  
   107  			rules := clientcmd.NewDefaultClientConfigLoadingRules()
   108  			rules.ExplicitPath = kubeconfigPath
   109  			loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})
   110  			config, err := loader.RawConfig()
   111  			if err != nil {
   112  				return err
   113  			}
   114  
   115  			if kubeContext != "" {
   116  				config.CurrentContext = kubeContext
   117  			}
   118  
   119  			k, err := k8s.NewAPI(kubeconfigPath, config.CurrentContext, impersonate, impersonateGroup, 0)
   120  			if err != nil {
   121  				return err
   122  			}
   123  
   124  			sa, err := k.CoreV1().ServiceAccounts(opts.namespace).Get(cmd.Context(), opts.serviceAccountName, metav1.GetOptions{})
   125  			if err != nil {
   126  				return err
   127  			}
   128  
   129  			listOpts := metav1.ListOptions{
   130  				FieldSelector: fmt.Sprintf("type=%s", corev1.SecretTypeServiceAccountToken),
   131  			}
   132  			secrets, err := k.CoreV1().Secrets(opts.namespace).List(cmd.Context(), listOpts)
   133  			if err != nil {
   134  				return err
   135  			}
   136  
   137  			token, err := extractSAToken(secrets.Items, sa.Name)
   138  			if err != nil {
   139  				return err
   140  			}
   141  
   142  			context, ok := config.Contexts[config.CurrentContext]
   143  			if !ok {
   144  				return fmt.Errorf("could not extract current context from config")
   145  			}
   146  
   147  			context.AuthInfo = opts.serviceAccountName
   148  			config.Contexts = map[string]*api.Context{
   149  				config.CurrentContext: context,
   150  			}
   151  			config.AuthInfos = map[string]*api.AuthInfo{
   152  				opts.serviceAccountName: {
   153  					Token: token,
   154  				},
   155  			}
   156  
   157  			cluster := config.Clusters[context.Cluster]
   158  
   159  			if opts.apiServerAddress != "" {
   160  				cluster.Server = opts.apiServerAddress
   161  			}
   162  
   163  			config.Clusters = map[string]*api.Cluster{
   164  				context.Cluster: cluster,
   165  			}
   166  
   167  			kubeconfig, err := clientcmd.Write(config)
   168  			if err != nil {
   169  				return err
   170  			}
   171  
   172  			creds := corev1.Secret{
   173  				Type:     k8s.MirrorSecretType,
   174  				TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
   175  				ObjectMeta: metav1.ObjectMeta{
   176  					Name:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
   177  					Namespace: opts.namespace,
   178  				},
   179  				Data: map[string][]byte{
   180  					k8s.ConfigKeyName: kubeconfig,
   181  				},
   182  			}
   183  
   184  			credsOut, err := yaml.Marshal(creds)
   185  			if err != nil {
   186  				return err
   187  			}
   188  
   189  			destinationCreds := corev1.Secret{
   190  				Type:     k8s.MirrorSecretType,
   191  				TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
   192  				ObjectMeta: metav1.ObjectMeta{
   193  					Name:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
   194  					Namespace: controlPlaneNamespace,
   195  					Labels: map[string]string{
   196  						clusterNameLabel: opts.clusterName,
   197  					},
   198  					Annotations: map[string]string{
   199  						trustDomainAnnotation:   configMap.IdentityTrustDomain,
   200  						clusterDomainAnnotation: configMap.ClusterDomain,
   201  					},
   202  				},
   203  				Data: map[string][]byte{
   204  					k8s.ConfigKeyName: kubeconfig,
   205  				},
   206  			}
   207  			destinationCredsOut, err := yaml.Marshal(destinationCreds)
   208  			if err != nil {
   209  				return err
   210  			}
   211  
   212  			remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector)
   213  			if err != nil {
   214  				return err
   215  			}
   216  
   217  			link := mc.Link{
   218  				Name:                          opts.clusterName,
   219  				Namespace:                     opts.namespace,
   220  				TargetClusterName:             opts.clusterName,
   221  				TargetClusterDomain:           configMap.ClusterDomain,
   222  				TargetClusterLinkerdNamespace: controlPlaneNamespace,
   223  				ClusterCredentialsSecret:      fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
   224  				RemoteDiscoverySelector:       *remoteDiscoverySelector,
   225  			}
   226  
   227  			// If there is a gateway in the exporting cluster, populate Link
   228  			// resource with gateway information
   229  			if opts.enableGateway {
   230  				gateway, err := k.CoreV1().Services(opts.gatewayNamespace).Get(cmd.Context(), opts.gatewayName, metav1.GetOptions{})
   231  				if err != nil {
   232  					return err
   233  				}
   234  
   235  				gwAddresses := []string{}
   236  				for _, ingress := range gateway.Status.LoadBalancer.Ingress {
   237  					addr := ingress.IP
   238  					if addr == "" {
   239  						addr = ingress.Hostname
   240  					}
   241  					if addr == "" {
   242  						continue
   243  					}
   244  					gwAddresses = append(gwAddresses, addr)
   245  				}
   246  
   247  				if opts.gatewayAddresses != "" {
   248  					link.GatewayAddress = opts.gatewayAddresses
   249  				} else if len(gwAddresses) > 0 {
   250  					link.GatewayAddress = strings.Join(gwAddresses, ",")
   251  				} else {
   252  					return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
   253  				}
   254  
   255  				gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
   256  				if !ok || gatewayIdentity == "" {
   257  					return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
   258  				}
   259  				link.GatewayIdentity = gatewayIdentity
   260  
   261  				probeSpec, err := mc.ExtractProbeSpec(gateway)
   262  				if err != nil {
   263  					return err
   264  				}
   265  				link.ProbeSpec = probeSpec
   266  
   267  				gatewayPort, err := extractGatewayPort(gateway)
   268  				if err != nil {
   269  					return err
   270  				}
   271  
   272  				// Override with user provided gateway port if present
   273  				if opts.gatewayPort != 0 {
   274  					gatewayPort = opts.gatewayPort
   275  				}
   276  				link.GatewayPort = gatewayPort
   277  
   278  				selector, err := metav1.ParseToLabelSelector(opts.selector)
   279  				if err != nil {
   280  					return err
   281  				}
   282  
   283  				link.Selector = *selector
   284  			}
   285  
   286  			obj, err := link.ToUnstructured()
   287  			if err != nil {
   288  				return err
   289  			}
   290  			linkOut, err := yaml.Marshal(obj.Object)
   291  			if err != nil {
   292  				return err
   293  			}
   294  
   295  			values, err := buildServiceMirrorValues(opts)
   296  			if err != nil {
   297  				return err
   298  			}
   299  
   300  			// Create values override
   301  			valuesOverrides, err := valuesOptions.MergeValues(nil)
   302  			if err != nil {
   303  				return err
   304  			}
   305  
   306  			if opts.ha {
   307  				if valuesOverrides, err = charts.OverrideFromFile(
   308  					valuesOverrides,
   309  					static.Templates,
   310  					helmMulticlusterLinkDefaultChartName,
   311  					"values-ha.yaml",
   312  				); err != nil {
   313  					return err
   314  				}
   315  			}
   316  			serviceMirrorOut, err := renderServiceMirror(values, valuesOverrides, opts.namespace)
   317  			if err != nil {
   318  				return err
   319  			}
   320  
   321  			stdout.Write(credsOut)
   322  			stdout.Write([]byte("---\n"))
   323  			stdout.Write(destinationCredsOut)
   324  			stdout.Write([]byte("---\n"))
   325  			stdout.Write(linkOut)
   326  			stdout.Write([]byte("---\n"))
   327  			stdout.Write(serviceMirrorOut)
   328  			stdout.Write([]byte("---\n"))
   329  
   330  			return nil
   331  		},
   332  	}
   333  
   334  	flags.AddValueOptionsFlags(cmd.Flags(), &valuesOptions)
   335  	cmd.Flags().StringVar(&opts.namespace, "namespace", defaultMulticlusterNamespace, "The namespace for the service account")
   336  	cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "Cluster name")
   337  	cmd.Flags().StringVar(&opts.apiServerAddress, "api-server-address", "", "The api server address of the target cluster")
   338  	cmd.Flags().StringVar(&opts.serviceAccountName, "service-account-name", defaultServiceAccountName, "The name of the service account associated with the credentials")
   339  	cmd.Flags().StringVar(&opts.controlPlaneVersion, "control-plane-version", opts.controlPlaneVersion, "(Development) Tag to be used for the service mirror controller image")
   340  	cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", defaultGatewayName, "The name of the gateway service")
   341  	cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "The namespace of the gateway service")
   342  	cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
   343  	cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
   344  	cmd.Flags().StringVar(&opts.logFormat, "log-format", opts.logFormat, "Log format for the Multicluster components")
   345  	cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry,
   346  		fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry))
   347  	cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
   348  	cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode")
   349  	cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
   350  	cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer")
   351  	cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)")
   352  	cmd.Flags().BoolVar(&opts.enableGateway, "gateway", opts.enableGateway, "If false, allows a link to be created against a cluster that does not have a gateway service")
   353  
   354  	pkgcmd.ConfigureNamespaceFlagCompletion(
   355  		cmd, []string{"namespace", "gateway-namespace"},
   356  		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
   357  	return cmd
   358  }
   359  
   360  func renderServiceMirror(values *multicluster.Values, valuesOverrides map[string]interface{}, namespace string) ([]byte, error) {
   361  	files := []*chartloader.BufferedFile{
   362  		{Name: chartutil.ChartfileName},
   363  		{Name: "templates/service-mirror.yaml"},
   364  		{Name: "templates/psp.yaml"},
   365  		{Name: "templates/gateway-mirror.yaml"},
   366  	}
   367  
   368  	var partialFiles []*chartloader.BufferedFile
   369  	for _, template := range charts.L5dPartials {
   370  		partialFiles = append(partialFiles,
   371  			&chartloader.BufferedFile{Name: template},
   372  		)
   373  	}
   374  
   375  	// Load all multicluster link chart files into buffer
   376  	if err := charts.FilesReader(static.Templates, helmMulticlusterLinkDefaultChartName+"/", files); err != nil {
   377  		return nil, err
   378  	}
   379  
   380  	// Load all partial chart files into buffer
   381  	if err := charts.FilesReader(partials.Templates, "", partialFiles); err != nil {
   382  		return nil, err
   383  	}
   384  
   385  	// Create a Chart obj from the files
   386  	chart, err := chartloader.LoadFiles(append(files, partialFiles...))
   387  	if err != nil {
   388  		return nil, err
   389  	}
   390  
   391  	// Render raw values and create chart config
   392  	rawValues, err := yaml.Marshal(values)
   393  	if err != nil {
   394  		return nil, err
   395  	}
   396  
   397  	// Store final Values generated from values.yaml and CLI flags
   398  	err = yaml.Unmarshal(rawValues, &chart.Values)
   399  	if err != nil {
   400  		return nil, err
   401  	}
   402  
   403  	vals, err := chartutil.CoalesceValues(chart, valuesOverrides)
   404  	if err != nil {
   405  		return nil, err
   406  	}
   407  
   408  	fullValues := map[string]interface{}{
   409  		"Values": vals,
   410  		"Release": map[string]interface{}{
   411  			"Namespace": namespace,
   412  			"Service":   "CLI",
   413  		},
   414  	}
   415  
   416  	// Attach the final values into the `Values` field for rendering to work
   417  	renderedTemplates, err := engine.Render(chart, fullValues)
   418  	if err != nil {
   419  		return nil, err
   420  	}
   421  
   422  	// Merge templates and inject
   423  	var out bytes.Buffer
   424  	for _, tmpl := range chart.Templates {
   425  		t := path.Join(chart.Metadata.Name, tmpl.Name)
   426  		if _, err := out.WriteString(renderedTemplates[t]); err != nil {
   427  			return nil, err
   428  		}
   429  	}
   430  
   431  	return out.Bytes(), nil
   432  }
   433  
   434  func newLinkOptionsWithDefault() (*linkOptions, error) {
   435  	defaults, err := multicluster.NewLinkValues()
   436  	if err != nil {
   437  		return nil, err
   438  	}
   439  
   440  	return &linkOptions{
   441  		controlPlaneVersion:     version.Version,
   442  		namespace:               defaultMulticlusterNamespace,
   443  		dockerRegistry:          pkgcmd.DefaultDockerRegistry,
   444  		serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
   445  		logLevel:                defaults.LogLevel,
   446  		logFormat:               defaults.LogFormat,
   447  		selector:                fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"),
   448  		remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"),
   449  		gatewayAddresses:        "",
   450  		gatewayPort:             0,
   451  		ha:                      false,
   452  		enableGateway:           true,
   453  	}, nil
   454  }
   455  
   456  func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
   457  
   458  	if !alphaNumDashDot.MatchString(opts.controlPlaneVersion) {
   459  		return nil, fmt.Errorf("%s is not a valid version", opts.controlPlaneVersion)
   460  	}
   461  
   462  	if opts.namespace == "" {
   463  		return nil, errors.New("you need to specify a namespace")
   464  	}
   465  
   466  	if _, err := log.ParseLevel(opts.logLevel); err != nil {
   467  		return nil, fmt.Errorf("--log-level must be one of: panic, fatal, error, warn, info, debug, trace")
   468  	}
   469  
   470  	if opts.logFormat != "plain" && opts.logFormat != "json" {
   471  		return nil, fmt.Errorf("--log-format must be one of: plain, json")
   472  	}
   473  
   474  	if opts.selector != "" && opts.selector != fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true") {
   475  		if !opts.enableGateway {
   476  			return nil, fmt.Errorf("--selector and --gateway=false are mutually exclusive")
   477  		}
   478  	}
   479  
   480  	if opts.gatewayAddresses != "" && !opts.enableGateway {
   481  		return nil, fmt.Errorf("--gateway-addresses and --gateway=false are mutually exclusive")
   482  	}
   483  
   484  	if opts.gatewayPort != 0 && !opts.enableGateway {
   485  		return nil, fmt.Errorf("--gateway-port and --gateway=false are mutually exclusive")
   486  	}
   487  
   488  	defaults, err := multicluster.NewLinkValues()
   489  	if err != nil {
   490  		return nil, err
   491  	}
   492  
   493  	defaults.Gateway.Enabled = opts.enableGateway
   494  	defaults.TargetClusterName = opts.clusterName
   495  	defaults.ServiceMirrorRetryLimit = opts.serviceMirrorRetryLimit
   496  	defaults.LogLevel = opts.logLevel
   497  	defaults.LogFormat = opts.logFormat
   498  	defaults.ControllerImageVersion = opts.controlPlaneVersion
   499  	defaults.ControllerImage = fmt.Sprintf("%s/controller", opts.dockerRegistry)
   500  
   501  	return defaults, nil
   502  }
   503  
   504  func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
   505  	for _, port := range gateway.Spec.Ports {
   506  		if port.Name == k8s.GatewayPortName {
   507  			if gateway.Spec.Type == "NodePort" {
   508  				return uint32(port.NodePort), nil
   509  			}
   510  			return uint32(port.Port), nil
   511  		}
   512  	}
   513  	return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
   514  }
   515  
   516  func extractSAToken(secrets []corev1.Secret, saName string) (string, error) {
   517  	for _, secret := range secrets {
   518  		boundSA := secret.Annotations[saNameAnnotationKey]
   519  		if saName == boundSA {
   520  			token, ok := secret.Data[tokenKey]
   521  			if !ok {
   522  				return "", fmt.Errorf("could not find the token data in service account secret %s", secret.Name)
   523  			}
   524  
   525  			return string(token), nil
   526  		}
   527  	}
   528  
   529  	return "", fmt.Errorf("could not find service account token secret for %s", saName)
   530  }
   531  

View as plain text