1 package identity
2
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "net"
8 "os"
9 "os/signal"
10 "path/filepath"
11 "syscall"
12 "time"
13
14 idctl "github.com/linkerd/linkerd2/controller/identity"
15 "github.com/linkerd/linkerd2/pkg/admin"
16 "github.com/linkerd/linkerd2/pkg/flags"
17 "github.com/linkerd/linkerd2/pkg/identity"
18 "github.com/linkerd/linkerd2/pkg/k8s"
19 "github.com/linkerd/linkerd2/pkg/prometheus"
20 "github.com/linkerd/linkerd2/pkg/tls"
21 "github.com/linkerd/linkerd2/pkg/trace"
22 log "github.com/sirupsen/logrus"
23 "google.golang.org/grpc"
24 corev1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/runtime"
26 "k8s.io/client-go/kubernetes/scheme"
27 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
28 "k8s.io/client-go/tools/record"
29 )
30
31
32 func Main(args []string) {
33 cmd := flag.NewFlagSet("identity", flag.ExitOnError)
34
35 addr := cmd.String("addr", ":8080", "address to serve on")
36 adminAddr := cmd.String("admin-addr", ":9990", "address of HTTP admin server")
37 kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
38 controllerNS := cmd.String("controller-namespace", "", "namespace in which Linkerd is installed")
39 identityScheme := cmd.String("identity-scheme", "", "scheme used for the identity issuer secret format")
40 trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
41 identityIssuanceLifeTime := cmd.String("identity-issuance-lifetime", "", "the amount of time for which the Identity issuer should certify identity")
42 identityClockSkewAllowance := cmd.String("identity-clock-skew-allowance", "", "the amount of time to allow for clock skew within a Linkerd cluster")
43 enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
44 qps := cmd.Float64("kube-apiclient-qps", 100, "Maximum QPS sent to the kube-apiserver before throttling")
45 burst := cmd.Int("kube-apiclient-burst", 200, "Burst value over kube-apiclient-qps")
46
47 issuerPath := cmd.String("issuer",
48 "/var/run/linkerd/identity/issuer",
49 "path to directory containing issuer credentials")
50
51 var issuerPathCrt string
52 var issuerPathKey string
53 traceCollector := flags.AddTraceFlags(cmd)
54 componentName := "linkerd-identity"
55
56 flags.ConfigureAndParse(cmd, args)
57
58 ready := false
59 adminServer := admin.NewServer(*adminAddr, *enablePprof, &ready)
60
61 go func() {
62 log.Infof("starting admin server on %s", *adminAddr)
63 if err := adminServer.ListenAndServe(); err != nil {
64 log.Errorf("failed to start identity admin server: %s", err)
65 }
66 }()
67
68 identityTrustAnchorPEM, err := os.ReadFile(k8s.MountPathTrustRootsPEM)
69 if err != nil {
70 log.Fatalf("could not read identity trust anchors PEM: %s", err.Error())
71 }
72
73 stop := make(chan os.Signal, 1)
74 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
75 ctx, cancel := context.WithCancel(context.Background())
76 defer cancel()
77
78 if *identityScheme == "" || *trustDomain == "" {
79 log.Infof("Identity disabled in control plane configuration.")
80
81 os.Exit(0)
82 }
83
84 if *identityScheme == k8s.IdentityIssuerSchemeLinkerd {
85 issuerPathCrt = filepath.Join(*issuerPath, k8s.IdentityIssuerCrtName)
86 issuerPathKey = filepath.Join(*issuerPath, k8s.IdentityIssuerKeyName)
87 } else {
88 issuerPathCrt = filepath.Join(*issuerPath, corev1.TLSCertKey)
89 issuerPathKey = filepath.Join(*issuerPath, corev1.TLSPrivateKeyKey)
90 }
91
92 dom, err := idctl.NewTrustDomain(*controllerNS, *trustDomain)
93 if err != nil {
94
95 log.Fatalf("Invalid trust domain: %s", err.Error())
96 }
97
98 trustAnchors, err := tls.DecodePEMCertPool(string(identityTrustAnchorPEM))
99 if err != nil {
100
101 log.Fatalf("Failed to read trust anchors: %s", err)
102 }
103
104 validity := tls.Validity{
105 ClockSkewAllowance: tls.DefaultClockSkewAllowance,
106 Lifetime: identity.DefaultIssuanceLifetime,
107 }
108 if pbd := *identityClockSkewAllowance; pbd != "" {
109 csa, err := time.ParseDuration(pbd)
110 if err != nil {
111 log.Warnf("Invalid clock skew allowance: %s", err)
112 } else {
113 validity.ClockSkewAllowance = csa
114 }
115 }
116 if pbd := *identityIssuanceLifeTime; pbd != "" {
117 il, err := time.ParseDuration(pbd)
118 if err != nil {
119 log.Warnf("Invalid issuance lifetime: %s", err)
120 } else {
121 validity.Lifetime = il
122 }
123 }
124
125 expectedName := fmt.Sprintf("identity.%s.%s", *controllerNS, *trustDomain)
126 issuerEvent := make(chan struct{})
127 issuerError := make(chan error)
128
129
130
131
132 watcher := tls.NewFsCredsWatcher(*issuerPath, issuerEvent, issuerError)
133 go func() {
134 if err := watcher.StartWatching(ctx); err != nil {
135
136 log.Fatalf("Failed to start creds watcher: %s", err)
137 }
138 }()
139
140
141
142
143 config, err := k8s.GetConfig(*kubeConfigPath, "")
144 if err != nil {
145 log.Fatalf("Error configuring Kubernetes API client: %s", err)
146 }
147 k8sAPI, err := k8s.NewAPIForConfig(config, "", []string{}, 0, float32(*qps), *burst)
148 if err != nil {
149 log.Fatalf("Failed to load kubeconfig: %s: %s", *kubeConfigPath, err)
150 }
151 log.Infof("Using k8s client with QPS=%.2f Burst=%d", config.QPS, config.Burst)
152
153 v, err := idctl.NewK8sTokenValidator(ctx, k8sAPI, dom)
154 if err != nil {
155 log.Fatalf("Failed to initialize identity service: %s", err)
156 }
157
158
159 eventBroadcaster := record.NewBroadcaster()
160 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
161 Interface: k8sAPI.CoreV1().Events(""),
162 })
163 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})
164
165 if err != nil {
166 log.Fatalf("Failed to construct k8s event recorder: %s", err)
167 }
168
169 recordEventFunc := func(parent runtime.Object, eventType, reason, message string) {
170 if parent == nil {
171 parent = &corev1.ObjectReference{
172 APIVersion: "apps/v1",
173 Kind: "Deployment",
174 Namespace: *controllerNS,
175 Name: componentName,
176 }
177 }
178 recorder.Event(parent, eventType, reason, message)
179 }
180
181
182
183
184 svc := identity.NewService(v, trustAnchors, &validity, recordEventFunc, expectedName, issuerPathCrt, issuerPathKey)
185 if err = svc.Initialize(); err != nil {
186
187 log.Fatalf("Failed to initialize identity service: %s", err)
188 }
189 go func() {
190 svc.Run(issuerEvent, issuerError)
191 }()
192
193
194
195
196 lis, err := net.Listen("tcp", *addr)
197 if err != nil {
198
199 log.Fatalf("Failed to listen on %s: %s", *addr, err)
200 }
201
202 if *traceCollector != "" {
203 if err := trace.InitializeTracing(componentName, *traceCollector); err != nil {
204 log.Warnf("failed to initialize tracing: %s", err)
205 }
206 }
207 srv := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
208 identity.Register(srv, svc)
209 go func() {
210 log.Infof("starting gRPC server on %s", *addr)
211 if err := srv.Serve(lis); err != nil {
212 log.Errorf("failed to start identity gRPC server: %s", err)
213 }
214 }()
215
216 ready = true
217
218 <-stop
219 log.Infof("shutting down gRPC server on %s", *addr)
220 srv.GracefulStop()
221 adminServer.Shutdown(ctx)
222 }
223
View as plain text