...

Source file src/sigs.k8s.io/controller-runtime/pkg/manager/internal.go

Documentation: sigs.k8s.io/controller-runtime/pkg/manager

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package manager
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"net/http"
    25  	"net/http/pprof"
    26  	"sync"
    27  	"sync/atomic"
    28  	"time"
    29  
    30  	"github.com/go-logr/logr"
    31  	"k8s.io/apimachinery/pkg/api/meta"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	kerrors "k8s.io/apimachinery/pkg/util/errors"
    34  	"k8s.io/client-go/rest"
    35  	"k8s.io/client-go/tools/leaderelection"
    36  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    37  	"k8s.io/client-go/tools/record"
    38  
    39  	"sigs.k8s.io/controller-runtime/pkg/cache"
    40  	"sigs.k8s.io/controller-runtime/pkg/client"
    41  	"sigs.k8s.io/controller-runtime/pkg/cluster"
    42  	"sigs.k8s.io/controller-runtime/pkg/config"
    43  	"sigs.k8s.io/controller-runtime/pkg/healthz"
    44  	"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
    45  	intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
    46  	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
    47  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    48  )
    49  
    50  const (
    51  	// Values taken from: https://github.com/kubernetes/component-base/blob/master/config/v1alpha1/defaults.go
    52  	defaultLeaseDuration          = 15 * time.Second
    53  	defaultRenewDeadline          = 10 * time.Second
    54  	defaultRetryPeriod            = 2 * time.Second
    55  	defaultGracefulShutdownPeriod = 30 * time.Second
    56  
    57  	defaultReadinessEndpoint = "/readyz"
    58  	defaultLivenessEndpoint  = "/healthz"
    59  )
    60  
    61  var _ Runnable = &controllerManager{}
    62  
// controllerManager is the manager implementation defined in this file. It owns
// the registered runnables, the optional metrics / health probe / pprof servers
// and the leader election state, and delegates cluster access to its cluster field.
type controllerManager struct {
	// Mutex is taken by Add, AddHealthzCheck, AddReadyzCheck and Start.
	sync.Mutex
	// started is set by Start. Once it is true, Start refuses to run again and
	// no further health or readiness checks can be added.
	started bool

	// stopProcedureEngaged is atomically switched from 0 to 1 the first time
	// engageStopProcedure runs, so that the stop procedure executes only once.
	// errChan carries errors from the leader election goroutines to Start and is
	// drained by engageStopProcedure during shutdown.
	// runnables holds the groups of Runnables that Start launches.
	stopProcedureEngaged *int64
	errChan              chan error
	runnables            *runnables

	// cluster holds a variety of methods to interact with a cluster. Required.
	cluster cluster.Cluster

	// recorderProvider is used to generate event recorders that will be injected into Controllers
	// (and EventHandlers, Sources and Predicates).
	recorderProvider *intrec.Provider

	// resourceLock forms the basis for leader election
	resourceLock resourcelock.Interface

	// leaderElectionReleaseOnCancel defines if the manager should step back from the leader lease
	// on shutdown
	leaderElectionReleaseOnCancel bool

	// metricsServer is used to serve prometheus metrics
	metricsServer metricsserver.Server

	// healthProbeListener is used to serve the liveness and readiness probes
	healthProbeListener net.Listener

	// Readiness probe endpoint name
	readinessEndpointName string

	// Liveness probe endpoint name
	livenessEndpointName string

	// Readyz probe handler
	readyzHandler *healthz.Handler

	// Healthz probe handler
	healthzHandler *healthz.Handler

	// pprofListener is used to serve pprof
	pprofListener net.Listener

	// controllerConfig are the global controller options.
	controllerConfig config.Controller

	// Logger is the logger that should be used by this manager.
	// If none is set, it defaults to log.Log global logger.
	logger logr.Logger

	// leaderElectionStopped is an internal channel used to signal the stopping procedure that the
	// LeaderElection.Run(...) function has returned and the shutdown can proceed.
	leaderElectionStopped chan struct{}

	// leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper,
	// because for safety reasons we need to os.Exit() when we lose the leader election, meaning that
	// it must be deferred until after gracefulShutdown is done.
	leaderElectionCancel context.CancelFunc

	// elected is closed when this manager becomes the leader of a group of
	// managers, either because it won a leader election or because no leader
	// election was configured.
	elected chan struct{}

	// webhookServer is the server returned by GetWebhookServer.
	webhookServer webhook.Server
	// webhookServerOnce will be called in GetWebhookServer() to optionally initialize
	// webhookServer if unset, and Add() it to controllerManager.
	webhookServerOnce sync.Once

	// leaderElectionID is the name of the resource that leader election
	// will use for holding the leader lock.
	leaderElectionID string
	// leaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership.
	leaseDuration time.Duration
	// renewDeadline is the duration that the acting controlplane will retry
	// refreshing leadership before giving up.
	renewDeadline time.Duration
	// retryPeriod is the duration the LeaderElector clients should wait
	// between tries of actions.
	retryPeriod time.Duration

	// gracefulShutdownTimeout is the duration given to runnable to stop
	// before the manager actually returns on stop.
	gracefulShutdownTimeout time.Duration

	// onStoppedLeading is called when the leader election lease is lost.
	// It can be overridden for tests.
	onStoppedLeading func()

	// shutdownCtx is the context that can be used during shutdown. It will be cancelled
	// after the gracefulShutdownTimeout ended. It must not be accessed before
	// internalProceduresStop is closed because it will be nil.
	shutdownCtx context.Context

	// internalCtx is derived from the context passed to Start and is handed to the
	// runnable groups; internalCancel cancels it when the stop procedure is engaged.
	internalCtx    context.Context
	internalCancel context.CancelFunc

	// internalProceduresStop channel is used internally to the manager when coordinating
	// the proper shutdown of servers. This channel is also used for dependency injection.
	internalProceduresStop chan struct{}
}
   165  
