...

Source file src/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go

Documentation: sigs.k8s.io/controller-runtime/pkg/manager

     1  package manager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"sync"
     7  
     8  	"sigs.k8s.io/controller-runtime/pkg/webhook"
     9  )
    10  
    11  var (
    12  	errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
    13  )
    14  
// readyRunnable pairs a Runnable with the readiness check that the owning
// runnableGroup evaluates once the runnable has been scheduled.
//
// Check is called with the group's internal context. signalReady is set by
// runnableGroup.Start on the runnables that were queued before the group
// started; when it is true and Check returns true, the runnable is reported
// on the group's startReadyCh so that Start can stop waiting for it.
type readyRunnable struct {
	Runnable
	Check       runnableCheck
	signalReady bool
}
    22  
// runnableCheck can be passed to Add() to let the runnable group determine
// when a runnable is ready.
//
// A runnable check should block until the runnable is ready and then return
// true. If it returns false, the runnable is considered not ready and failed.
type runnableCheck func(ctx context.Context) bool
    27  
// runnables handles all the runnables for a manager by grouping them according
// to their type (HTTP servers, webhooks, caches, etc.).
//
// Each field is an independent runnableGroup; runnables.Add decides which
// group a given Runnable is placed in.
type runnables struct {
	HTTPServers    *runnableGroup
	Webhooks       *runnableGroup
	Caches         *runnableGroup
	LeaderElection *runnableGroup
	Others         *runnableGroup
}
    37  
    38  // newRunnables creates a new runnables object.
    39  func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
    40  	return &runnables{
    41  		HTTPServers:    newRunnableGroup(baseContext, errChan),
    42  		Webhooks:       newRunnableGroup(baseContext, errChan),
    43  		Caches:         newRunnableGroup(baseContext, errChan),
    44  		LeaderElection: newRunnableGroup(baseContext, errChan),
    45  		Others:         newRunnableGroup(baseContext, errChan),
    46  	}
    47  }
    48  
    49  // Add adds a runnable to closest group of runnable that they belong to.
    50  //
    51  // Add should be able to be called before and after Start, but not after StopAndWait.
    52  // Add should return an error when called during StopAndWait.
    53  // The runnables added before Start are started when Start is called.
    54  // The runnables added after Start are started directly.
    55  func (r *runnables) Add(fn Runnable) error {
    56  	switch runnable := fn.(type) {
    57  	case *Server:
    58  		if runnable.NeedLeaderElection() {
    59  			return r.LeaderElection.Add(fn, nil)
    60  		}
    61  		return r.HTTPServers.Add(fn, nil)
    62  	case hasCache:
    63  		return r.Caches.Add(fn, func(ctx context.Context) bool {
    64  			return runnable.GetCache().WaitForCacheSync(ctx)
    65  		})
    66  	case webhook.Server:
    67  		return r.Webhooks.Add(fn, nil)
    68  	case LeaderElectionRunnable:
    69  		if !runnable.NeedLeaderElection() {
    70  			return r.Others.Add(fn, nil)
    71  		}
    72  		return r.LeaderElection.Add(fn, nil)
    73  	default:
    74  		return r.LeaderElection.Add(fn, nil)
    75  	}
    76  }
    77  
