...

Source file src/github.com/linkerd/linkerd2/pkg/multicluster/link.go

Documentation: github.com/linkerd/linkerd2/pkg/multicluster

     1  package multicluster
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  	"strconv"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/linkerd/linkerd2/pkg/k8s"
    13  	corev1 "k8s.io/api/core/v1"
    14  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    15  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    16  	"k8s.io/apimachinery/pkg/runtime/schema"
    17  	"k8s.io/client-go/dynamic"
    18  )
    19  
type (
	// ProbeSpec defines how a gateway should be queried for health. Once per
	// period, the probe workers will send an HTTP request to the remote gateway
	// on the given port with the given path and expect an HTTP 200 response.
	ProbeSpec struct {
		Path   string
		Port   uint32
		Period time.Duration
	}

	// Link is an internal representation of the link.multicluster.linkerd.io
	// custom resource. It defines a multicluster link to a gateway in a
	// target cluster and configures the behavior of a service mirror
	// controller.
	//
	// Name and Namespace come from the resource's metadata; every other field
	// is read from (and written back to) the resource's "spec" object.
	Link struct {
		Name                          string
		Namespace                     string
		TargetClusterName             string
		TargetClusterDomain           string
		TargetClusterLinkerdNamespace string
		ClusterCredentialsSecret      string
		GatewayAddress                string
		GatewayPort                   uint32
		GatewayIdentity               string
		ProbeSpec                     ProbeSpec
		Selector                      metav1.LabelSelector
		RemoteDiscoverySelector       metav1.LabelSelector
	}
)
    49  