// hasCache describes a Runnable that also exposes a cache through GetCache.
// NOTE(review): nothing in this file references it; presumably it is used
// elsewhere in the package to recognise cache-backed runnables — confirm.
type hasCache interface {
	Runnable
	GetCache() cache.Cache
}
   170  
   171  // Add sets dependencies on i, and adds it to the list of Runnables to start.
   172  func (cm *controllerManager) Add(r Runnable) error {
   173  	cm.Lock()
   174  	defer cm.Unlock()
   175  	return cm.add(r)
   176  }
   177  
   178  func (cm *controllerManager) add(r Runnable) error {
   179  	return cm.runnables.Add(r)
   180  }
   181  
   182  // AddHealthzCheck allows you to add Healthz checker.
   183  func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
   184  	cm.Lock()
   185  	defer cm.Unlock()
   186  
   187  	if cm.started {
   188  		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
   189  	}
   190  
   191  	if cm.healthzHandler == nil {
   192  		cm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
   193  	}
   194  
   195  	cm.healthzHandler.Checks[name] = check
   196  	return nil
   197  }
   198  
   199  // AddReadyzCheck allows you to add Readyz checker.
   200  func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
   201  	cm.Lock()
   202  	defer cm.Unlock()
   203  
   204  	if cm.started {
   205  		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
   206  	}
   207  
   208  	if cm.readyzHandler == nil {
   209  		cm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
   210  	}
   211  
   212  	cm.readyzHandler.Checks[name] = check
   213  	return nil
   214  }
   215  
// GetHTTPClient returns the HTTP client provided by the manager's cluster.
func (cm *controllerManager) GetHTTPClient() *http.Client {
	return cm.cluster.GetHTTPClient()
}
   219  
// GetConfig returns the REST config provided by the manager's cluster.
func (cm *controllerManager) GetConfig() *rest.Config {
	return cm.cluster.GetConfig()
}
   223  
// GetClient returns the client provided by the manager's cluster.
func (cm *controllerManager) GetClient() client.Client {
	return cm.cluster.GetClient()
}
   227  
// GetScheme returns the runtime scheme provided by the manager's cluster.
func (cm *controllerManager) GetScheme() *runtime.Scheme {
	return cm.cluster.GetScheme()
}
   231  
// GetFieldIndexer returns the field indexer provided by the manager's cluster.
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
	return cm.cluster.GetFieldIndexer()
}
   235  
