...

Source file src/edge-infra.dev/pkg/lib/runtime/manager/runnable_group.go

Documentation: edge-infra.dev/pkg/lib/runtime/manager

     1  package manager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"sync"
     7  )
     8  
     9  var (
    10  	errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
    11  )
    12  
    13  // readyRunnable encapsulates a runnable with a ready check.
    14  type readyRunnable struct {
    15  	Runnable
    16  	Check       runnableCheck
    17  	signalReady bool
    18  }
    19  
    20  // runnableCheck can be passed to Add() to let the runnable group determine that a
    21  // runnable is ready. A runnable check should block until a runnable is ready,
    22  // if the returned result is false, the runnable is considered not ready and failed.
    23  type runnableCheck func(ctx context.Context) bool
    24  
    25  // runnables handles all the runnables for a manager by grouping them accordingly to their
    26  // type (webhooks, caches etc.).
    27  type runnables struct {
    28  	// Webhooks       *runnableGroup
    29  	// Caches         *runnableGroup
    30  	// LeaderElection *runnableGroup
    31  	Others *runnableGroup
    32  }
    33  
    34  // newRunnables creates a new runnables object.
    35  func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
    36  	return &runnables{
    37  		// Webhooks:       newRunnableGroup(baseContext, errChan),
    38  		// Caches:         newRunnableGroup(baseContext, errChan),
    39  		// LeaderElection: newRunnableGroup(baseContext, errChan),
    40  		Others: newRunnableGroup(baseContext, errChan),
    41  	}
    42  }
    43  
    44  // Add adds a runnable to closest group of runnable that they belong to.
    45  //
    46  // Add should be able to be called before and after Start, but not after StopAndWait.
    47  // Add should return an error when called during StopAndWait.
    48  // The runnables added before Start are started when Start is called.
    49  // The runnables added after Start are started directly.
    50  func (r *runnables) Add(fn Runnable) error {
    51  	// switch runnable := fn.(type) {
    52  	// case hasCache:
    53  	// 	return r.Caches.Add(fn, func(ctx context.Context) bool {
    54  	// 		return runnable.GetCache().WaitForCacheSync(ctx)
    55  	// 	})
    56  	// case *webhook.Server:
    57  	// 	return r.Webhooks.Add(fn, nil)
    58  	// case LeaderElectionRunnable:
    59  	// 	if !runnable.NeedLeaderElection() {
    60  	// 		return r.Others.Add(fn, nil)
    61  	// 	}
    62  	// 	return r.LeaderElection.Add(fn, nil)
    63  	// default:
    64  	// 	return r.LeaderElection.Add(fn, nil)
    65  	// }
    66  	return r.Others.Add(fn, nil)
    67  }
    68  
    69  // runnableGroup manages a group of runnables that are
    70  // meant to be running together until StopAndWait is called.
    71  //
    72  // Runnables can be added to a group after the group has started
    73  // but not after it's stopped or while shutting down.
    74  type runnableGroup struct {
    75  	ctx    context.Context
    76  	cancel context.CancelFunc
    77  
    78  	start        sync.Mutex
    79  	startOnce    sync.Once
    80  	started      bool
    81  	startQueue   []*readyRunnable
    82  	startReadyCh chan *readyRunnable
    83  
    84  	stop     sync.RWMutex
    85  	stopOnce sync.Once
    86  	stopped  bool
    87  
    88  	// errChan is the error channel passed by the caller
    89  	// when the group is created.
    90  	// All errors are forwarded to this channel once they occur.
    91  	errChan chan error
    92  
    93  	// ch is the internal channel where the runnables are read off from.
    94  	ch chan *readyRunnable
    95  
    96  	// wg is an internal sync.WaitGroup that allows us to properly stop
    97  	// and wait for all the runnables to finish before returning.
    98  	wg *sync.WaitGroup
    99  }
   100  
   101  func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
   102  	r := &runnableGroup{
   103  		startReadyCh: make(chan *readyRunnable),
   104  		errChan:      errChan,
   105  		ch:           make(chan *readyRunnable),
   106  		wg:           new(sync.WaitGroup),
   107  	}
   108  
   109  	r.ctx, r.cancel = context.WithCancel(baseContext())
   110  	return r
   111  }
   112  
   113  // Started returns true if the group has started.
   114  func (r *runnableGroup) Started() bool {
   115  	r.start.Lock()
   116  	defer r.start.Unlock()
   117  	return r.started
   118  }
   119  
   120  // Start starts the group and waits for all
   121  // initially registered runnables to start.
   122  // It can only be called once, subsequent calls have no effect.
   123  func (r *runnableGroup) Start(ctx context.Context) error {
   124  	var retErr error
   125  
   126  	r.startOnce.Do(func() {
   127  		defer close(r.startReadyCh)
   128  
   129  		// Start the internal reconciler.
   130  		go r.reconcile()
   131  
   132  		// Start the group and queue up all
   133  		// the runnables that were added prior.
   134  		r.start.Lock()
   135  		r.started = true
   136  		for _, rn := range r.startQueue {
   137  			rn.signalReady = true
   138  			r.ch <- rn
   139  		}
   140  		r.start.Unlock()
   141  
   142  		// If we don't have any queue, return.
   143  		if len(r.startQueue) == 0 {
   144  			return
   145  		}
   146  
   147  		// Wait for all runnables to signal.
   148  		for {
   149  			select {
   150  			case <-ctx.Done():
   151  				if err := ctx.Err(); !errors.Is(err, context.Canceled) {
   152  					retErr = err
   153  				}
   154  			case rn := <-r.startReadyCh:
   155  				for i, existing := range r.startQueue {
   156  					if existing == rn {
   157  						// Remove the item from the start queue.
   158  						r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
   159  						break
   160  					}
   161  				}
   162  				// We're done waiting if the queue is empty, return.
   163  				if len(r.startQueue) == 0 {
   164  					return
   165  				}
   166  			}
   167  		}
   168  	})
   169  
   170  	return retErr
   171  }
   172  
   173  // reconcile is our main entrypoint for every runnable added
   174  // to this group. Its primary job is to read off the internal channel
   175  // and schedule runnables while tracking their state.
   176  func (r *runnableGroup) reconcile() {
   177  	for runnable := range r.ch {
   178  		// Handle stop.
   179  		// If the shutdown has been called we want to avoid
   180  		// adding new goroutines to the WaitGroup because Wait()
   181  		// panics if Add() is called after it.
   182  		{
   183  			r.stop.RLock()
   184  			if r.stopped {
   185  				// Drop any runnables if we're stopped.
   186  				r.errChan <- errRunnableGroupStopped
   187  				r.stop.RUnlock()
   188  				continue
   189  			}
   190  
   191  			// Why is this here?
   192  			// When StopAndWait is called, if a runnable is in the process
   193  			// of being added, we could end up in a situation where
   194  			// the WaitGroup is incremented while StopAndWait has called Wait(),
   195  			// which would result in a panic.
   196  			r.wg.Add(1)
   197  			r.stop.RUnlock()
   198  		}
   199  
   200  		// Start the runnable.
   201  		go func(rn *readyRunnable) {
   202  			go func() {
   203  				if rn.Check(r.ctx) {
   204  					if rn.signalReady {
   205  						r.startReadyCh <- rn
   206  					}
   207  				}
   208  			}()
   209  
   210  			// If we return, the runnable ended cleanly
   211  			// or returned an error to the channel.
   212  			//
   213  			// We should always decrement the WaitGroup here.
   214  			defer r.wg.Done()
   215  
   216  			// Start the runnable.
   217  			if err := rn.Start(r.ctx); err != nil {
   218  				r.errChan <- err
   219  			}
   220  		}(runnable)
   221  	}
   222  }
   223  
   224  // Add should be able to be called before and after Start, but not after StopAndWait.
   225  // Add should return an error when called during StopAndWait.
   226  func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
   227  	r.stop.RLock()
   228  	if r.stopped {
   229  		r.stop.RUnlock()
   230  		return errRunnableGroupStopped
   231  	}
   232  	r.stop.RUnlock()
   233  
   234  	if ready == nil {
   235  		ready = func(_ context.Context) bool { return true }
   236  	}
   237  
   238  	readyRunnable := &readyRunnable{
   239  		Runnable: rn,
   240  		Check:    ready,
   241  	}
   242  
   243  	// Handle start.
   244  	// If the overall runnable group isn't started yet
   245  	// we want to buffer the runnables and let Start()
   246  	// queue them up again later.
   247  	{
   248  		r.start.Lock()
   249  
   250  		// Check if we're already started.
   251  		if !r.started {
   252  			// Store the runnable in the internal if not.
   253  			r.startQueue = append(r.startQueue, readyRunnable)
   254  			r.start.Unlock()
   255  			return nil
   256  		}
   257  		r.start.Unlock()
   258  	}
   259  
   260  	// Enqueue the runnable.
   261  	r.ch <- readyRunnable
   262  	return nil
   263  }
   264  
   265  // StopAndWait waits for all the runnables to finish before returning.
   266  func (r *runnableGroup) StopAndWait(ctx context.Context) {
   267  	r.stopOnce.Do(func() {
   268  		// Close the reconciler channel once we're done.
   269  		defer close(r.ch)
   270  
   271  		_ = r.Start(ctx)
   272  		r.stop.Lock()
   273  		// Store the stopped variable so we don't accept any new
   274  		// runnables for the time being.
   275  		r.stopped = true
   276  		r.stop.Unlock()
   277  
   278  		// Cancel the internal channel.
   279  		r.cancel()
   280  
   281  		done := make(chan struct{})
   282  		go func() {
   283  			defer close(done)
   284  			// Wait for all the runnables to finish.
   285  			r.wg.Wait()
   286  		}()
   287  
   288  		select {
   289  		case <-done:
   290  			// We're done, exit.
   291  		case <-ctx.Done():
   292  			// Calling context has expired, exit.
   293  		}
   294  	})
   295  }
   296  

View as plain text