1 /* 2 Copyright 2018 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package manager 18 19 import ( 20 "context" 21 "errors" 22 "fmt" 23 "net" 24 "net/http" 25 "time" 26 27 "github.com/go-logr/logr" 28 coordinationv1 "k8s.io/api/coordination/v1" 29 corev1 "k8s.io/api/core/v1" 30 "k8s.io/apimachinery/pkg/api/meta" 31 "k8s.io/apimachinery/pkg/runtime" 32 "k8s.io/client-go/rest" 33 "k8s.io/client-go/tools/leaderelection/resourcelock" 34 "k8s.io/client-go/tools/record" 35 "k8s.io/utils/ptr" 36 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" 37 38 "sigs.k8s.io/controller-runtime/pkg/cache" 39 "sigs.k8s.io/controller-runtime/pkg/client" 40 "sigs.k8s.io/controller-runtime/pkg/cluster" 41 "sigs.k8s.io/controller-runtime/pkg/config" 42 "sigs.k8s.io/controller-runtime/pkg/healthz" 43 intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" 44 "sigs.k8s.io/controller-runtime/pkg/leaderelection" 45 "sigs.k8s.io/controller-runtime/pkg/log" 46 "sigs.k8s.io/controller-runtime/pkg/recorder" 47 "sigs.k8s.io/controller-runtime/pkg/webhook" 48 ) 49 50 // Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables. 51 // A Manager is required to create Controllers. 52 type Manager interface { 53 // Cluster holds a variety of methods to interact with a cluster. 54 cluster.Cluster 55 56 // Add will set requested dependencies on the component, and cause the component to be 57 // started when Start is called. 58 // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either 59 // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). 60 Add(Runnable) error 61 62 // Elected is closed when this manager is elected leader of a group of 63 // managers, either because it won a leader election or because no leader 64 // election was configured. 65 Elected() <-chan struct{} 66 67 // AddHealthzCheck allows you to add Healthz checker 68 AddHealthzCheck(name string, check healthz.Checker) error 69 70 // AddReadyzCheck allows you to add Readyz checker 71 AddReadyzCheck(name string, check healthz.Checker) error 72 73 // Start starts all registered Controllers and blocks until the context is cancelled. 74 // Returns an error if there is an error starting any controller. 75 // 76 // If LeaderElection is used, the binary must be exited immediately after this returns, 77 // otherwise components that need leader election might continue to run after the leader 78 // lock was lost. 79 Start(ctx context.Context) error 80 81 // GetWebhookServer returns a webhook.Server 82 GetWebhookServer() webhook.Server 83 84 // GetLogger returns this manager's logger. 85 GetLogger() logr.Logger 86 87 // GetControllerOptions returns controller global configuration options. 88 GetControllerOptions() config.Controller 89 } 90 91 // Options are the arguments for creating a new Manager. 92 type Options struct { 93 // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources. 94 // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better 95 // to pass your own scheme in. See the documentation in pkg/scheme for more information. 96 // 97 // If set, the Scheme will be used to create the default Client and Cache. 98 Scheme *runtime.Scheme 99 100 // MapperProvider provides the rest mapper used to map go types to Kubernetes APIs. 101 // 102 // If set, the RESTMapper returned by this function is used to create the RESTMapper 103 // used by the Client and Cache. 104 MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) 105 106 // Cache is the cache.Options that will be used to create the default Cache. 107 // By default, the cache will watch and list requested objects in all namespaces. 108 Cache cache.Options 109 110 // NewCache is the function that will create the cache to be used 111 // by the manager. If not set this will use the default new cache function. 112 // 113 // When using a custom NewCache, the Cache options will be passed to the 114 // NewCache function. 115 // 116 // NOTE: LOW LEVEL PRIMITIVE! 117 // Only use a custom NewCache if you know what you are doing. 118 NewCache cache.NewCacheFunc 119 120 // Client is the client.Options that will be used to create the default Client. 121 // By default, the client will use the cache for reads and direct calls for writes. 122 Client client.Options 123 124 // NewClient is the func that creates the client to be used by the manager. 125 // If not set this will create a Client backed by a Cache for read operations 126 // and a direct Client for write operations. 127 // 128 // When using a custom NewClient, the Client options will be passed to the 129 // NewClient function. 130 // 131 // NOTE: LOW LEVEL PRIMITIVE! 132 // Only use a custom NewClient if you know what you are doing. 133 NewClient client.NewClientFunc 134 135 // Logger is the logger that should be used by this manager. 136 // If none is set, it defaults to log.Log global logger. 137 Logger logr.Logger 138 139 // LeaderElection determines whether or not to use leader election when 140 // starting the manager. 141 LeaderElection bool 142 143 // LeaderElectionResourceLock determines which resource lock to use for leader election, 144 // defaults to "leases". Change this value only if you know what you are doing. 145 // 146 // If you are using `configmaps`/`endpoints` resource lock and want to migrate to "leases", 147 // you might do so by migrating to the respective multilock first ("configmapsleases" or "endpointsleases"), 148 // which will acquire a leader lock on both resources. 149 // After all your users have migrated to the multilock, you can go ahead and migrate to "leases". 150 // Please also keep in mind, that users might skip versions of your controller. 151 // 152 // Note: before controller-runtime version v0.7, it was set to "configmaps". 153 // And from v0.7 to v0.11, the default was "configmapsleases", which was 154 // used to migrate from configmaps to leases. 155 // Since the default was "configmapsleases" for over a year, spanning five minor releases, 156 // any actively maintained operators are very likely to have a released version that uses 157 // "configmapsleases". Therefore defaulting to "leases" should be safe since v0.12. 158 // 159 // So, what do you have to do when you are updating your controller-runtime dependency 160 // from a lower version to v0.12 or newer? 161 // - If your operator matches at least one of these conditions: 162 // - the LeaderElectionResourceLock in your operator has already been explicitly set to "leases" 163 // - the old controller-runtime version is between v0.7.0 and v0.11.x and the 164 // LeaderElectionResourceLock wasn't set or was set to "leases"/"configmapsleases"/"endpointsleases" 165 // feel free to update controller-runtime to v0.12 or newer. 166 // - Otherwise, you may have to take these steps: 167 // 1. update controller-runtime to v0.12 or newer in your go.mod 168 // 2. set LeaderElectionResourceLock to "configmapsleases" (or "endpointsleases") 169 // 3. package your operator and upgrade it in all your clusters 170 // 4. only if you have finished 3, you can remove the LeaderElectionResourceLock to use the default "leases" 171 // Otherwise, your operator might end up with multiple running instances that 172 // each acquired leadership through different resource locks during upgrades and thus 173 // act on the same resources concurrently. 174 LeaderElectionResourceLock string 175 176 // LeaderElectionNamespace determines the namespace in which the leader 177 // election resource will be created. 178 LeaderElectionNamespace string 179 180 // LeaderElectionID determines the name of the resource that leader election 181 // will use for holding the leader lock. 182 LeaderElectionID string 183 184 // LeaderElectionConfig can be specified to override the default configuration 185 // that is used to build the leader election client. 186 LeaderElectionConfig *rest.Config 187 188 // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily 189 // when the Manager ends. This requires the binary to immediately end when the 190 // Manager is stopped, otherwise this setting is unsafe. Setting this significantly 191 // speeds up voluntary leader transitions as the new leader doesn't have to wait 192 // LeaseDuration time first. 193 LeaderElectionReleaseOnCancel bool 194 195 // LeaderElectionResourceLockInterface allows to provide a custom resourcelock.Interface that was created outside 196 // of the controller-runtime. If this value is set the options LeaderElectionID, LeaderElectionNamespace, 197 // LeaderElectionResourceLock, LeaseDuration, RenewDeadline and RetryPeriod will be ignored. This can be useful if you 198 // want to use a locking mechanism that is currently not supported, like a MultiLock across two Kubernetes clusters. 199 LeaderElectionResourceLockInterface resourcelock.Interface 200 201 // LeaseDuration is the duration that non-leader candidates will 202 // wait to force acquire leadership. This is measured against time of 203 // last observed ack. Default is 15 seconds. 204 LeaseDuration *time.Duration 205 206 // RenewDeadline is the duration that the acting controlplane will retry 207 // refreshing leadership before giving up. Default is 10 seconds. 208 RenewDeadline *time.Duration 209 210 // RetryPeriod is the duration the LeaderElector clients should wait 211 // between tries of actions. Default is 2 seconds. 212 RetryPeriod *time.Duration 213 214 // Metrics are the metricsserver.Options that will be used to create the metricsserver.Server. 215 Metrics metricsserver.Options 216 217 // HealthProbeBindAddress is the TCP address that the controller should bind to 218 // for serving health probes 219 // It can be set to "0" or "" to disable serving the health probe. 220 HealthProbeBindAddress string 221 222 // Readiness probe endpoint name, defaults to "readyz" 223 ReadinessEndpointName string 224 225 // Liveness probe endpoint name, defaults to "healthz" 226 LivenessEndpointName string 227 228 // PprofBindAddress is the TCP address that the controller should bind to 229 // for serving pprof. 230 // It can be set to "" or "0" to disable the pprof serving. 231 // Since pprof may contain sensitive information, make sure to protect it 232 // before exposing it to public. 233 PprofBindAddress string 234 235 // WebhookServer is an externally configured webhook.Server. By default, 236 // a Manager will create a server via webhook.NewServer with default settings. 237 // If this is set, the Manager will use this server instead. 238 WebhookServer webhook.Server 239 240 // BaseContext is the function that provides Context values to Runnables 241 // managed by the Manager. If a BaseContext function isn't provided, Runnables 242 // will receive a new Background Context instead. 243 BaseContext BaseContextFunc 244 245 // EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API 246 // Use this to customize the event correlator and spam filter 247 // 248 // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers 249 // is shorter than the lifetime of your process. 250 EventBroadcaster record.EventBroadcaster 251 252 // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. 253 // To disable graceful shutdown, set to time.Duration(0) 254 // To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1) 255 // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. 256 GracefulShutdownTimeout *time.Duration 257 258 // Controller contains global configuration options for controllers 259 // registered within this manager. 260 // +optional 261 Controller config.Controller 262 263 // makeBroadcaster allows deferring the creation of the broadcaster to 264 // avoid leaking goroutines if we never call Start on this manager. It also 265 // returns whether or not this is a "owned" broadcaster, and as such should be 266 // stopped with the manager. 267 makeBroadcaster intrec.EventBroadcasterProducer 268 269 // Dependency injection for testing 270 newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) 271 newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) 272 newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) 273 newHealthProbeListener func(addr string) (net.Listener, error) 274 newPprofListener func(addr string) (net.Listener, error) 275 } 276 277 // BaseContextFunc is a function used to provide a base Context to Runnables 278 // managed by a Manager. 279 type BaseContextFunc func() context.Context 280 281 // Runnable allows a component to be started. 282 // It's very important that Start blocks until 283 // it's done running. 284 type Runnable interface { 285 // Start starts running the component. The component will stop running 286 // when the context is closed. Start blocks until the context is closed or 287 // an error occurs. 288 Start(context.Context) error 289 } 290 291 // RunnableFunc implements Runnable using a function. 292 // It's very important that the given function block 293 // until it's done running. 294 type RunnableFunc func(context.Context) error 295 296 // Start implements Runnable. 297 func (r RunnableFunc) Start(ctx context.Context) error { 298 return r(ctx) 299 } 300 301 // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. 302 type LeaderElectionRunnable interface { 303 // NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode. 304 // e.g. controllers need to be run in leader election mode, while webhook server doesn't. 305 NeedLeaderElection() bool 306 } 307 308 // New returns a new Manager for creating Controllers. 309 // Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf" 310 // will be used for all built-in resources of Kubernetes, and "application/json" is for other types 311 // including all CRD resources. 312 func New(config *rest.Config, options Options) (Manager, error) { 313 if config == nil { 314 return nil, errors.New("must specify Config") 315 } 316 // Set default values for options fields 317 options = setOptionsDefaults(options) 318 319 cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { 320 clusterOptions.Scheme = options.Scheme 321 clusterOptions.MapperProvider = options.MapperProvider 322 clusterOptions.Logger = options.Logger 323 clusterOptions.NewCache = options.NewCache 324 clusterOptions.NewClient = options.NewClient 325 clusterOptions.Cache = options.Cache 326 clusterOptions.Client = options.Client 327 clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck 328 }) 329 if err != nil { 330 return nil, err 331 } 332 333 config = rest.CopyConfig(config) 334 if config.UserAgent == "" { 335 config.UserAgent = rest.DefaultKubernetesUserAgent() 336 } 337 338 // Create the recorder provider to inject event recorders for the components. 339 // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific 340 // to the particular controller that it's being injected into, rather than a generic one like is here. 341 recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) 342 if err != nil { 343 return nil, err 344 } 345 346 // Create the resource lock to enable leader election) 347 var leaderConfig *rest.Config 348 var leaderRecorderProvider *intrec.Provider 349 350 if options.LeaderElectionConfig == nil { 351 leaderConfig = rest.CopyConfig(config) 352 leaderRecorderProvider = recorderProvider 353 } else { 354 leaderConfig = rest.CopyConfig(options.LeaderElectionConfig) 355 scheme := cluster.GetScheme() 356 err := corev1.AddToScheme(scheme) 357 if err != nil { 358 return nil, err 359 } 360 err = coordinationv1.AddToScheme(scheme) 361 if err != nil { 362 return nil, err 363 } 364 httpClient, err := rest.HTTPClientFor(options.LeaderElectionConfig) 365 if err != nil { 366 return nil, err 367 } 368 leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, httpClient, scheme, options.Logger.WithName("events"), options.makeBroadcaster) 369 if err != nil { 370 return nil, err 371 } 372 } 373 374 var resourceLock resourcelock.Interface 375 if options.LeaderElectionResourceLockInterface != nil && options.LeaderElection { 376 resourceLock = options.LeaderElectionResourceLockInterface 377 } else { 378 resourceLock, err = options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{ 379 LeaderElection: options.LeaderElection, 380 LeaderElectionResourceLock: options.LeaderElectionResourceLock, 381 LeaderElectionID: options.LeaderElectionID, 382 LeaderElectionNamespace: options.LeaderElectionNamespace, 383 }) 384 if err != nil { 385 return nil, err 386 } 387 } 388 389 // Create the metrics server. 390 metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient()) 391 if err != nil { 392 return nil, err 393 } 394 395 // Create health probes listener. This will throw an error if the bind 396 // address is invalid or already in use. 397 healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) 398 if err != nil { 399 return nil, err 400 } 401 402 // Create pprof listener. This will throw an error if the bind 403 // address is invalid or already in use. 404 pprofListener, err := options.newPprofListener(options.PprofBindAddress) 405 if err != nil { 406 return nil, fmt.Errorf("failed to new pprof listener: %w", err) 407 } 408 409 errChan := make(chan error, 1) 410 runnables := newRunnables(options.BaseContext, errChan) 411 return &controllerManager{ 412 stopProcedureEngaged: ptr.To(int64(0)), 413 cluster: cluster, 414 runnables: runnables, 415 errChan: errChan, 416 recorderProvider: recorderProvider, 417 resourceLock: resourceLock, 418 metricsServer: metricsServer, 419 controllerConfig: options.Controller, 420 logger: options.Logger, 421 elected: make(chan struct{}), 422 webhookServer: options.WebhookServer, 423 leaderElectionID: options.LeaderElectionID, 424 leaseDuration: *options.LeaseDuration, 425 renewDeadline: *options.RenewDeadline, 426 retryPeriod: *options.RetryPeriod, 427 healthProbeListener: healthProbeListener, 428 readinessEndpointName: options.ReadinessEndpointName, 429 livenessEndpointName: options.LivenessEndpointName, 430 pprofListener: pprofListener, 431 gracefulShutdownTimeout: *options.GracefulShutdownTimeout, 432 internalProceduresStop: make(chan struct{}), 433 leaderElectionStopped: make(chan struct{}), 434 leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, 435 }, nil 436 } 437 438 // defaultHealthProbeListener creates the default health probes listener bound to the given address. 439 func defaultHealthProbeListener(addr string) (net.Listener, error) { 440 if addr == "" || addr == "0" { 441 return nil, nil 442 } 443 444 ln, err := net.Listen("tcp", addr) 445 if err != nil { 446 return nil, fmt.Errorf("error listening on %s: %w", addr, err) 447 } 448 return ln, nil 449 } 450 451 // defaultPprofListener creates the default pprof listener bound to the given address. 452 func defaultPprofListener(addr string) (net.Listener, error) { 453 if addr == "" || addr == "0" { 454 return nil, nil 455 } 456 457 ln, err := net.Listen("tcp", addr) 458 if err != nil { 459 return nil, fmt.Errorf("error listening on %s: %w", addr, err) 460 } 461 return ln, nil 462 } 463 464 // defaultBaseContext is used as the BaseContext value in Options if one 465 // has not already been set. 466 func defaultBaseContext() context.Context { 467 return context.Background() 468 } 469 470 // setOptionsDefaults set default values for Options fields. 471 func setOptionsDefaults(options Options) Options { 472 // Allow newResourceLock to be mocked 473 if options.newResourceLock == nil { 474 options.newResourceLock = leaderelection.NewResourceLock 475 } 476 477 // Allow newRecorderProvider to be mocked 478 if options.newRecorderProvider == nil { 479 options.newRecorderProvider = intrec.NewProvider 480 } 481 482 // This is duplicated with pkg/cluster, we need it here 483 // for the leader election and there to provide the user with 484 // an EventBroadcaster 485 if options.EventBroadcaster == nil { 486 // defer initialization to avoid leaking by default 487 options.makeBroadcaster = func() (record.EventBroadcaster, bool) { 488 return record.NewBroadcaster(), true 489 } 490 } else { 491 options.makeBroadcaster = func() (record.EventBroadcaster, bool) { 492 return options.EventBroadcaster, false 493 } 494 } 495 496 if options.newMetricsServer == nil { 497 options.newMetricsServer = metricsserver.NewServer 498 } 499 leaseDuration, renewDeadline, retryPeriod := defaultLeaseDuration, defaultRenewDeadline, defaultRetryPeriod 500 if options.LeaseDuration == nil { 501 options.LeaseDuration = &leaseDuration 502 } 503 504 if options.RenewDeadline == nil { 505 options.RenewDeadline = &renewDeadline 506 } 507 508 if options.RetryPeriod == nil { 509 options.RetryPeriod = &retryPeriod 510 } 511 512 if options.ReadinessEndpointName == "" { 513 options.ReadinessEndpointName = defaultReadinessEndpoint 514 } 515 516 if options.LivenessEndpointName == "" { 517 options.LivenessEndpointName = defaultLivenessEndpoint 518 } 519 520 if options.newHealthProbeListener == nil { 521 options.newHealthProbeListener = defaultHealthProbeListener 522 } 523 524 if options.newPprofListener == nil { 525 options.newPprofListener = defaultPprofListener 526 } 527 528 if options.GracefulShutdownTimeout == nil { 529 gracefulShutdownTimeout := defaultGracefulShutdownPeriod 530 options.GracefulShutdownTimeout = &gracefulShutdownTimeout 531 } 532 533 if options.Logger.GetSink() == nil { 534 options.Logger = log.Log 535 } 536 537 if options.BaseContext == nil { 538 options.BaseContext = defaultBaseContext 539 } 540 541 if options.WebhookServer == nil { 542 options.WebhookServer = webhook.NewServer(webhook.Options{}) 543 } 544 545 return options 546 } 547