// GetCache returns the cache provided by the manager's cluster.
func (cm *controllerManager) GetCache() cache.Cache {
	return cm.cluster.GetCache()
}
   239  
// GetEventRecorderFor returns the event recorder that the manager's cluster
// provides for the given name.
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
	return cm.cluster.GetEventRecorderFor(name)
}
   243  
// GetRESTMapper returns the REST mapper provided by the manager's cluster.
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
	return cm.cluster.GetRESTMapper()
}
   247  
// GetAPIReader returns the API reader provided by the manager's cluster.
func (cm *controllerManager) GetAPIReader() client.Reader {
	return cm.cluster.GetAPIReader()
}
   251  
   252  func (cm *controllerManager) GetWebhookServer() webhook.Server {
   253  	cm.webhookServerOnce.Do(func() {
   254  		if cm.webhookServer == nil {
   255  			panic("webhook should not be nil")
   256  		}
   257  		if err := cm.Add(cm.webhookServer); err != nil {
   258  			panic(fmt.Sprintf("unable to add webhook server to the controller manager: %s", err))
   259  		}
   260  	})
   261  	return cm.webhookServer
   262  }
   263  
// GetLogger returns the logger used by this manager.
func (cm *controllerManager) GetLogger() logr.Logger {
	return cm.logger
}
   267  
// GetControllerOptions returns the global controller options held by this manager.
func (cm *controllerManager) GetControllerOptions() config.Controller {
	return cm.controllerConfig
}
   271  
   272  func (cm *controllerManager) addHealthProbeServer() error {
   273  	mux := http.NewServeMux()
   274  	srv := httpserver.New(mux)
   275  
   276  	if cm.readyzHandler != nil {
   277  		mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
   278  		// Append '/' suffix to handle subpaths
   279  		mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
   280  	}
   281  	if cm.healthzHandler != nil {
   282  		mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
   283  		// Append '/' suffix to handle subpaths
   284  		mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
   285  	}
   286  
   287  	return cm.add(&Server{
   288  		Name:     "health probe",
   289  		Server:   srv,
   290  		Listener: cm.healthProbeListener,
   291  	})
   292  }
   293  
   294  func (cm *controllerManager) addPprofServer() error {
   295  	mux := http.NewServeMux()
   296  	srv := httpserver.New(mux)
   297  
   298  	mux.HandleFunc("/debug/pprof/", pprof.Index)
   299  	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
   300  	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
   301  	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
   302  	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
   303  
   304  	return cm.add(&Server{
   305  		Name:     "pprof",
   306  		Server:   srv,
   307  		Listener: cm.pprofListener,
   308  	})
   309  }
   310  
