1 package webhook
2
3 import (
4 "context"
5 "crypto/tls"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "net/http"
10 "sync/atomic"
11 "time"
12
13 "github.com/linkerd/linkerd2/controller/k8s"
14 pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
15 pkgTls "github.com/linkerd/linkerd2/pkg/tls"
16 "github.com/linkerd/linkerd2/pkg/util"
17 log "github.com/sirupsen/logrus"
18 admissionv1beta1 "k8s.io/api/admission/v1beta1"
19 v1 "k8s.io/api/core/v1"
20 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21 "k8s.io/client-go/kubernetes/scheme"
22 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
23 "k8s.io/client-go/tools/record"
24 "sigs.k8s.io/yaml"
25 )
26
27
28
29 type Handler func(
30 context.Context,
31 *k8s.MetadataAPI,
32 *admissionv1beta1.AdmissionRequest,
33 record.EventRecorder,
34 ) (*admissionv1beta1.AdmissionResponse, error)
35
36
37 type Server struct {
38 *http.Server
39 metadataAPI *k8s.MetadataAPI
40 handler Handler
41 certValue *atomic.Value
42 recorder record.EventRecorder
43 }
44
45
46 func NewServer(
47 ctx context.Context,
48 api *pkgk8s.KubernetesAPI,
49 metadataAPI *k8s.MetadataAPI,
50 addr, certPath string,
51 handler Handler,
52 component string,
53 ) (*Server, error) {
54 updateEvent := make(chan struct{})
55 errEvent := make(chan error)
56 watcher := pkgTls.NewFsCredsWatcher(certPath, updateEvent, errEvent).
57 WithFilePaths(pkgk8s.MountPathTLSCrtPEM, pkgk8s.MountPathTLSKeyPEM)
58 go func() {
59 if err := watcher.StartWatching(ctx); err != nil {
60 log.Fatalf("Failed to start creds watcher: %s", err)
61 }
62 }()
63
64 server := &http.Server{
65 Addr: addr,
66 ReadHeaderTimeout: 15 * time.Second,
67 TLSConfig: &tls.Config{
68 MinVersion: tls.VersionTLS12,
69 },
70 }
71
72 eventBroadcaster := record.NewBroadcaster()
73 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
74
75
76 Interface: api.CoreV1().Events(""),
77 })
78 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
79
80 s := getConfiguredServer(server, metadataAPI, handler, recorder)
81 if err := watcher.UpdateCert(s.certValue); err != nil {
82 log.Fatalf("Failed to initialized certificate: %s", err)
83 }
84
85 log := log.WithFields(log.Fields{
86 "component": "proxy-injector",
87 "addr": addr,
88 })
89
90 go watcher.ProcessEvents(log, s.certValue, updateEvent, errEvent)
91
92 return s, nil
93 }
94
95 func getConfiguredServer(
96 httpServer *http.Server,
97 metadataAPI *k8s.MetadataAPI,
98 handler Handler,
99 recorder record.EventRecorder,
100 ) *Server {
101 var emptyCert atomic.Value
102 s := &Server{httpServer, metadataAPI, handler, &emptyCert, recorder}
103 s.Handler = http.HandlerFunc(s.serve)
104 httpServer.TLSConfig.GetCertificate = s.getCertificate
105 return s
106 }
107
108
109 func (s *Server) Start() {
110 log.Infof("listening at %s", s.Server.Addr)
111 if err := s.ListenAndServeTLS("", ""); err != nil {
112 if errors.Is(err, http.ErrServerClosed) {
113 return
114 }
115 log.Fatal(err)
116 }
117 }
118
119
120 func (s *Server) getCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
121 return s.certValue.Load().(*tls.Certificate), nil
122 }
123
124 func (s *Server) serve(res http.ResponseWriter, req *http.Request) {
125 var (
126 data []byte
127 err error
128 )
129 if req.Body != nil {
130 data, err = util.ReadAllLimit(req.Body, 10*util.MB)
131 if err != nil {
132 http.Error(res, err.Error(), http.StatusInternalServerError)
133 return
134 }
135 }
136
137 if len(data) == 0 {
138 log.Warn("received empty payload")
139 return
140 }
141
142 response, err := s.processReq(req.Context(), data)
143 if err != nil {
144 http.Error(res, err.Error(), http.StatusBadRequest)
145 return
146 }
147
148 responseJSON, err := json.Marshal(response)
149 if err != nil {
150 http.Error(res, err.Error(), http.StatusInternalServerError)
151 return
152 }
153
154 if _, err := res.Write(responseJSON); err != nil {
155 http.Error(res, err.Error(), http.StatusInternalServerError)
156 return
157 }
158 }
159
160 func (s *Server) processReq(ctx context.Context, data []byte) (*admissionv1beta1.AdmissionReview, error) {
161 admissionReview, err := decode(data)
162 if err != nil {
163 return nil, fmt.Errorf("failed to decode admission review request: %w", err)
164 }
165 if admissionReview.Request == nil || admissionReview.Request.UID == "" {
166 return nil, fmt.Errorf("invalid admission review request")
167 }
168 log.Infof("received admission review request %q", admissionReview.Request.UID)
169 log.Debugf("admission request: %+v", admissionReview.Request)
170
171 admissionResponse, err := s.handler(ctx, s.metadataAPI, admissionReview.Request, s.recorder)
172 if err != nil {
173 log.Error("failed to run webhook handler. Reason: ", err)
174 admissionReview.Response = &admissionv1beta1.AdmissionResponse{
175 UID: admissionReview.Request.UID,
176 Allowed: false,
177 Result: &metav1.Status{
178 Message: err.Error(),
179 },
180 }
181 return admissionReview, nil
182 }
183 admissionReview.Response = admissionResponse
184
185 return admissionReview, nil
186 }
187
188
189 func (s *Server) Shutdown(ctx context.Context) error {
190 return s.Server.Shutdown(ctx)
191 }
192
193 func decode(data []byte) (*admissionv1beta1.AdmissionReview, error) {
194 var admissionReview admissionv1beta1.AdmissionReview
195 err := yaml.Unmarshal(data, &admissionReview)
196 return &admissionReview, err
197 }
198
View as plain text