// LinkGVR is the Group Version and Resource of the Link custom resource. It
// is the resource identifier handed to the dynamic client when Links are
// listed or fetched.
var LinkGVR = schema.GroupVersionResource{
	Group:    k8s.LinkAPIGroup,
	Version:  k8s.LinkAPIVersion,
	Resource: "links",
}
    56  
    57  func (ps ProbeSpec) String() string {
    58  	return fmt.Sprintf("ProbeSpec: {path: %s, port: %d, period: %s}", ps.Path, ps.Port, ps.Period)
    59  }
    60  
    61  // NewLink parses an unstructured link.multicluster.linkerd.io resource and
    62  // converts it to a structured internal representation.
    63  func NewLink(u unstructured.Unstructured) (Link, error) {
    64  
    65  	spec, ok := u.Object["spec"]
    66  	if !ok {
    67  		return Link{}, errors.New("Field 'spec' is missing")
    68  	}
    69  	specObj, ok := spec.(map[string]interface{})
    70  	if !ok {
    71  		return Link{}, errors.New("Field 'spec' is not an object")
    72  	}
    73  
    74  	ps, ok := specObj["probeSpec"]
    75  	if !ok {
    76  		return Link{}, errors.New("Field 'probeSpec' is missing")
    77  	}
    78  	psObj, ok := ps.(map[string]interface{})
    79  	if !ok {
    80  		return Link{}, errors.New("Field 'probeSpec' it not an object")
    81  	}
    82  
    83  	probeSpec, err := newProbeSpec(psObj)
    84  	if err != nil {
    85  		return Link{}, err
    86  	}
    87  
    88  	targetClusterName, err := stringField(specObj, "targetClusterName")
    89  	if err != nil {
    90  		return Link{}, err
    91  	}
    92  
    93  	targetClusterDomain, err := stringField(specObj, "targetClusterDomain")
    94  	if err != nil {
    95  		return Link{}, err
    96  	}
    97  
    98  	targetClusterLinkerdNamespace, err := stringField(specObj, "targetClusterLinkerdNamespace")
    99  	if err != nil {
   100  		return Link{}, err
   101  	}
   102  
   103  	clusterCredentialsSecret, err := stringField(specObj, "clusterCredentialsSecret")
   104  	if err != nil {
   105  		return Link{}, err
   106  	}
   107  
   108  	gatewayAddress, err := stringField(specObj, "gatewayAddress")
   109  	if err != nil {
   110  		return Link{}, err
   111  	}
   112  
   113  	portStr, err := stringField(specObj, "gatewayPort")
   114  	if err != nil {
   115  		return Link{}, err
   116  	}
   117  	gatewayPort, err := strconv.ParseUint(portStr, 10, 32)
   118  	if err != nil {
   119  		return Link{}, err
   120  	}
   121  
   122  	gatewayIdentity, err := stringField(specObj, "gatewayIdentity")
   123  	if err != nil {
   124  		return Link{}, err
   125  	}
   126  
   127  	selector := metav1.LabelSelector{}
   128  	if selectorObj, ok := specObj["selector"]; ok {
   129  		bytes, err := json.Marshal(selectorObj)
   130  		if err != nil {
   131  			return Link{}, err
   132  		}
   133  		err = json.Unmarshal(bytes, &selector)
   134  		if err != nil {
   135  			return Link{}, err
   136  		}
   137  	}
   138  
   139  	remoteDiscoverySelector := metav1.LabelSelector{}
   140  	if selectorObj, ok := specObj["remoteDiscoverySelector"]; ok {
   141  		bytes, err := json.Marshal(selectorObj)
   142  		if err != nil {
   143  			return Link{}, err
   144  		}
   145  		err = json.Unmarshal(bytes, &remoteDiscoverySelector)
   146  		if err != nil {
   147  			return Link{}, err
   148  		}
   149  	}
   150  
   151  	return Link{
   152  		Name:                          u.GetName(),
   153  		Namespace:                     u.GetNamespace(),
   154  		TargetClusterName:             targetClusterName,
   155  		TargetClusterDomain:           targetClusterDomain,
   156  		TargetClusterLinkerdNamespace: targetClusterLinkerdNamespace,
   157  		ClusterCredentialsSecret:      clusterCredentialsSecret,
   158  		GatewayAddress:                gatewayAddress,
   159  		GatewayPort:                   uint32(gatewayPort),
   160  		GatewayIdentity:               gatewayIdentity,
   161  		ProbeSpec:                     probeSpec,
   162  		Selector:                      selector,
   163  		RemoteDiscoverySelector:       remoteDiscoverySelector,
   164  	}, nil
   165  }
   166  
   167  // ToUnstructured converts a Link struct into an unstructured resource that can
   168  // be used by a kubernetes dynamic client.
   169  func (l Link) ToUnstructured() (unstructured.Unstructured, error) {
   170  	spec := map[string]interface{}{
   171  		"targetClusterName":             l.TargetClusterName,
   172  		"targetClusterDomain":           l.TargetClusterDomain,
   173  		"targetClusterLinkerdNamespace": l.TargetClusterLinkerdNamespace,
   174  		"clusterCredentialsSecret":      l.ClusterCredentialsSecret,
   175  		"gatewayAddress":                l.GatewayAddress,
   176  		"gatewayPort":                   fmt.Sprintf("%d", l.GatewayPort),
   177  		"gatewayIdentity":               l.GatewayIdentity,
   178  		"probeSpec": map[string]interface{}{
   179  			"path":   l.ProbeSpec.Path,
   180  			"port":   fmt.Sprintf("%d", l.ProbeSpec.Port),
   181  			"period": l.ProbeSpec.Period.String(),
   182  		},
   183  	}
   184  
   185  	data, err := json.Marshal(l.Selector)
   186  	if err != nil {
   187  		return unstructured.Unstructured{}, err
   188  	}
   189  	selector := make(map[string]interface{})
   190  	err = json.Unmarshal(data, &selector)
   191  	if err != nil {
   192  		return unstructured.Unstructured{}, err
   193  	}
   194  	spec["selector"] = selector
   195  
   196  	data, err = json.Marshal(l.RemoteDiscoverySelector)
   197  	if err != nil {
   198  		return unstructured.Unstructured{}, err
   199  	}
   200  	remoteDiscoverySelector := make(map[string]interface{})
   201  	err = json.Unmarshal(data, &remoteDiscoverySelector)
   202  	if err != nil {
   203  		return unstructured.Unstructured{}, err
   204  	}
   205  	spec["remoteDiscoverySelector"] = remoteDiscoverySelector
   206  
   207  	return unstructured.Unstructured{
   208  		Object: map[string]interface{}{
   209  			"apiVersion": k8s.LinkAPIGroupVersion,
   210  			"kind":       k8s.LinkKind,
   211  			"metadata": map[string]interface{}{
   212  				"name":      l.Name,
   213  				"namespace": l.Namespace,
   214  			},
   215  			"spec": spec,
   216  		},
   217  	}, nil
   218  }
   219  
   220  // ExtractProbeSpec parses the ProbSpec from a gateway service's annotations.
   221  func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) {
   222  	path := gateway.Annotations[k8s.GatewayProbePath]
   223  	if path == "" {
   224  		return ProbeSpec{}, errors.New("probe path is empty")
   225  	}
   226  
   227  	port, err := extractPort(gateway.Spec, k8s.ProbePortName)
   228  	if err != nil {
   229  		return ProbeSpec{}, err
   230  	}
   231  
   232  	period, err := strconv.ParseUint(gateway.Annotations[k8s.GatewayProbePeriod], 10, 32)
   233  	if err != nil {
   234  		return ProbeSpec{}, err
   235  	}
   236  
   237  	return ProbeSpec{
   238  		Path:   path,
   239  		Port:   port,
   240  		Period: time.Duration(period) * time.Second,
   241  	}, nil
   242  }
   243  
   244  // GetLinks fetches a list of all Link objects in the cluster.
   245  func GetLinks(ctx context.Context, client dynamic.Interface) ([]Link, error) {
   246  	list, err := client.Resource(LinkGVR).List(ctx, metav1.ListOptions{})
   247  	if err != nil {
   248  		return nil, err
   249  	}
   250  	links := []Link{}
   251  	errs := []string{}
   252  	for _, u := range list.Items {
   253  		link, err := NewLink(u)
   254  		if err != nil {
   255  			errs = append(errs, fmt.Sprintf("failed to parse Link %s: %s", u.GetName(), err))
   256  		} else {
   257  			links = append(links, link)
   258  		}
   259  	}
   260  	if len(errs) > 0 {
   261  		return nil, errors.New(strings.Join(errs, "\n"))
   262  	}
   263  	return links, nil
   264  }
   265  
   266  // GetLink fetches a Link object from Kubernetes by name/namespace.
   267  func GetLink(ctx context.Context, client dynamic.Interface, namespace, name string) (Link, error) {
   268  	unstructured, err := client.Resource(LinkGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
   269  	if err != nil {
   270  		return Link{}, err
   271  	}
   272  	return NewLink(*unstructured)
   273  }
   274  
   275  func extractPort(spec corev1.ServiceSpec, portName string) (uint32, error) {
   276  	for _, p := range spec.Ports {
   277  		if p.Name == portName {
   278  			if spec.Type == "NodePort" {
   279  				return uint32(p.NodePort), nil
   280  			}
   281  			return uint32(p.Port), nil
   282  		}
   283  	}
   284  	return 0, fmt.Errorf("could not find port with name %s", portName)
   285  }
   286  
   287  func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) {
   288  	periodStr, err := stringField(obj, "period")
   289  	if err != nil {
   290  		return ProbeSpec{}, err
   291  	}
   292  	period, err := time.ParseDuration(periodStr)
   293  	if err != nil {
   294  		return ProbeSpec{}, err
   295  	}
   296  
   297  	path, err := stringField(obj, "path")
   298  	if err != nil {
   299  		return ProbeSpec{}, err
   300  	}
   301  
   302  	portStr, err := stringField(obj, "port")
   303  	if err != nil {
   304  		return ProbeSpec{}, err
   305  	}
   306  	port, err := strconv.ParseUint(portStr, 10, 32)
   307  	if err != nil {
   308  		return ProbeSpec{}, err
   309  	}
   310  
   311  	return ProbeSpec{
   312  		Path:   path,
   313  		Port:   uint32(port),
   314  		Period: period,
   315  	}, nil
   316  }
   317  
   318  func stringField(obj map[string]interface{}, key string) (string, error) {
   319  	value, ok := obj[key]
   320  	if !ok {
   321  		return "", fmt.Errorf("Field '%s' is missing", key)
   322  	}
   323  	str, ok := value.(string)
   324  	if !ok {
   325  		return "", fmt.Errorf("Field '%s' is not a string", key)
   326  	}
   327  	return str, nil
   328  }
   329  

View as plain text