// Start starts the manager and blocks until it stops.
// There are only two ways to have Start return:
//   - an error has occurred in one of the internal operations, such as leader
//     election, cache start, webhooks, and so on; or
//   - the context is cancelled.
//
// Start may only be called once; a second call returns an error. Whenever Start
// returns after startup began, the stop procedure is engaged first, and an error
// from it is combined with the error that caused Start to return.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	cm.started = true

	// The lock stays held for the whole startup sequence below. It is released
	// either by this deferred function (early return) or explicitly once the
	// manager is ready.
	var ready bool
	defer func() {
		// Only unlock the manager if we haven't reached
		// the internal readiness condition.
		if !ready {
			cm.Unlock()
		}
	}()

	// Initialize the internal context.
	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
	stopComplete := make(chan struct{})
	defer close(stopComplete)
	// This must be deferred after closing stopComplete, otherwise we deadlock.
	defer func() {
		// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				// Utilerrors.Aggregate allows to use errors.Is for all contained errors
				// whereas fmt.Errorf allows wrapping at most one error which means the
				// other one can not be found anymore.
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// Add the cluster runnable.
	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	// Metrics should be served whether the controller is leader or not.
	// (If we don't serve metrics for non-leaders, prometheus will still scrape
	// the pod but will get a connection refused).
	if cm.metricsServer != nil {
		// Note: We are adding the metrics server directly to HTTPServers here as matching on the
		// metricsserver.Server interface in cm.runnables.Add would be very brittle.
		if err := cm.runnables.HTTPServers.Add(cm.metricsServer, nil); err != nil {
			return fmt.Errorf("failed to add metrics server: %w", err)
		}
	}

	// Serve health probes.
	if cm.healthProbeListener != nil {
		if err := cm.addHealthProbeServer(); err != nil {
			return fmt.Errorf("failed to add health probe server: %w", err)
		}
	}

	// Add pprof server
	if cm.pprofListener != nil {
		if err := cm.addPprofServer(); err != nil {
			return fmt.Errorf("failed to add pprof server: %w", err)
		}
	}

	// First start any HTTP servers, which includes health probes, metrics and profiling if enabled.
	//
	// WARNING: HTTPServers includes the health probes, which MUST start before any cache is populated, otherwise
	// it would block conversion webhooks to be ready for serving which make the cache never get ready.
	logCtx := logr.NewContext(cm.internalCtx, cm.logger)
	if err := cm.runnables.HTTPServers.Start(logCtx); err != nil {
		return fmt.Errorf("failed to start HTTP servers: %w", err)
	}

	// Start any webhook servers, which includes conversion, validation, and defaulting
	// webhooks that are registered.
	//
	// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
	// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
	// to never start because no cache can be populated.
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		return fmt.Errorf("failed to start webhooks: %w", err)
	}

	// Start and wait for caches.
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		return fmt.Errorf("failed to start caches: %w", err)
	}

	// Start the non-leaderelection Runnables after the cache has synced.
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		return fmt.Errorf("failed to start other runnables: %w", err)
	}

	// Start the leader election and all required runnables.
	{
		// This context is deliberately not derived from the caller's ctx: leader
		// election is only cancelled through leaderElectionCancel, at the end of
		// the stop procedure.
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// Treat not having leader election enabled the same as being elected.
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	// Startup is complete: release the lock here and tell the deferred function
	// above not to unlock a second time.
	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		// We are done
		return nil
	case err := <-cm.errChan:
		// Error starting or running a runnable
		return err
	}
}
   444  
// engageStopProcedure signals all runnables to stop, reads potential errors
// from the errChan and waits for them to end. It must not be called more than once;
// a second call returns an error without doing anything.
//
// stopComplete is closed by the caller once this function has returned, and
// terminates the goroutine that drains errChan.
//
// It returns an error when the runnables did not stop within a positive
// gracefulShutdownTimeout. With a negative timeout it waits indefinitely.
func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error {
	// Only the first caller gets past this compare-and-swap.
	if !atomic.CompareAndSwapInt64(cm.stopProcedureEngaged, 0, 1) {
		return errors.New("stop procedure already engaged")
	}

	// Populate the shutdown context, this operation MUST be done before
	// closing the internalProceduresStop channel.
	//
	// The shutdown context immediately expires if the gracefulShutdownTimeout is not set.
	var shutdownCancel context.CancelFunc
	if cm.gracefulShutdownTimeout < 0 {
		// We want to wait forever for the runnables to stop.
		cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
	} else {
		cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
	}
	defer shutdownCancel()

	// Start draining the errors before acquiring the lock to make sure we don't deadlock
	// if something that has the lock is blocked on trying to write into the unbuffered
	// channel after something else already wrote into it.
	var closeOnce sync.Once
	go func() {
		for {
			// Closing in the for loop is required to avoid race conditions between
			// the closure of all internal procedures and making sure to have a reader off the error channel.
			closeOnce.Do(func() {
				// Cancel the internal stop channel and wait for the procedures to stop and complete.
				close(cm.internalProceduresStop)
				cm.internalCancel()
			})
			select {
			case err, ok := <-cm.errChan:
				if ok {
					cm.logger.Error(err, "error received after stop sequence was engaged")
				}
			case <-stopComplete:
				return
			}
		}
	}()

	// We want to close this after the other runnables stop, because we don't
	// want things like leader election to try and emit events on a closed
	// channel
	defer cm.recorderProvider.Stop(cm.shutdownCtx)
	// Deferred functions run in reverse order, so the leader election is
	// cancelled and awaited before the recorder provider above is stopped.
	defer func() {
		// Cancel leader election only after we waited. It will os.Exit() the app for safety.
		if cm.resourceLock != nil {
			// After asking the context to be cancelled, make sure
			// we wait for the leader stopped channel to be closed, otherwise
			// we might encounter race conditions between this code
			// and the event recorder, which is used within leader election code.
			cm.leaderElectionCancel()
			<-cm.leaderElectionStopped
		}
	}()

	go func() {
		// First stop the non-leader election runnables.
		cm.logger.Info("Stopping and waiting for non leader election runnables")
		cm.runnables.Others.StopAndWait(cm.shutdownCtx)

		// Stop all the leader election runnables, which includes reconcilers.
		cm.logger.Info("Stopping and waiting for leader election runnables")
		// Prevent leader election when shutting down a non-elected manager
		cm.runnables.LeaderElection.startOnce.Do(func() {})
		cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)

		// Stop the caches once the leader election runnables have been stopped.
		// NOTE(review): the previous wording of this comment said the caches are
		// stopped "before" the leader election runnables, to avoid racing with the
		// reconcilers by receiving more events from the API servers and enqueueing
		// them; the code stops them afterwards — confirm the intended order.
		cm.logger.Info("Stopping and waiting for caches")
		cm.runnables.Caches.StopAndWait(cm.shutdownCtx)

		// Webhooks and internal HTTP servers should come last, as they might be still serving some requests.
		cm.logger.Info("Stopping and waiting for webhooks")
		cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)

		cm.logger.Info("Stopping and waiting for HTTP servers")
		cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx)

		// Proceed to close the manager and overall shutdown context.
		cm.logger.Info("Wait completed, proceeding to shutdown the manager")
		shutdownCancel()
	}()

	// Block until either every group has stopped (shutdownCancel above) or the
	// graceful shutdown timeout expired.
	<-cm.shutdownCtx.Done()
	if err := cm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
		if errors.Is(err, context.DeadlineExceeded) {
			// A timeout is only reported when a positive grace period was
			// configured; with a zero timeout the immediate expiry is expected.
			if cm.gracefulShutdownTimeout > 0 {
				return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
			}
			return nil
		}
		// For any other error, return the error.
		return err
	}

	return nil
}
   548  
