1 package destination
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "flag"
8 "net"
9 "net/http"
10 "os"
11 "os/signal"
12 "syscall"
13
14 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
15 "github.com/linkerd/linkerd2/controller/api/destination"
16 externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload"
17 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
18 "github.com/linkerd/linkerd2/controller/k8s"
19 "github.com/linkerd/linkerd2/pkg/admin"
20 "github.com/linkerd/linkerd2/pkg/flags"
21 pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
22 "github.com/linkerd/linkerd2/pkg/trace"
23 "github.com/linkerd/linkerd2/pkg/util"
24 log "github.com/sirupsen/logrus"
25 )
26
27
28 func Main(args []string) {
29 cmd := flag.NewFlagSet("destination", flag.ExitOnError)
30
31 addr := cmd.String("addr", ":8086", "address to serve on")
32 metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on")
33 kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
34 controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
35 enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true,
36 "Enable transparently upgraded HTTP2 connections among pods in the service mesh")
37 enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
38 "Enable the usage of EndpointSlice informers and resources")
39 enableIPv6 := cmd.Bool("enable-ipv6", true,
40 "Set to true to allow discovering IPv6 endpoints and preferring IPv6 when both IPv4 and IPv6 are available")
41 trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
42 clusterDomain := cmd.String("cluster-domain", "", "kubernetes cluster domain")
43 defaultOpaquePorts := cmd.String("default-opaque-ports", "", "configures the default opaque ports")
44 enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
45
46
47 exportControllerQueueMetrics := cmd.Bool("export-queue-metrics", true, "Exports queue metrics for the external workload controller")
48
49 traceCollector := flags.AddTraceFlags(cmd)
50
51
52
53
54 extEndpointZoneWeights := cmd.Bool("ext-endpoint-zone-weights", false,
55 "Enable setting endpoint weighting based on zone locality")
56
57
58
59
60
61
62 meshedHTTP2ClientParamsJSON := cmd.String("meshed-http2-client-params", "",
63 "HTTP/2 client parameters for meshed connections in JSON format")
64
65 flags.ConfigureAndParse(cmd, args)
66
67 if *enableIPv6 && !*enableEndpointSlices {
68 log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true")
69 }
70
71 var meshedHTTP2ClientParams *pb.Http2ClientParams
72 if meshedHTTP2ClientParamsJSON != nil && *meshedHTTP2ClientParamsJSON != "" {
73 meshedHTTP2ClientParams = &pb.Http2ClientParams{}
74 if err := json.Unmarshal([]byte(*meshedHTTP2ClientParamsJSON), meshedHTTP2ClientParams); err != nil {
75 log.Fatalf("Failed to parse meshed HTTP/2 client parameters: %s", err)
76 }
77 }
78
79 ready := false
80 adminServer := admin.NewServer(*metricsAddr, *enablePprof, &ready)
81
82 go func() {
83 log.Infof("starting admin server on %s", *metricsAddr)
84 if err := adminServer.ListenAndServe(); err != nil {
85 if errors.Is(err, http.ErrServerClosed) {
86 log.Infof("Admin server closed (%s)", *metricsAddr)
87 } else {
88 log.Errorf("Admin server error (%s): %s", *metricsAddr, err)
89 }
90 }
91 }()
92
93 stop := make(chan os.Signal, 1)
94 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
95
96 done := make(chan struct{})
97
98 lis, err := net.Listen("tcp", *addr)
99 if err != nil {
100 log.Fatalf("Failed to listen on %s: %s", *addr, err)
101 }
102
103 if *trustDomain == "" {
104 *trustDomain = "cluster.local"
105 log.Warnf(" expected trust domain through args (falling back to %s)", *trustDomain)
106 }
107
108 if *clusterDomain == "" {
109 *clusterDomain = "cluster.local"
110 log.Warnf("expected cluster domain through args (falling back to %s)", *clusterDomain)
111 }
112
113 opaquePorts := util.ParsePorts(*defaultOpaquePorts)
114
115 log.Infof("Using default opaque ports: %v", opaquePorts)
116
117 if *traceCollector != "" {
118 if err := trace.InitializeTracing("linkerd-destination", *traceCollector); err != nil {
119 log.Warnf("failed to initialize tracing: %s", err)
120 }
121 }
122
123
124
125 k8Client, err := pkgK8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
126 if err != nil {
127 log.Fatalf("Failed to initialize K8s API Client: %s", err)
128 }
129
130 ctx := context.Background()
131
132 err = pkgK8s.EndpointSliceAccess(ctx, k8Client)
133 if *enableEndpointSlices && err != nil {
134 log.Fatalf("Failed to start with EndpointSlices enabled: %s", err)
135 }
136
137 var k8sAPI *k8s.API
138 if *enableEndpointSlices {
139 k8sAPI, err = k8s.InitializeAPI(
140 ctx,
141 *kubeConfigPath,
142 true,
143 "local",
144 k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
145 )
146 } else {
147 k8sAPI, err = k8s.InitializeAPI(
148 ctx,
149 *kubeConfigPath,
150 true,
151 "local",
152 k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
153 )
154 }
155 if err != nil {
156 log.Fatalf("Failed to initialize K8s API: %s", err)
157 }
158
159 metadataAPI, err := k8s.InitializeMetadataAPI(*kubeConfigPath, "local", k8s.Node, k8s.RS, k8s.Job)
160 if err != nil {
161 log.Fatalf("Failed to initialize Kubernetes metadata API: %s", err)
162 }
163
164 clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices)
165 if err != nil {
166 log.Fatalf("Failed to initialize Cluster Store: %s", err)
167 }
168
169 config := destination.Config{
170 ControllerNS: *controllerNamespace,
171 IdentityTrustDomain: *trustDomain,
172 ClusterDomain: *clusterDomain,
173 DefaultOpaquePorts: opaquePorts,
174 EnableH2Upgrade: *enableH2Upgrade,
175 EnableEndpointSlices: *enableEndpointSlices,
176 EnableIPv6: *enableIPv6,
177 ExtEndpointZoneWeights: *extEndpointZoneWeights,
178 MeshedHttp2ClientParams: meshedHTTP2ClientParams,
179 }
180 server, err := destination.NewServer(
181 *addr,
182 config,
183 k8sAPI,
184 metadataAPI,
185 clusterStore,
186 done,
187 )
188
189 if err != nil {
190 log.Fatalf("Failed to initialize destination server: %s", err)
191 }
192
193
194 k8sAPI.Sync(nil)
195 metadataAPI.Sync(nil)
196 clusterStore.Sync(nil)
197
198
199
200 if *enableEndpointSlices {
201 hostname, ok := os.LookupEnv("HOSTNAME")
202 if !ok {
203 log.Fatal("Failed to initialize External Workload Endpoints Controller, \"HOSTNAME\" value not found")
204 }
205 externalWorkloadController, err := externalworkload.NewEndpointsController(k8sAPI, hostname, *controllerNamespace, done, *exportControllerQueueMetrics)
206 if err != nil {
207 log.Fatalf("Failed to initialize External Workload Endpoints Controller: %v", err)
208 }
209
210 externalWorkloadController.Start()
211 }
212
213 go func() {
214 log.Infof("starting gRPC server on %s", *addr)
215 if err := server.Serve(lis); err != nil {
216 log.Errorf("failed to start destination gRPC server: %s", err)
217 }
218 }()
219
220 ready = true
221
222 <-stop
223
224 log.Infof("shutting down gRPC server on %s", *addr)
225 close(done)
226 server.GracefulStop()
227 adminServer.Shutdown(ctx)
228 }
229
View as plain text