...

Source file src/edge-infra.dev/pkg/edge/linkerd/manifests/manifests.go

Documentation: edge-infra.dev/pkg/edge/linkerd/manifests

     1  package manifests
     2  
     3  import (
     4  	"context"
     5  	_ "embed"
     6  	"errors"
     7  	"os"
     8  
     9  	appsv1 "k8s.io/api/apps/v1"
    10  	v1 "k8s.io/api/core/v1"
    11  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    12  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    13  	"sigs.k8s.io/controller-runtime/pkg/client"
    14  
    15  	l5drender "edge-infra.dev/pkg/edge/linkerd/helm/render"
    16  	l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
    17  
    18  	"edge-infra.dev/pkg/k8s/decoder"
    19  	uobj "edge-infra.dev/pkg/k8s/unstructured"
    20  
    21  	"edge-infra.dev/pkg/edge/linkerd"
    22  )
    23  
    24  //go:embed linkerd_pod_monitor.yaml
    25  var l5dPodMonitor []byte
    26  
    27  // Render will generate the linkerd manifests into an array of unstructured
    28  // objects so we can apply with fluxcd/pkg/ssa
    29  func Render(ctx context.Context, c client.Client, l5d l5dv1alpha1.Linkerd, renderingOpts []l5drender.Option, thickPos bool) ([]*unstructured.Unstructured, error) {
    30  	manifests, err := l5drender.Unstructured(renderingOpts...)
    31  	if err != nil {
    32  		return nil, err
    33  	}
    34  
    35  	if thickPos {
    36  		// add l5d control plane as daemonsets and scale down deployments to zero
    37  		manifests, err = addDaemonSets(manifests)
    38  		if err != nil {
    39  			return nil, err
    40  		}
    41  		// mutate the headless services to use local traffic policy and set a ClusterIP
    42  		manifests, err = mutateHeadlessServices(ctx, c, manifests)
    43  		if err != nil {
    44  			return nil, err
    45  		}
    46  		manifests, err = scaleDeployments(manifests, 0)
    47  		if err != nil {
    48  			return nil, err
    49  		}
    50  	}
    51  
    52  	for i := range manifests {
    53  		manifests[i].SetOwnerReferences(linkerd.OwnerRef(&l5d))
    54  	}
    55  
    56  	return manifests, nil
    57  }
    58  
    59  // ReadUnstructuredObjects reads json file from a file system and converts the multi-doc
    60  // to unstructured.Unstructured objects
    61  func ReadUnstructuredObjects(path string) ([]*unstructured.Unstructured, error) {
    62  	if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
    63  		return nil, nil
    64  	}
    65  	jsonDoc, err := os.ReadFile(path)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  	return uobj.FromJSON(jsonDoc)
    70  }
    71  
    72  // WriteUnstructuredObjects writes the unstructured object as json multi-doc to file-system
    73  func WriteUnstructuredObjects(path string, uobjs []*unstructured.Unstructured, perm os.FileMode) error {
    74  	jsonDoc, err := uobj.ToJSON(uobjs)
    75  	if err != nil {
    76  		return err
    77  	}
    78  	return os.WriteFile(path, jsonDoc, perm)
    79  }
    80  
    81  // MonitoringManifests returns the linkerd monitoring objects.
    82  // This includes pod monitors for linkerd-controller and linkerd-proxy
    83  func MonitoringManifests() ([]*unstructured.Unstructured, error) {
    84  	return decoder.DecodeYAML(l5dPodMonitor)
    85  }
    86  
    87  // addDaemonSets will convert the linkerd Deployments to DaemonSets and append them
    88  // to the unstructured manifests array.
    89  func addDaemonSets(manifests []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
    90  	for _, deployment := range parseL5dDeployments(manifests) {
    91  		ds := convertDeploymentToDaemonSet(deployment)
    92  		unstructured, err := convertToUnstructured(ds)
    93  		if err != nil {
    94  			return nil, err
    95  		}
    96  		manifests = append(manifests, unstructured)
    97  	}
    98  	return manifests, nil
    99  }
   100  
   101  // scaleDeployments will scale the linkerd control-plane deployments
   102  func scaleDeployments(manifests []*unstructured.Unstructured, scale int32) ([]*unstructured.Unstructured, error) {
   103  	for idx, object := range manifests {
   104  		deployment, ok := tryConvertUnstructuredToDeployment(object)
   105  		if !ok {
   106  			continue
   107  		}
   108  		// set number of deployment replicas
   109  		deployment.Spec.Replicas = &scale
   110  
   111  		unstructured, err := convertToUnstructured(deployment)
   112  		if err != nil {
   113  			return nil, err
   114  		}
   115  		manifests[idx] = unstructured
   116  	}
   117  	return manifests, nil
   118  }
   119  
   120  // Parses the manifests and returns a list of linkerd control plane deployments
   121  func parseL5dDeployments(manifests []*unstructured.Unstructured) []*appsv1.Deployment {
   122  	var deployments []*appsv1.Deployment
   123  	for _, object := range manifests {
   124  		l5dDeployment, ok := tryConvertUnstructuredToDeployment(object)
   125  		if ok {
   126  			deployments = append(deployments, l5dDeployment)
   127  		}
   128  	}
   129  	return deployments
   130  }
   131  
   132  // MutateHeadlessServices iterates and parses the linkerd headless services.
   133  func mutateHeadlessServices(ctx context.Context, c client.Client, manifests []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
   134  	for idx, object := range manifests {
   135  		service, ok := tryConvertUnstructuredToService(object)
   136  		if !ok {
   137  			continue
   138  		}
   139  
   140  		// ignore non-headless services
   141  		if !serviceIsHeadless(service) {
   142  			continue
   143  		}
   144  
   145  		// we must delete service and re-create to make it no longer headless
   146  		if err := deleteObject(ctx, c, client.ObjectKeyFromObject(service)); err != nil {
   147  			return nil, err
   148  		}
   149  
   150  		// thick-pos configure service and update unstructured manifest
   151  		unstructured, err := convertToUnstructured(mutateHeadlessService(service))
   152  		if err != nil {
   153  			return nil, err
   154  		}
   155  		manifests[idx] = unstructured
   156  	}
   157  
   158  	return manifests, nil
   159  }
   160  
   161  // deleteObject deletes a runtime object, ignoring not found errors
   162  func deleteObject(ctx context.Context, c client.Client, key client.ObjectKey) error {
   163  	obj := &v1.Service{}
   164  	if err := c.Get(ctx, key, obj); kerrors.IsNotFound(err) {
   165  		return nil
   166  	} else if err != nil {
   167  		return err
   168  	}
   169  	return c.Delete(ctx, obj)
   170  }
   171  
   172  // returns true if a service ClusterIP is None
   173  func serviceIsHeadless(service *v1.Service) bool {
   174  	return service.Spec.ClusterIP == "None"
   175  }
   176  
   177  // mutateHeadlessService will convert the service to a non-headless service
   178  func mutateHeadlessService(service *v1.Service) *v1.Service {
   179  	service.Spec.ClusterIP = ""
   180  	internalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal
   181  	service.Spec.InternalTrafficPolicy = &internalTrafficPolicy
   182  	return service
   183  }
   184  

View as plain text