// startLeaderElectionRunnables starts the LeaderElection group of runnables
// with the manager's internal context and returns the error from that start.
func (cm *controllerManager) startLeaderElectionRunnables() error {
	return cm.runnables.LeaderElection.Start(cm.internalCtx)
}
   552  
   553  func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
   554  	l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
   555  		Lock:          cm.resourceLock,
   556  		LeaseDuration: cm.leaseDuration,
   557  		RenewDeadline: cm.renewDeadline,
   558  		RetryPeriod:   cm.retryPeriod,
   559  		Callbacks: leaderelection.LeaderCallbacks{
   560  			OnStartedLeading: func(_ context.Context) {
   561  				if err := cm.startLeaderElectionRunnables(); err != nil {
   562  					cm.errChan <- err
   563  					return
   564  				}
   565  				close(cm.elected)
   566  			},
   567  			OnStoppedLeading: func() {
   568  				if cm.onStoppedLeading != nil {
   569  					cm.onStoppedLeading()
   570  				}
   571  				// Make sure graceful shutdown is skipped if we lost the leader lock without
   572  				// intending to.
   573  				cm.gracefulShutdownTimeout = time.Duration(0)
   574  				// Most implementations of leader election log.Fatal() here.
   575  				// Since Start is wrapped in log.Fatal when called, we can just return
   576  				// an error here which will cause the program to exit.
   577  				cm.errChan <- errors.New("leader election lost")
   578  			},
   579  		},
   580  		ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
   581  		Name:            cm.leaderElectionID,
   582  	})
   583  	if err != nil {
   584  		return err
   585  	}
   586  
   587  	// Start the leader elector process
   588  	go func() {
   589  		l.Run(ctx)
   590  		<-ctx.Done()
   591  		close(cm.leaderElectionStopped)
   592  	}()
   593  	return nil
   594  }
   595  
// Elected returns the channel that is closed when this manager becomes the
// leader, either because it won a leader election or because no leader
// election was configured.
func (cm *controllerManager) Elected() <-chan struct{} {
	return cm.elected
}
   599  

View as plain text