...

Source file src/github.com/linkerd/linkerd2/controller/cmd/destination/main.go

Documentation: github.com/linkerd/linkerd2/controller/cmd/destination

     1  package destination
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"flag"
     8  	"net"
     9  	"net/http"
    10  	"os"
    11  	"os/signal"
    12  	"syscall"
    13  
    14  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    15  	"github.com/linkerd/linkerd2/controller/api/destination"
    16  	externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload"
    17  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    18  	"github.com/linkerd/linkerd2/controller/k8s"
    19  	"github.com/linkerd/linkerd2/pkg/admin"
    20  	"github.com/linkerd/linkerd2/pkg/flags"
    21  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    22  	"github.com/linkerd/linkerd2/pkg/trace"
    23  	"github.com/linkerd/linkerd2/pkg/util"
    24  	log "github.com/sirupsen/logrus"
    25  )
    26  
    27  // Main executes the destination subcommand
    28  func Main(args []string) {
    29  	cmd := flag.NewFlagSet("destination", flag.ExitOnError)
    30  
    31  	addr := cmd.String("addr", ":8086", "address to serve on")
    32  	metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on")
    33  	kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
    34  	controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
    35  	enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true,
    36  		"Enable transparently upgraded HTTP2 connections among pods in the service mesh")
    37  	enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
    38  		"Enable the usage of EndpointSlice informers and resources")
    39  	enableIPv6 := cmd.Bool("enable-ipv6", true,
    40  		"Set to true to allow discovering IPv6 endpoints and preferring IPv6 when both IPv4 and IPv6 are available")
    41  	trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
    42  	clusterDomain := cmd.String("cluster-domain", "", "kubernetes cluster domain")
    43  	defaultOpaquePorts := cmd.String("default-opaque-ports", "", "configures the default opaque ports")
    44  	enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
    45  	// This will default to true. It can be overridden with experimental CLI
    46  	// flags. Currently not exposed as a configuration value through Helm.
    47  	exportControllerQueueMetrics := cmd.Bool("export-queue-metrics", true, "Exports queue metrics for the external workload controller")
    48  
    49  	traceCollector := flags.AddTraceFlags(cmd)
    50  
    51  	// Zone weighting is disabled by default because it is not consumed by
    52  	// proxies. This feature exists to support experimentation on top of the
    53  	// Linkerd control plane API.
    54  	extEndpointZoneWeights := cmd.Bool("ext-endpoint-zone-weights", false,
    55  		"Enable setting endpoint weighting based on zone locality")
    56  
    57  	// Cluster-wide defaults for meshed HTTP/2 client parameters.. These only
    58  	// apply to meshed connections, as we don't want to conflict with HTTP/2
    59  	// servers that enforce policies that limit client keep-alive behavior. The
    60  	// inbound proxy does not enforce such policies, so we're free to use
    61  	// defaults for meshed HTTP/2 connections.
    62  	meshedHTTP2ClientParamsJSON := cmd.String("meshed-http2-client-params", "",
    63  		"HTTP/2 client parameters for meshed connections in JSON format")
    64  
    65  	flags.ConfigureAndParse(cmd, args)
    66  
    67  	if *enableIPv6 && !*enableEndpointSlices {
    68  		log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true")
    69  	}
    70  
    71  	var meshedHTTP2ClientParams *pb.Http2ClientParams
    72  	if meshedHTTP2ClientParamsJSON != nil && *meshedHTTP2ClientParamsJSON != "" {
    73  		meshedHTTP2ClientParams = &pb.Http2ClientParams{}
    74  		if err := json.Unmarshal([]byte(*meshedHTTP2ClientParamsJSON), meshedHTTP2ClientParams); err != nil {
    75  			log.Fatalf("Failed to parse meshed HTTP/2 client parameters: %s", err)
    76  		}
    77  	}
    78  
    79  	ready := false
    80  	adminServer := admin.NewServer(*metricsAddr, *enablePprof, &ready)
    81  
    82  	go func() {
    83  		log.Infof("starting admin server on %s", *metricsAddr)
    84  		if err := adminServer.ListenAndServe(); err != nil {
    85  			if errors.Is(err, http.ErrServerClosed) {
    86  				log.Infof("Admin server closed (%s)", *metricsAddr)
    87  			} else {
    88  				log.Errorf("Admin server error (%s): %s", *metricsAddr, err)
    89  			}
    90  		}
    91  	}()
    92  
    93  	stop := make(chan os.Signal, 1)
    94  	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
    95  
    96  	done := make(chan struct{})
    97  
    98  	lis, err := net.Listen("tcp", *addr)
    99  	if err != nil {
   100  		log.Fatalf("Failed to listen on %s: %s", *addr, err)
   101  	}
   102  
   103  	if *trustDomain == "" {
   104  		*trustDomain = "cluster.local"
   105  		log.Warnf(" expected trust domain through args (falling back to %s)", *trustDomain)
   106  	}
   107  
   108  	if *clusterDomain == "" {
   109  		*clusterDomain = "cluster.local"
   110  		log.Warnf("expected cluster domain through args (falling back to %s)", *clusterDomain)
   111  	}
   112  
   113  	opaquePorts := util.ParsePorts(*defaultOpaquePorts)
   114  
   115  	log.Infof("Using default opaque ports: %v", opaquePorts)
   116  
   117  	if *traceCollector != "" {
   118  		if err := trace.InitializeTracing("linkerd-destination", *traceCollector); err != nil {
   119  			log.Warnf("failed to initialize tracing: %s", err)
   120  		}
   121  	}
   122  
   123  	// we need to create a separate client to check for EndpointSlice access in k8s cluster
   124  	// when slices are enabled and registered, k8sAPI is initialized with 'ES' resource
   125  	k8Client, err := pkgK8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
   126  	if err != nil {
   127  		log.Fatalf("Failed to initialize K8s API Client: %s", err)
   128  	}
   129  
   130  	ctx := context.Background()
   131  
   132  	err = pkgK8s.EndpointSliceAccess(ctx, k8Client)
   133  	if *enableEndpointSlices && err != nil {
   134  		log.Fatalf("Failed to start with EndpointSlices enabled: %s", err)
   135  	}
   136  
   137  	var k8sAPI *k8s.API
   138  	if *enableEndpointSlices {
   139  		k8sAPI, err = k8s.InitializeAPI(
   140  			ctx,
   141  			*kubeConfigPath,
   142  			true,
   143  			"local",
   144  			k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
   145  		)
   146  	} else {
   147  		k8sAPI, err = k8s.InitializeAPI(
   148  			ctx,
   149  			*kubeConfigPath,
   150  			true,
   151  			"local",
   152  			k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
   153  		)
   154  	}
   155  	if err != nil {
   156  		log.Fatalf("Failed to initialize K8s API: %s", err)
   157  	}
   158  
   159  	metadataAPI, err := k8s.InitializeMetadataAPI(*kubeConfigPath, "local", k8s.Node, k8s.RS, k8s.Job)
   160  	if err != nil {
   161  		log.Fatalf("Failed to initialize Kubernetes metadata API: %s", err)
   162  	}
   163  
   164  	clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices)
   165  	if err != nil {
   166  		log.Fatalf("Failed to initialize Cluster Store: %s", err)
   167  	}
   168  
   169  	config := destination.Config{
   170  		ControllerNS:            *controllerNamespace,
   171  		IdentityTrustDomain:     *trustDomain,
   172  		ClusterDomain:           *clusterDomain,
   173  		DefaultOpaquePorts:      opaquePorts,
   174  		EnableH2Upgrade:         *enableH2Upgrade,
   175  		EnableEndpointSlices:    *enableEndpointSlices,
   176  		EnableIPv6:              *enableIPv6,
   177  		ExtEndpointZoneWeights:  *extEndpointZoneWeights,
   178  		MeshedHttp2ClientParams: meshedHTTP2ClientParams,
   179  	}
   180  	server, err := destination.NewServer(
   181  		*addr,
   182  		config,
   183  		k8sAPI,
   184  		metadataAPI,
   185  		clusterStore,
   186  		done,
   187  	)
   188  
   189  	if err != nil {
   190  		log.Fatalf("Failed to initialize destination server: %s", err)
   191  	}
   192  
   193  	// blocks until caches are synced
   194  	k8sAPI.Sync(nil)
   195  	metadataAPI.Sync(nil)
   196  	clusterStore.Sync(nil)
   197  
   198  	// Start mesh expansion external workload controller to write endpointslices
   199  	// to API Server.
   200  	if *enableEndpointSlices {
   201  		hostname, ok := os.LookupEnv("HOSTNAME")
   202  		if !ok {
   203  			log.Fatal("Failed to initialize External Workload Endpoints Controller, \"HOSTNAME\" value not found")
   204  		}
   205  		externalWorkloadController, err := externalworkload.NewEndpointsController(k8sAPI, hostname, *controllerNamespace, done, *exportControllerQueueMetrics)
   206  		if err != nil {
   207  			log.Fatalf("Failed to initialize External Workload Endpoints Controller: %v", err)
   208  		}
   209  
   210  		externalWorkloadController.Start()
   211  	}
   212  
   213  	go func() {
   214  		log.Infof("starting gRPC server on %s", *addr)
   215  		if err := server.Serve(lis); err != nil {
   216  			log.Errorf("failed to start destination gRPC server: %s", err)
   217  		}
   218  	}()
   219  
   220  	ready = true
   221  
   222  	<-stop
   223  
   224  	log.Infof("shutting down gRPC server on %s", *addr)
   225  	close(done)
   226  	server.GracefulStop()
   227  	adminServer.Shutdown(ctx)
   228  }
   229  

View as plain text