// runnableGroup manages a group of runnables that are
// meant to be running together until StopAndWait is called.
//
// Runnables can be added to a group after the group has started
// but not after it's stopped or while shutting down.
type runnableGroup struct {
	// ctx is the context passed to every runnable's Start and readiness check.
	// cancel cancels it and is invoked by StopAndWait.
	ctx    context.Context
	cancel context.CancelFunc

	// start guards started and additions to startQueue.
	// startOnce makes the body of Start run at most once.
	// startQueue holds the runnables added before the group was started.
	// startReadyCh carries the readiness signals of the runnables taken from
	// startQueue; it is closed when Start's one-time body returns.
	start        sync.Mutex
	startOnce    sync.Once
	started      bool
	startQueue   []*readyRunnable
	startReadyCh chan *readyRunnable

	// stop guards stopped; it is also held while sending to and closing ch.
	// stopOnce makes the body of StopAndWait run at most once.
	// stopped is set once StopAndWait has begun rejecting new runnables.
	stop     sync.RWMutex
	stopOnce sync.Once
	stopped  bool

	// errChan is the error channel passed by the caller
	// when the group is created.
	// All errors are forwarded to this channel once they occur.
	errChan chan error

	// ch is the internal channel where the runnables are read off from.
	ch chan *readyRunnable

	// wg is an internal sync.WaitGroup that allows us to properly stop
	// and wait for all the runnables to finish before returning.
	wg *sync.WaitGroup
}
   109  
   110  func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
   111  	r := &runnableGroup{
   112  		startReadyCh: make(chan *readyRunnable),
   113  		errChan:      errChan,
   114  		ch:           make(chan *readyRunnable),
   115  		wg:           new(sync.WaitGroup),
   116  	}
   117  
   118  	r.ctx, r.cancel = context.WithCancel(baseContext())
   119  	return r
   120  }
   121  
   122  // Started returns true if the group has started.
   123  func (r *runnableGroup) Started() bool {
   124  	r.start.Lock()
   125  	defer r.start.Unlock()
   126  	return r.started
   127  }
   128  
   129  // Start starts the group and waits for all
   130  // initially registered runnables to start.
   131  // It can only be called once, subsequent calls have no effect.
   132  func (r *runnableGroup) Start(ctx context.Context) error {
   133  	var retErr error
   134  
   135  	r.startOnce.Do(func() {
   136  		defer close(r.startReadyCh)
   137  
   138  		// Start the internal reconciler.
   139  		go r.reconcile()
   140  
   141  		// Start the group and queue up all
   142  		// the runnables that were added prior.
   143  		r.start.Lock()
   144  		r.started = true
   145  		for _, rn := range r.startQueue {
   146  			rn.signalReady = true
   147  			r.ch <- rn
   148  		}
   149  		r.start.Unlock()
   150  
   151  		// If we don't have any queue, return.
   152  		if len(r.startQueue) == 0 {
   153  			return
   154  		}
   155  
   156  		// Wait for all runnables to signal.
   157  		for {
   158  			select {
   159  			case <-ctx.Done():
   160  				if err := ctx.Err(); !errors.Is(err, context.Canceled) {
   161  					retErr = err
   162  				}
   163  			case rn := <-r.startReadyCh:
   164  				for i, existing := range r.startQueue {
   165  					if existing == rn {
   166  						// Remove the item from the start queue.
   167  						r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
   168  						break
   169  					}
   170  				}
   171  				// We're done waiting if the queue is empty, return.
   172  				if len(r.startQueue) == 0 {
   173  					return
   174  				}
   175  			}
   176  		}
   177  	})
   178  
   179  	return retErr
   180  }
   181  
   182  // reconcile is our main entrypoint for every runnable added
   183  // to this group. Its primary job is to read off the internal channel
   184  // and schedule runnables while tracking their state.
   185  func (r *runnableGroup) reconcile() {
   186  	for runnable := range r.ch {
   187  		// Handle stop.
   188  		// If the shutdown has been called we want to avoid
   189  		// adding new goroutines to the WaitGroup because Wait()
   190  		// panics if Add() is called after it.
   191  		{
   192  			r.stop.RLock()
   193  			if r.stopped {
   194  				// Drop any runnables if we're stopped.
   195  				r.errChan <- errRunnableGroupStopped
   196  				r.stop.RUnlock()
   197  				continue
   198  			}
   199  
   200  			// Why is this here?
   201  			// When StopAndWait is called, if a runnable is in the process
   202  			// of being added, we could end up in a situation where
   203  			// the WaitGroup is incremented while StopAndWait has called Wait(),
   204  			// which would result in a panic.
   205  			r.wg.Add(1)
   206  			r.stop.RUnlock()
   207  		}
   208  
   209  		// Start the runnable.
   210  		go func(rn *readyRunnable) {
   211  			go func() {
   212  				if rn.Check(r.ctx) {
   213  					if rn.signalReady {
   214  						r.startReadyCh <- rn
   215  					}
   216  				}
   217  			}()
   218  
   219  			// If we return, the runnable ended cleanly
   220  			// or returned an error to the channel.
   221  			//
   222  			// We should always decrement the WaitGroup here.
   223  			defer r.wg.Done()
   224  
   225  			// Start the runnable.
   226  			if err := rn.Start(r.ctx); err != nil {
   227  				r.errChan <- err
   228  			}
   229  		}(runnable)
   230  	}
   231  }
   232  
   233  // Add should be able to be called before and after Start, but not after StopAndWait.
   234  // Add should return an error when called during StopAndWait.
   235  func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
   236  	r.stop.RLock()
   237  	if r.stopped {
   238  		r.stop.RUnlock()
   239  		return errRunnableGroupStopped
   240  	}
   241  	r.stop.RUnlock()
   242  
   243  	if ready == nil {
   244  		ready = func(_ context.Context) bool { return true }
   245  	}
   246  
   247  	readyRunnable := &readyRunnable{
   248  		Runnable: rn,
   249  		Check:    ready,
   250  	}
   251  
   252  	// Handle start.
   253  	// If the overall runnable group isn't started yet
   254  	// we want to buffer the runnables and let Start()
   255  	// queue them up again later.
   256  	{
   257  		r.start.Lock()
   258  
   259  		// Check if we're already started.
   260  		if !r.started {
   261  			// Store the runnable in the internal if not.
   262  			r.startQueue = append(r.startQueue, readyRunnable)
   263  			r.start.Unlock()
   264  			return nil
   265  		}
   266  		r.start.Unlock()
   267  	}
   268  
   269  	// Recheck if we're stopped and hold the readlock, given that the stop and start can be called
   270  	// at the same time, we can end up in a situation where the runnable is added
   271  	// after the group is stopped and the channel is closed.
   272  	r.stop.RLock()
   273  	defer r.stop.RUnlock()
   274  	if r.stopped {
   275  		return errRunnableGroupStopped
   276  	}
   277  
   278  	// Enqueue the runnable.
   279  	r.ch <- readyRunnable
   280  	return nil
   281  }
   282  
   283  // StopAndWait waits for all the runnables to finish before returning.
   284  func (r *runnableGroup) StopAndWait(ctx context.Context) {
   285  	r.stopOnce.Do(func() {
   286  		// Close the reconciler channel once we're done.
   287  		defer func() {
   288  			r.stop.Lock()
   289  			close(r.ch)
   290  			r.stop.Unlock()
   291  		}()
   292  
   293  		_ = r.Start(ctx)
   294  		r.stop.Lock()
   295  		// Store the stopped variable so we don't accept any new
   296  		// runnables for the time being.
   297  		r.stopped = true
   298  		r.stop.Unlock()
   299  
   300  		// Cancel the internal channel.
   301  		r.cancel()
   302  
   303  		done := make(chan struct{})
   304  		go func() {
   305  			defer close(done)
   306  			// Wait for all the runnables to finish.
   307  			r.wg.Wait()
   308  		}()
   309  
   310  		select {
   311  		case <-done:
   312  			// We're done, exit.
   313  		case <-ctx.Done():
   314  			// Calling context has expired, exit.
   315  		}
   316  	})
   317  }
   318  

View as plain text