...
1
16
17 package webhook
18
19 import (
20 "context"
21 "crypto/tls"
22 "crypto/x509"
23 "fmt"
24 "net"
25 "net/http"
26 "os"
27 "path/filepath"
28 "strconv"
29 "sync"
30 "time"
31
32 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
33 "sigs.k8s.io/controller-runtime/pkg/healthz"
34 "sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
35 "sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics"
36 )
37
38
39 var DefaultPort = 9443
40
41
42
43
44
45
46
47
48
49 type Server interface {
50
51
52 NeedLeaderElection() bool
53
54
55
56 Register(path string, hook http.Handler)
57
58
59
60 Start(ctx context.Context) error
61
62
63
64 StartedChecker() healthz.Checker
65
66
67 WebhookMux() *http.ServeMux
68 }
69
70
71 type Options struct {
72
73
74 Host string
75
76
77
78 Port int
79
80
81
82 CertDir string
83
84
85
86
87 CertName string
88
89
90
91
92 KeyName string
93
94
95
96 ClientCAName string
97
98
99
100 TLSOpts []func(*tls.Config)
101
102
103 WebhookMux *http.ServeMux
104 }
105
106
107 func NewServer(o Options) Server {
108 return &DefaultServer{
109 Options: o,
110 }
111 }
112
113
114 type DefaultServer struct {
115 Options Options
116
117
118 webhooks map[string]http.Handler
119
120
121 defaultingOnce sync.Once
122
123
124
125 started bool
126
127
128 mu sync.Mutex
129
130 webhookMux *http.ServeMux
131 }
132
133
134 func (o *Options) setDefaults() {
135 if o.WebhookMux == nil {
136 o.WebhookMux = http.NewServeMux()
137 }
138
139 if o.Port <= 0 {
140 o.Port = DefaultPort
141 }
142
143 if len(o.CertDir) == 0 {
144 o.CertDir = filepath.Join(os.TempDir(), "k8s-webhook-server", "serving-certs")
145 }
146
147 if len(o.CertName) == 0 {
148 o.CertName = "tls.crt"
149 }
150
151 if len(o.KeyName) == 0 {
152 o.KeyName = "tls.key"
153 }
154 }
155
156 func (s *DefaultServer) setDefaults() {
157 s.webhooks = map[string]http.Handler{}
158 s.Options.setDefaults()
159
160 s.webhookMux = s.Options.WebhookMux
161 }
162
163
164
165 func (*DefaultServer) NeedLeaderElection() bool {
166 return false
167 }
168
169
170
171 func (s *DefaultServer) Register(path string, hook http.Handler) {
172 s.mu.Lock()
173 defer s.mu.Unlock()
174
175 s.defaultingOnce.Do(s.setDefaults)
176 if _, found := s.webhooks[path]; found {
177 panic(fmt.Errorf("can't register duplicate path: %v", path))
178 }
179 s.webhooks[path] = hook
180 s.webhookMux.Handle(path, metrics.InstrumentedHook(path, hook))
181
182 regLog := log.WithValues("path", path)
183 regLog.Info("Registering webhook")
184 }
185
186
187
188 func (s *DefaultServer) Start(ctx context.Context) error {
189 s.defaultingOnce.Do(s.setDefaults)
190
191 log.Info("Starting webhook server")
192
193 cfg := &tls.Config{
194 NextProtos: []string{"h2"},
195 }
196
197 for _, op := range s.Options.TLSOpts {
198 op(cfg)
199 }
200
201 if cfg.GetCertificate == nil {
202 certPath := filepath.Join(s.Options.CertDir, s.Options.CertName)
203 keyPath := filepath.Join(s.Options.CertDir, s.Options.KeyName)
204
205
206
207 certWatcher, err := certwatcher.New(certPath, keyPath)
208 if err != nil {
209 return err
210 }
211 cfg.GetCertificate = certWatcher.GetCertificate
212
213 go func() {
214 if err := certWatcher.Start(ctx); err != nil {
215 log.Error(err, "certificate watcher error")
216 }
217 }()
218 }
219
220
221 if s.Options.ClientCAName != "" {
222 certPool := x509.NewCertPool()
223 clientCABytes, err := os.ReadFile(filepath.Join(s.Options.CertDir, s.Options.ClientCAName))
224 if err != nil {
225 return fmt.Errorf("failed to read client CA cert: %w", err)
226 }
227
228 ok := certPool.AppendCertsFromPEM(clientCABytes)
229 if !ok {
230 return fmt.Errorf("failed to append client CA cert to CA pool")
231 }
232
233 cfg.ClientCAs = certPool
234 cfg.ClientAuth = tls.RequireAndVerifyClientCert
235 }
236
237 listener, err := tls.Listen("tcp", net.JoinHostPort(s.Options.Host, strconv.Itoa(s.Options.Port)), cfg)
238 if err != nil {
239 return err
240 }
241
242 log.Info("Serving webhook server", "host", s.Options.Host, "port", s.Options.Port)
243
244 srv := httpserver.New(s.webhookMux)
245
246 idleConnsClosed := make(chan struct{})
247 go func() {
248 <-ctx.Done()
249 log.Info("Shutting down webhook server with timeout of 1 minute")
250
251 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
252 defer cancel()
253 if err := srv.Shutdown(ctx); err != nil {
254
255 log.Error(err, "error shutting down the HTTP server")
256 }
257 close(idleConnsClosed)
258 }()
259
260 s.mu.Lock()
261 s.started = true
262 s.mu.Unlock()
263 if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
264 return err
265 }
266
267 <-idleConnsClosed
268 return nil
269 }
270
271
272
273 func (s *DefaultServer) StartedChecker() healthz.Checker {
274 config := &tls.Config{
275 InsecureSkipVerify: true,
276 }
277 return func(req *http.Request) error {
278 s.mu.Lock()
279 defer s.mu.Unlock()
280
281 if !s.started {
282 return fmt.Errorf("webhook server has not been started yet")
283 }
284
285 d := &net.Dialer{Timeout: 10 * time.Second}
286 conn, err := tls.DialWithDialer(d, "tcp", net.JoinHostPort(s.Options.Host, strconv.Itoa(s.Options.Port)), config)
287 if err != nil {
288 return fmt.Errorf("webhook server is not reachable: %w", err)
289 }
290
291 if err := conn.Close(); err != nil {
292 return fmt.Errorf("webhook server is not reachable: closing connection: %w", err)
293 }
294
295 return nil
296 }
297 }
298
299
300 func (s *DefaultServer) WebhookMux() *http.ServeMux {
301 return s.webhookMux
302 }
303
View as plain text