package destination import ( "context" "encoding/json" "errors" "flag" "net" "net/http" "os" "os/signal" "syscall" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination" externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/k8s" "github.com/linkerd/linkerd2/pkg/admin" "github.com/linkerd/linkerd2/pkg/flags" pkgK8s "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/trace" "github.com/linkerd/linkerd2/pkg/util" log "github.com/sirupsen/logrus" ) // Main executes the destination subcommand func Main(args []string) { cmd := flag.NewFlagSet("destination", flag.ExitOnError) addr := cmd.String("addr", ":8086", "address to serve on") metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on") kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config") controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed") enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true, "Enable transparently upgraded HTTP2 connections among pods in the service mesh") enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true, "Enable the usage of EndpointSlice informers and resources") enableIPv6 := cmd.Bool("enable-ipv6", true, "Set to true to allow discovering IPv6 endpoints and preferring IPv6 when both IPv4 and IPv6 are available") trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities") clusterDomain := cmd.String("cluster-domain", "", "kubernetes cluster domain") defaultOpaquePorts := cmd.String("default-opaque-ports", "", "configures the default opaque ports") enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server") // This will default to true. It can be overridden with experimental CLI // flags. Currently not exposed as a configuration value through Helm. exportControllerQueueMetrics := cmd.Bool("export-queue-metrics", true, "Exports queue metrics for the external workload controller") traceCollector := flags.AddTraceFlags(cmd) // Zone weighting is disabled by default because it is not consumed by // proxies. This feature exists to support experimentation on top of the // Linkerd control plane API. extEndpointZoneWeights := cmd.Bool("ext-endpoint-zone-weights", false, "Enable setting endpoint weighting based on zone locality") // Cluster-wide defaults for meshed HTTP/2 client parameters.. These only // apply to meshed connections, as we don't want to conflict with HTTP/2 // servers that enforce policies that limit client keep-alive behavior. The // inbound proxy does not enforce such policies, so we're free to use // defaults for meshed HTTP/2 connections. meshedHTTP2ClientParamsJSON := cmd.String("meshed-http2-client-params", "", "HTTP/2 client parameters for meshed connections in JSON format") flags.ConfigureAndParse(cmd, args) if *enableIPv6 && !*enableEndpointSlices { log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true") } var meshedHTTP2ClientParams *pb.Http2ClientParams if meshedHTTP2ClientParamsJSON != nil && *meshedHTTP2ClientParamsJSON != "" { meshedHTTP2ClientParams = &pb.Http2ClientParams{} if err := json.Unmarshal([]byte(*meshedHTTP2ClientParamsJSON), meshedHTTP2ClientParams); err != nil { log.Fatalf("Failed to parse meshed HTTP/2 client parameters: %s", err) } } ready := false adminServer := admin.NewServer(*metricsAddr, *enablePprof, &ready) go func() { log.Infof("starting admin server on %s", *metricsAddr) if err := adminServer.ListenAndServe(); err != nil { if errors.Is(err, http.ErrServerClosed) { log.Infof("Admin server closed (%s)", *metricsAddr) } else { log.Errorf("Admin server error (%s): %s", *metricsAddr, err) } } }() stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) done := make(chan struct{}) lis, err := net.Listen("tcp", *addr) if err != nil { log.Fatalf("Failed to listen on %s: %s", *addr, err) } if *trustDomain == "" { *trustDomain = "cluster.local" log.Warnf(" expected trust domain through args (falling back to %s)", *trustDomain) } if *clusterDomain == "" { *clusterDomain = "cluster.local" log.Warnf("expected cluster domain through args (falling back to %s)", *clusterDomain) } opaquePorts := util.ParsePorts(*defaultOpaquePorts) log.Infof("Using default opaque ports: %v", opaquePorts) if *traceCollector != "" { if err := trace.InitializeTracing("linkerd-destination", *traceCollector); err != nil { log.Warnf("failed to initialize tracing: %s", err) } } // we need to create a separate client to check for EndpointSlice access in k8s cluster // when slices are enabled and registered, k8sAPI is initialized with 'ES' resource k8Client, err := pkgK8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0) if err != nil { log.Fatalf("Failed to initialize K8s API Client: %s", err) } ctx := context.Background() err = pkgK8s.EndpointSliceAccess(ctx, k8Client) if *enableEndpointSlices && err != nil { log.Fatalf("Failed to start with EndpointSlices enabled: %s", err) } var k8sAPI *k8s.API if *enableEndpointSlices { k8sAPI, err = k8s.InitializeAPI( ctx, *kubeConfigPath, true, "local", k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload, ) } else { k8sAPI, err = k8s.InitializeAPI( ctx, *kubeConfigPath, true, "local", k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload, ) } if err != nil { log.Fatalf("Failed to initialize K8s API: %s", err) } metadataAPI, err := k8s.InitializeMetadataAPI(*kubeConfigPath, "local", k8s.Node, k8s.RS, k8s.Job) if err != nil { log.Fatalf("Failed to initialize Kubernetes metadata API: %s", err) } clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices) if err != nil { log.Fatalf("Failed to initialize Cluster Store: %s", err) } config := destination.Config{ ControllerNS: *controllerNamespace, IdentityTrustDomain: *trustDomain, ClusterDomain: *clusterDomain, DefaultOpaquePorts: opaquePorts, EnableH2Upgrade: *enableH2Upgrade, EnableEndpointSlices: *enableEndpointSlices, EnableIPv6: *enableIPv6, ExtEndpointZoneWeights: *extEndpointZoneWeights, MeshedHttp2ClientParams: meshedHTTP2ClientParams, } server, err := destination.NewServer( *addr, config, k8sAPI, metadataAPI, clusterStore, done, ) if err != nil { log.Fatalf("Failed to initialize destination server: %s", err) } // blocks until caches are synced k8sAPI.Sync(nil) metadataAPI.Sync(nil) clusterStore.Sync(nil) // Start mesh expansion external workload controller to write endpointslices // to API Server. if *enableEndpointSlices { hostname, ok := os.LookupEnv("HOSTNAME") if !ok { log.Fatal("Failed to initialize External Workload Endpoints Controller, \"HOSTNAME\" value not found") } externalWorkloadController, err := externalworkload.NewEndpointsController(k8sAPI, hostname, *controllerNamespace, done, *exportControllerQueueMetrics) if err != nil { log.Fatalf("Failed to initialize External Workload Endpoints Controller: %v", err) } externalWorkloadController.Start() } go func() { log.Infof("starting gRPC server on %s", *addr) if err := server.Serve(lis); err != nil { log.Errorf("failed to start destination gRPC server: %s", err) } }() ready = true <-stop log.Infof("shutting down gRPC server on %s", *addr) close(done) server.GracefulStop() adminServer.Shutdown(ctx) }