package manifests import ( "context" _ "embed" "errors" "os" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/controller-runtime/pkg/client" l5drender "edge-infra.dev/pkg/edge/linkerd/helm/render" l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1" "edge-infra.dev/pkg/k8s/decoder" uobj "edge-infra.dev/pkg/k8s/unstructured" "edge-infra.dev/pkg/edge/linkerd" ) //go:embed linkerd_pod_monitor.yaml var l5dPodMonitor []byte // Render will generate the linkerd manifests into an array of unstructured // objects so we can apply with fluxcd/pkg/ssa func Render(ctx context.Context, c client.Client, l5d l5dv1alpha1.Linkerd, renderingOpts []l5drender.Option, thickPos bool) ([]*unstructured.Unstructured, error) { manifests, err := l5drender.Unstructured(renderingOpts...) if err != nil { return nil, err } if thickPos { // add l5d control plane as daemonsets and scale down deployments to zero manifests, err = addDaemonSets(manifests) if err != nil { return nil, err } // mutate the headless services to use local traffic policy and set a ClusterIP manifests, err = mutateHeadlessServices(ctx, c, manifests) if err != nil { return nil, err } manifests, err = scaleDeployments(manifests, 0) if err != nil { return nil, err } } for i := range manifests { manifests[i].SetOwnerReferences(linkerd.OwnerRef(&l5d)) } return manifests, nil } // ReadUnstructuredObjects reads json file from a file system and converts the multi-doc // to unstructured.Unstructured objects func ReadUnstructuredObjects(path string) ([]*unstructured.Unstructured, error) { if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { return nil, nil } jsonDoc, err := os.ReadFile(path) if err != nil { return nil, err } return uobj.FromJSON(jsonDoc) } // WriteUnstructuredObjects writes the unstructured object as json multi-doc to file-system func WriteUnstructuredObjects(path string, uobjs []*unstructured.Unstructured, perm os.FileMode) error { jsonDoc, err := uobj.ToJSON(uobjs) if err != nil { return err } return os.WriteFile(path, jsonDoc, perm) } // MonitoringManifests returns the linkerd monitoring objects. // This includes pod monitors for linkerd-controller and linkerd-proxy func MonitoringManifests() ([]*unstructured.Unstructured, error) { return decoder.DecodeYAML(l5dPodMonitor) } // addDaemonSets will convert the linkerd Deployments to DaemonSets and append them // to the unstructured manifests array. func addDaemonSets(manifests []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) { for _, deployment := range parseL5dDeployments(manifests) { ds := convertDeploymentToDaemonSet(deployment) unstructured, err := convertToUnstructured(ds) if err != nil { return nil, err } manifests = append(manifests, unstructured) } return manifests, nil } // scaleDeployments will scale the linkerd control-plane deployments func scaleDeployments(manifests []*unstructured.Unstructured, scale int32) ([]*unstructured.Unstructured, error) { for idx, object := range manifests { deployment, ok := tryConvertUnstructuredToDeployment(object) if !ok { continue } // set number of deployment replicas deployment.Spec.Replicas = &scale unstructured, err := convertToUnstructured(deployment) if err != nil { return nil, err } manifests[idx] = unstructured } return manifests, nil } // Parses the manifests and returns a list of linkerd control plane deployments func parseL5dDeployments(manifests []*unstructured.Unstructured) []*appsv1.Deployment { var deployments []*appsv1.Deployment for _, object := range manifests { l5dDeployment, ok := tryConvertUnstructuredToDeployment(object) if ok { deployments = append(deployments, l5dDeployment) } } return deployments } // MutateHeadlessServices iterates and parses the linkerd headless services. func mutateHeadlessServices(ctx context.Context, c client.Client, manifests []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) { for idx, object := range manifests { service, ok := tryConvertUnstructuredToService(object) if !ok { continue } // ignore non-headless services if !serviceIsHeadless(service) { continue } // we must delete service and re-create to make it no longer headless if err := deleteObject(ctx, c, client.ObjectKeyFromObject(service)); err != nil { return nil, err } // thick-pos configure service and update unstructured manifest unstructured, err := convertToUnstructured(mutateHeadlessService(service)) if err != nil { return nil, err } manifests[idx] = unstructured } return manifests, nil } // deleteObject deletes a runtime object, ignoring not found errors func deleteObject(ctx context.Context, c client.Client, key client.ObjectKey) error { obj := &v1.Service{} if err := c.Get(ctx, key, obj); kerrors.IsNotFound(err) { return nil } else if err != nil { return err } return c.Delete(ctx, obj) } // returns true if a service ClusterIP is None func serviceIsHeadless(service *v1.Service) bool { return service.Spec.ClusterIP == "None" } // mutateHeadlessService will convert the service to a non-headless service func mutateHeadlessService(service *v1.Service) *v1.Service { service.Spec.ClusterIP = "" internalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal service.Spec.InternalTrafficPolicy = &internalTrafficPolicy return service }