...

Source file src/github.com/linkerd/linkerd2/controller/webhook/server.go

Documentation: github.com/linkerd/linkerd2/controller/webhook

     1  package webhook
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"net/http"
    10  	"sync/atomic"
    11  	"time"
    12  
    13  	"github.com/linkerd/linkerd2/controller/k8s"
    14  	pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
    15  	pkgTls "github.com/linkerd/linkerd2/pkg/tls"
    16  	"github.com/linkerd/linkerd2/pkg/util"
    17  	log "github.com/sirupsen/logrus"
    18  	admissionv1beta1 "k8s.io/api/admission/v1beta1"
    19  	v1 "k8s.io/api/core/v1"
    20  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    21  	"k8s.io/client-go/kubernetes/scheme"
    22  	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    23  	"k8s.io/client-go/tools/record"
    24  	"sigs.k8s.io/yaml"
    25  )
    26  
    27  // Handler is the signature for the functions that ultimately deal with
    28  // the admission request
    29  type Handler func(
    30  	context.Context,
    31  	*k8s.MetadataAPI,
    32  	*admissionv1beta1.AdmissionRequest,
    33  	record.EventRecorder,
    34  ) (*admissionv1beta1.AdmissionResponse, error)
    35  
    36  // Server describes the https server implementing the webhook
    37  type Server struct {
    38  	*http.Server
    39  	metadataAPI *k8s.MetadataAPI
    40  	handler     Handler
    41  	certValue   *atomic.Value
    42  	recorder    record.EventRecorder
    43  }
    44  
    45  // NewServer returns a new instance of Server
    46  func NewServer(
    47  	ctx context.Context,
    48  	api *pkgk8s.KubernetesAPI,
    49  	metadataAPI *k8s.MetadataAPI,
    50  	addr, certPath string,
    51  	handler Handler,
    52  	component string,
    53  ) (*Server, error) {
    54  	updateEvent := make(chan struct{})
    55  	errEvent := make(chan error)
    56  	watcher := pkgTls.NewFsCredsWatcher(certPath, updateEvent, errEvent).
    57  		WithFilePaths(pkgk8s.MountPathTLSCrtPEM, pkgk8s.MountPathTLSKeyPEM)
    58  	go func() {
    59  		if err := watcher.StartWatching(ctx); err != nil {
    60  			log.Fatalf("Failed to start creds watcher: %s", err)
    61  		}
    62  	}()
    63  
    64  	server := &http.Server{
    65  		Addr:              addr,
    66  		ReadHeaderTimeout: 15 * time.Second,
    67  		TLSConfig: &tls.Config{
    68  			MinVersion: tls.VersionTLS12,
    69  		},
    70  	}
    71  
    72  	eventBroadcaster := record.NewBroadcaster()
    73  	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
    74  		// In order to send events to all namespaces, we need to use an empty string here
    75  		// re: client-go's event_expansion.go CreateWithEventNamespace()
    76  		Interface: api.CoreV1().Events(""),
    77  	})
    78  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
    79  
    80  	s := getConfiguredServer(server, metadataAPI, handler, recorder)
    81  	if err := watcher.UpdateCert(s.certValue); err != nil {
    82  		log.Fatalf("Failed to initialized certificate: %s", err)
    83  	}
    84  
    85  	log := log.WithFields(log.Fields{
    86  		"component": "proxy-injector",
    87  		"addr":      addr,
    88  	})
    89  
    90  	go watcher.ProcessEvents(log, s.certValue, updateEvent, errEvent)
    91  
    92  	return s, nil
    93  }
    94  
    95  func getConfiguredServer(
    96  	httpServer *http.Server,
    97  	metadataAPI *k8s.MetadataAPI,
    98  	handler Handler,
    99  	recorder record.EventRecorder,
   100  ) *Server {
   101  	var emptyCert atomic.Value
   102  	s := &Server{httpServer, metadataAPI, handler, &emptyCert, recorder}
   103  	s.Handler = http.HandlerFunc(s.serve)
   104  	httpServer.TLSConfig.GetCertificate = s.getCertificate
   105  	return s
   106  }
   107  
   108  // Start starts the https server
   109  func (s *Server) Start() {
   110  	log.Infof("listening at %s", s.Server.Addr)
   111  	if err := s.ListenAndServeTLS("", ""); err != nil {
   112  		if errors.Is(err, http.ErrServerClosed) {
   113  			return
   114  		}
   115  		log.Fatal(err)
   116  	}
   117  }
   118  
   119  // getCertificate provides the TLS server with the current cert
   120  func (s *Server) getCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
   121  	return s.certValue.Load().(*tls.Certificate), nil
   122  }
   123  
   124  func (s *Server) serve(res http.ResponseWriter, req *http.Request) {
   125  	var (
   126  		data []byte
   127  		err  error
   128  	)
   129  	if req.Body != nil {
   130  		data, err = util.ReadAllLimit(req.Body, 10*util.MB)
   131  		if err != nil {
   132  			http.Error(res, err.Error(), http.StatusInternalServerError)
   133  			return
   134  		}
   135  	}
   136  
   137  	if len(data) == 0 {
   138  		log.Warn("received empty payload")
   139  		return
   140  	}
   141  
   142  	response, err := s.processReq(req.Context(), data)
   143  	if err != nil {
   144  		http.Error(res, err.Error(), http.StatusBadRequest)
   145  		return
   146  	}
   147  
   148  	responseJSON, err := json.Marshal(response)
   149  	if err != nil {
   150  		http.Error(res, err.Error(), http.StatusInternalServerError)
   151  		return
   152  	}
   153  
   154  	if _, err := res.Write(responseJSON); err != nil {
   155  		http.Error(res, err.Error(), http.StatusInternalServerError)
   156  		return
   157  	}
   158  }
   159  
   160  func (s *Server) processReq(ctx context.Context, data []byte) (*admissionv1beta1.AdmissionReview, error) {
   161  	admissionReview, err := decode(data)
   162  	if err != nil {
   163  		return nil, fmt.Errorf("failed to decode admission review request: %w", err)
   164  	}
   165  	if admissionReview.Request == nil || admissionReview.Request.UID == "" {
   166  		return nil, fmt.Errorf("invalid admission review request")
   167  	}
   168  	log.Infof("received admission review request %q", admissionReview.Request.UID)
   169  	log.Debugf("admission request: %+v", admissionReview.Request)
   170  
   171  	admissionResponse, err := s.handler(ctx, s.metadataAPI, admissionReview.Request, s.recorder)
   172  	if err != nil {
   173  		log.Error("failed to run webhook handler. Reason: ", err)
   174  		admissionReview.Response = &admissionv1beta1.AdmissionResponse{
   175  			UID:     admissionReview.Request.UID,
   176  			Allowed: false,
   177  			Result: &metav1.Status{
   178  				Message: err.Error(),
   179  			},
   180  		}
   181  		return admissionReview, nil
   182  	}
   183  	admissionReview.Response = admissionResponse
   184  
   185  	return admissionReview, nil
   186  }
   187  
   188  // Shutdown initiates a graceful shutdown of the underlying HTTP server.
   189  func (s *Server) Shutdown(ctx context.Context) error {
   190  	return s.Server.Shutdown(ctx)
   191  }
   192  
   193  func decode(data []byte) (*admissionv1beta1.AdmissionReview, error) {
   194  	var admissionReview admissionv1beta1.AdmissionReview
   195  	err := yaml.Unmarshal(data, &admissionReview)
   196  	return &admissionReview, err
   197  }
   198  

View as plain text