...

Source file src/github.com/linkerd/linkerd2/controller/cmd/identity/main.go

Documentation: github.com/linkerd/linkerd2/controller/cmd/identity

     1  package identity
     2  
     3  import (
     4  	"context"
     5  	"flag"
     6  	"fmt"
     7  	"net"
     8  	"os"
     9  	"os/signal"
    10  	"path/filepath"
    11  	"syscall"
    12  	"time"
    13  
    14  	idctl "github.com/linkerd/linkerd2/controller/identity"
    15  	"github.com/linkerd/linkerd2/pkg/admin"
    16  	"github.com/linkerd/linkerd2/pkg/flags"
    17  	"github.com/linkerd/linkerd2/pkg/identity"
    18  	"github.com/linkerd/linkerd2/pkg/k8s"
    19  	"github.com/linkerd/linkerd2/pkg/prometheus"
    20  	"github.com/linkerd/linkerd2/pkg/tls"
    21  	"github.com/linkerd/linkerd2/pkg/trace"
    22  	log "github.com/sirupsen/logrus"
    23  	"google.golang.org/grpc"
    24  	corev1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	"k8s.io/client-go/kubernetes/scheme"
    27  	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    28  	"k8s.io/client-go/tools/record"
    29  )
    30  
    31  // Main executes the identity subcommand.
        //
        // It parses the subcommand's flags from args, starts the admin HTTP server,
        // reads the trust anchors, and, unless identity is disabled (empty
        // -identity-scheme or -identity-trust-domain), wires up the issuer
        // credentials watcher, the Kubernetes API client, the token validator, the
        // event recorder and the identity service. It then serves gRPC on -addr and
        // blocks until os.Interrupt or SIGTERM is delivered, after which the gRPC
        // server is stopped gracefully and the admin server is shut down.
        //
        // Most setup failures are reported with log.Fatalf; invalid duration flags
        // and tracing initialization failures are only logged as warnings.
    32  func Main(args []string) {
    33  	cmd := flag.NewFlagSet("identity", flag.ExitOnError)
    34  
    35  	addr := cmd.String("addr", ":8080", "address to serve on")
    36  	adminAddr := cmd.String("admin-addr", ":9990", "address of HTTP admin server")
    37  	kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
    38  	controllerNS := cmd.String("controller-namespace", "", "namespace in which Linkerd is installed")
    39  	identityScheme := cmd.String("identity-scheme", "", "scheme used for the identity issuer secret format")
    40  	trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
        	// The two duration flags are taken as strings and parsed with
        	// time.ParseDuration further down, so a bad value can be downgraded to a
        	// warning instead of aborting flag parsing.
    41  	identityIssuanceLifeTime := cmd.String("identity-issuance-lifetime", "", "the amount of time for which the Identity issuer should certify identity")
    42  	identityClockSkewAllowance := cmd.String("identity-clock-skew-allowance", "", "the amount of time to allow for clock skew within a Linkerd cluster")
    43  	enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
    44  	qps := cmd.Float64("kube-apiclient-qps", 100, "Maximum QPS sent to the kube-apiserver before throttling")
    45  	burst := cmd.Int("kube-apiclient-burst", 200, "Burst value over kube-apiclient-qps")
    46  
    47  	issuerPath := cmd.String("issuer",
    48  		"/var/run/linkerd/identity/issuer",
    49  		"path to directory containing issuer credentials")
    50  
        	// Full paths of the issuer certificate and key inside issuerPath; the
        	// file names depend on the identity scheme and are filled in below.
    51  	var issuerPathCrt string
    52  	var issuerPathKey string
    53  	traceCollector := flags.AddTraceFlags(cmd)
        	// componentName is used as the event source component, as the name of the
        	// default Deployment reference for events, and as the tracing service name.
    54  	componentName := "linkerd-identity"
    55  
    56  	flags.ConfigureAndParse(cmd, args)
    57  
        	// ready is handed to the admin server by pointer and flipped to true at
        	// the end of setup.
        	// NOTE(review): it is written from this goroutine while the admin server
        	// runs in another one, with no synchronization visible here — confirm
        	// how admin.NewServer reads it.
    58  	ready := false
    59  	adminServer := admin.NewServer(*adminAddr, *enablePprof, &ready)
    60  
        	// The admin server is started before the rest of the setup; a failure to
        	// serve is only logged and does not stop Main.
    61  	go func() {
    62  		log.Infof("starting admin server on %s", *adminAddr)
    63  		if err := adminServer.ListenAndServe(); err != nil {
    64  			log.Errorf("failed to start identity admin server: %s", err)
    65  		}
    66  	}()
    67  
        	// NOTE(review): the trust anchors are read before the "identity disabled"
        	// check below, so an unreadable file is fatal even when the process would
        	// otherwise exit cleanly with status 0.
    68  	identityTrustAnchorPEM, err := os.ReadFile(k8s.MountPathTrustRootsPEM)
    69  	if err != nil {
    70  		log.Fatalf("could not read identity trust anchors PEM: %s", err.Error())
    71  	}
    72  
        	// stop receives os.Interrupt / SIGTERM and is what Main blocks on at the
        	// end. ctx is passed to the creds watcher, the token validator and the
        	// admin server shutdown; it is only cancelled by the deferred cancel.
    73  	stop := make(chan os.Signal, 1)
    74  	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
    75  	ctx, cancel := context.WithCancel(context.Background())
    76  	defer cancel()
    77  
        	// Both the scheme and the trust domain must be set; otherwise identity is
        	// treated as disabled and the process exits successfully. os.Exit does
        	// not run deferred functions, so the deferred cancel above is skipped
        	// (presumably what the gocritic suppression is for).
    78  	if *identityScheme == "" || *trustDomain == "" {
    79  		log.Infof("Identity disabled in control plane configuration.")
    80  		//nolint:gocritic
    81  		os.Exit(0)
    82  	}
    83  
        	// The Linkerd scheme uses the k8s.IdentityIssuer{Crt,Key}Name file names;
        	// any other scheme value falls back to the corev1 TLS secret key names.
    84  	if *identityScheme == k8s.IdentityIssuerSchemeLinkerd {
    85  		issuerPathCrt = filepath.Join(*issuerPath, k8s.IdentityIssuerCrtName)
    86  		issuerPathKey = filepath.Join(*issuerPath, k8s.IdentityIssuerKeyName)
    87  	} else {
    88  		issuerPathCrt = filepath.Join(*issuerPath, corev1.TLSCertKey)
    89  		issuerPathKey = filepath.Join(*issuerPath, corev1.TLSPrivateKeyKey)
    90  	}
    91  
    92  	dom, err := idctl.NewTrustDomain(*controllerNS, *trustDomain)
    93  	if err != nil {
    94  		//nolint:gocritic
    95  		log.Fatalf("Invalid trust domain: %s", err.Error())
    96  	}
    97  
    98  	trustAnchors, err := tls.DecodePEMCertPool(string(identityTrustAnchorPEM))
    99  	if err != nil {
   100  		//nolint:gocritic
   101  		log.Fatalf("Failed to read trust anchors: %s", err)
   102  	}
   103  
        	// Start from the package defaults and override each field only when the
        	// corresponding flag is non-empty and parses as a duration. An unparsable
        	// value is logged as a warning and the default is kept.
   104  	validity := tls.Validity{
   105  		ClockSkewAllowance: tls.DefaultClockSkewAllowance,
   106  		Lifetime:           identity.DefaultIssuanceLifetime,
   107  	}
   108  	if pbd := *identityClockSkewAllowance; pbd != "" {
   109  		csa, err := time.ParseDuration(pbd)
   110  		if err != nil {
   111  			log.Warnf("Invalid clock skew allowance: %s", err)
   112  		} else {
   113  			validity.ClockSkewAllowance = csa
   114  		}
   115  	}
   116  	if pbd := *identityIssuanceLifeTime; pbd != "" {
   117  		il, err := time.ParseDuration(pbd)
   118  		if err != nil {
   119  			log.Warnf("Invalid issuance lifetime: %s", err)
   120  		} else {
   121  			validity.Lifetime = il
   122  		}
   123  	}
   124  
        	// expectedName is passed to identity.NewService below. The two unbuffered
        	// channels are shared between the creds watcher and svc.Run.
   125  	expectedName := fmt.Sprintf("identity.%s.%s", *controllerNS, *trustDomain)
   126  	issuerEvent := make(chan struct{})
   127  	issuerError := make(chan error)
   128  
   129  	//
   130  	// Create and start FS creds watcher
   131  	//
        	// The watcher runs in its own goroutine for the lifetime of ctx; an error
        	// returned from StartWatching is reported with log.Fatalf from that
        	// goroutine.
   132  	watcher := tls.NewFsCredsWatcher(*issuerPath, issuerEvent, issuerError)
   133  	go func() {
   134  		if err := watcher.StartWatching(ctx); err != nil {
   135  			//nolint:gocritic
   136  			log.Fatalf("Failed to start creds watcher: %s", err)
   137  		}
   138  	}()
   139  
   140  	//
   141  	// Create k8s API
   142  	//
   143  	config, err := k8s.GetConfig(*kubeConfigPath, "")
   144  	if err != nil {
   145  		log.Fatalf("Error configuring Kubernetes API client: %s", err)
   146  	}
        	// qps and burst come from the kube-apiclient-* flags; the QPS/Burst values
        	// logged afterwards are read back from config rather than from the flags.
   147  	k8sAPI, err := k8s.NewAPIForConfig(config, "", []string{}, 0, float32(*qps), *burst)
   148  	if err != nil {
   149  		log.Fatalf("Failed to load kubeconfig: %s: %s", *kubeConfigPath, err)
   150  	}
   151  	log.Infof("Using k8s client with QPS=%.2f Burst=%d", config.QPS, config.Burst)
   152  
        	// NOTE(review): a validator construction failure is logged with the same
        	// "Failed to initialize identity service" text as svc.Initialize below.
   153  	v, err := idctl.NewK8sTokenValidator(ctx, k8sAPI, dom)
   154  	if err != nil {
   155  		log.Fatalf("Failed to initialize identity service: %s", err)
   156  	}
   157  
   158  	// Create K8s event recorder
   159  	eventBroadcaster := record.NewBroadcaster()
   160  	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
   161  		Interface: k8sAPI.CoreV1().Events(""),
   162  	})
   163  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})
   164  
        	// NOTE(review): err has not been reassigned since the check that follows
        	// NewK8sTokenValidator, and none of the recorder calls above return an
        	// error, so this check appears to be redundant.
   165  	if err != nil {
   166  		log.Fatalf("Failed to construct k8s event recorder: %s", err)
   167  	}
   168  
        	// recordEventFunc is the event callback given to the identity service.
        	// When no parent object is supplied, the event is attached to a reference
        	// to the apps/v1 Deployment named componentName in the controller
        	// namespace.
   169  	recordEventFunc := func(parent runtime.Object, eventType, reason, message string) {
   170  		if parent == nil {
   171  			parent = &corev1.ObjectReference{
   172  				APIVersion: "apps/v1",
   173  				Kind:       "Deployment",
   174  				Namespace:  *controllerNS,
   175  				Name:       componentName,
   176  			}
   177  		}
   178  		recorder.Event(parent, eventType, reason, message)
   179  	}
   180  
   181  	//
   182  	// Create, initialize and run service
   183  	//
        	// Initialize must succeed before the service is run; svc.Run then consumes
        	// the watcher's event and error channels in its own goroutine.
   184  	svc := identity.NewService(v, trustAnchors, &validity, recordEventFunc, expectedName, issuerPathCrt, issuerPathKey)
   185  	if err = svc.Initialize(); err != nil {
   186  		//nolint:gocritic
   187  		log.Fatalf("Failed to initialize identity service: %s", err)
   188  	}
   189  	go func() {
   190  		svc.Run(issuerEvent, issuerError)
   191  	}()
   192  
   193  	//
   194  	// Bind and serve
   195  	//
        	// The listener is bound synchronously so that a bind failure is fatal
        	// before the process is marked ready.
   196  	lis, err := net.Listen("tcp", *addr)
   197  	if err != nil {
   198  		//nolint:gocritic
   199  		log.Fatalf("Failed to listen on %s: %s", *addr, err)
   200  	}
   201  
        	// Tracing is optional: it is only initialized when a collector was given,
        	// and a failure to initialize it is not fatal.
   202  	if *traceCollector != "" {
   203  		if err := trace.InitializeTracing(componentName, *traceCollector); err != nil {
   204  			log.Warnf("failed to initialize tracing: %s", err)
   205  		}
   206  	}
   207  	srv := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
   208  	identity.Register(srv, svc)
   209  	go func() {
   210  		log.Infof("starting gRPC server on %s", *addr)
   211  		if err := srv.Serve(lis); err != nil {
   212  			log.Errorf("failed to start identity gRPC server: %s", err)
   213  		}
   214  	}()
   215  
        	// Readiness is flipped as soon as the serving goroutine has been launched;
        	// it does not wait for srv.Serve to report anything.
   216  	ready = true
   217  
        	// Block until a signal arrives, then stop gRPC gracefully before shutting
        	// down the admin server.
        	// NOTE(review): ctx has not been cancelled at this point (cancel is
        	// deferred), and the value returned by Shutdown is discarded.
   218  	<-stop
   219  	log.Infof("shutting down gRPC server on %s", *addr)
   220  	srv.GracefulStop()
   221  	adminServer.Shutdown(ctx)
   222  }
   223  

View as plain text