...

Source file src/github.com/palantir/go-githubapp/githubapp/scheduler.go

Documentation: github.com/palantir/go-githubapp/githubapp

     1  // Copyright 2020 Palantir Technologies, Inc.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package githubapp
    16  
    17  import (
    18  	"context"
    19  	"sync/atomic"
    20  	"time"
    21  
    22  	"github.com/pkg/errors"
    23  	"github.com/rcrowley/go-metrics"
    24  	"github.com/rs/zerolog"
    25  )
    26  
    27  const (
    28  	MetricsKeyQueueLength   = "github.event.queued"
    29  	MetricsKeyActiveWorkers = "github.event.workers"
    30  	MetricsKeyEventAge      = "github.event.age"
    31  	MetricsKeyDroppedEvents = "github.event.dropped"
    32  )
    33  
    34  const (
    35  	// values from metrics.NewTimer, which match those used by UNIX load averages
    36  	histogramReservoirSize = 1028
    37  	histogramAlpha         = 0.015
    38  )
    39  
    40  var (
    41  	ErrCapacityExceeded = errors.New("scheduler: capacity exceeded")
    42  )
    43  
    44  // Dispatch is a webhook payload and the handler that handles it.
    45  type Dispatch struct {
    46  	Handler EventHandler
    47  
    48  	EventType  string
    49  	DeliveryID string
    50  	Payload    []byte
    51  }
    52  
    53  // Execute calls the Dispatch's handler with the stored arguments.
    54  func (d Dispatch) Execute(ctx context.Context) error {
    55  	return d.Handler.Handle(ctx, d.EventType, d.DeliveryID, d.Payload)
    56  }
    57  
    58  // AsyncErrorCallback is called by an asynchronous scheduler when an event
    59  // handler returns an error or panics. The error from the handler is passed
    60  // directly as the final argument.
    61  //
    62  // If the handler panics, err will be a HandlerPanicError.
    63  type AsyncErrorCallback func(ctx context.Context, d Dispatch, err error)
    64  
    65  // DefaultAsyncErrorCallback logs errors.
    66  func DefaultAsyncErrorCallback(ctx context.Context, d Dispatch, err error) {
    67  	defaultAsyncErrorCallback(ctx, d, err)
    68  }
    69  
    70  var defaultAsyncErrorCallback = MetricsAsyncErrorCallback(nil)
    71  
    72  // MetricsAsyncErrorCallback logs errors and increments an error counter.
    73  func MetricsAsyncErrorCallback(reg metrics.Registry) AsyncErrorCallback {
    74  	return func(ctx context.Context, d Dispatch, err error) {
    75  		zerolog.Ctx(ctx).Error().Err(err).Msg("Unexpected error handling webhook")
    76  		errorCounter(reg, d.EventType).Inc(1)
    77  	}
    78  }
    79  
    80  // ContextDeriver creates a new independent context from a request's context.
    81  // The new context must be based on context.Background(), not the input.
    82  type ContextDeriver func(context.Context) context.Context
    83  
    84  // DefaultContextDeriver copies the logger from the request's context to a new
    85  // context.
    86  func DefaultContextDeriver(ctx context.Context) context.Context {
    87  	newCtx := context.Background()
    88  
    89  	// this value is always unused by async schedulers, but is set for
    90  	// compatibility with existing handlers that call SetResponder
    91  	newCtx = InitializeResponder(newCtx)
    92  
    93  	return zerolog.Ctx(ctx).WithContext(newCtx)
    94  }
    95  
    96  // Scheduler is a strategy for executing event handlers.
    97  //
    98  // The Schedule method takes a Dispatch and executes it by calling the handler
    99  // for the payload. The execution may be asynchronous, but the scheduler must
   100  // create a new context in this case. The dispatcher waits for Schedule to
   101  // return before responding to GitHub, so asynchronous schedulers should only
   102  // return errors that happen during scheduling, not during execution.
   103  //
   104  // Schedule may return ErrCapacityExceeded if it cannot schedule or queue new
   105  // events at the time of the call.
   106  type Scheduler interface {
   107  	Schedule(ctx context.Context, d Dispatch) error
   108  }
   109  
   110  // SchedulerOption configures properties of a scheduler.
   111  type SchedulerOption func(*scheduler)
   112  
   113  // WithAsyncErrorCallback sets the error callback for an asynchronous
   114  // scheduler. If not set, the scheduler uses DefaultAsyncErrorCallback.
   115  func WithAsyncErrorCallback(onError AsyncErrorCallback) SchedulerOption {
   116  	return func(s *scheduler) {
   117  		if onError != nil {
   118  			s.onError = onError
   119  		}
   120  	}
   121  }
   122  
   123  // WithContextDeriver sets the context deriver for an asynchronous scheduler.
   124  // If not set, the scheduler uses DefaultContextDeriver.
   125  func WithContextDeriver(deriver ContextDeriver) SchedulerOption {
   126  	return func(s *scheduler) {
   127  		if deriver != nil {
   128  			s.deriver = deriver
   129  		}
   130  	}
   131  }
   132  
   133  // WithSchedulingMetrics enables metrics reporting for schedulers.
   134  func WithSchedulingMetrics(r metrics.Registry) SchedulerOption {
   135  	return func(s *scheduler) {
   136  		metrics.NewRegisteredFunctionalGauge(MetricsKeyQueueLength, r, func() int64 {
   137  			return int64(len(s.queue))
   138  		})
   139  		metrics.NewRegisteredFunctionalGauge(MetricsKeyActiveWorkers, r, func() int64 {
   140  			return atomic.LoadInt64(&s.activeWorkers)
   141  		})
   142  
   143  		sample := metrics.NewExpDecaySample(histogramReservoirSize, histogramAlpha)
   144  		s.eventAge = metrics.NewRegisteredHistogram(MetricsKeyEventAge, r, sample)
   145  		s.dropped = metrics.NewRegisteredCounter(MetricsKeyDroppedEvents, r)
   146  	}
   147  }
   148  
   149  type queueDispatch struct {
   150  	ctx context.Context
   151  	t   time.Time
   152  	d   Dispatch
   153  }
   154  
   155  // core functionality and options for (async) schedulers
   156  type scheduler struct {
   157  	onError AsyncErrorCallback
   158  	deriver ContextDeriver
   159  
   160  	activeWorkers int64
   161  	queue         chan queueDispatch
   162  
   163  	eventAge metrics.Histogram
   164  	dropped  metrics.Counter
   165  }
   166  
   167  func (s *scheduler) safeExecute(ctx context.Context, d Dispatch) {
   168  	var err error
   169  	defer func() {
   170  		atomic.AddInt64(&s.activeWorkers, -1)
   171  		if r := recover(); r != nil {
   172  			err = HandlerPanicError{
   173  				value: r,
   174  				stack: getStack(1),
   175  			}
   176  		}
   177  		if err != nil && s.onError != nil {
   178  			s.onError(ctx, d, err)
   179  		}
   180  	}()
   181  
   182  	atomic.AddInt64(&s.activeWorkers, 1)
   183  	err = d.Execute(ctx)
   184  }
   185  
   186  func (s *scheduler) derive(ctx context.Context) context.Context {
   187  	if s.deriver == nil {
   188  		return ctx
   189  	}
   190  	return s.deriver(ctx)
   191  }
   192  
   193  // DefaultScheduler returns a scheduler that executes handlers in the go
   194  // routine of the caller and returns any error.
   195  func DefaultScheduler() Scheduler {
   196  	return &defaultScheduler{}
   197  }
   198  
   199  type defaultScheduler struct{}
   200  
   201  func (s *defaultScheduler) Schedule(ctx context.Context, d Dispatch) error {
   202  	return d.Execute(ctx)
   203  }
   204  
   205  // AsyncScheduler returns a scheduler that executes handlers in new goroutines.
   206  // Goroutines are not reused and there is no limit on the number created.
   207  func AsyncScheduler(opts ...SchedulerOption) Scheduler {
   208  	s := &asyncScheduler{
   209  		scheduler: scheduler{
   210  			deriver: DefaultContextDeriver,
   211  			onError: DefaultAsyncErrorCallback,
   212  		},
   213  	}
   214  	for _, opt := range opts {
   215  		opt(&s.scheduler)
   216  	}
   217  	return s
   218  }
   219  
   220  type asyncScheduler struct {
   221  	scheduler
   222  }
   223  
   224  func (s *asyncScheduler) Schedule(ctx context.Context, d Dispatch) error {
   225  	go s.safeExecute(s.derive(ctx), d)
   226  	return nil
   227  }
   228  
   229  // QueueAsyncScheduler returns a scheduler that executes handlers in a fixed
   230  // number of worker goroutines. If no workers are available, events queue until
   231  // the queue is full.
   232  func QueueAsyncScheduler(queueSize int, workers int, opts ...SchedulerOption) Scheduler {
   233  	if queueSize < 0 {
   234  		panic("QueueAsyncScheduler: queue size must be non-negative")
   235  	}
   236  	if workers < 1 {
   237  		panic("QueueAsyncScheduler: worker count must be positive")
   238  	}
   239  
   240  	s := &queueScheduler{
   241  		scheduler: scheduler{
   242  			deriver: DefaultContextDeriver,
   243  			onError: DefaultAsyncErrorCallback,
   244  			queue:   make(chan queueDispatch, queueSize),
   245  		},
   246  	}
   247  	for _, opt := range opts {
   248  		opt(&s.scheduler)
   249  	}
   250  
   251  	for i := 0; i < workers; i++ {
   252  		go func() {
   253  			for d := range s.queue {
   254  				if s.eventAge != nil {
   255  					s.eventAge.Update(time.Since(d.t).Milliseconds())
   256  				}
   257  				s.safeExecute(d.ctx, d.d)
   258  			}
   259  		}()
   260  	}
   261  
   262  	return s
   263  }
   264  
   265  type queueScheduler struct {
   266  	scheduler
   267  }
   268  
   269  func (s *queueScheduler) Schedule(ctx context.Context, d Dispatch) error {
   270  	select {
   271  	case s.queue <- queueDispatch{ctx: s.derive(ctx), t: time.Now(), d: d}:
   272  	default:
   273  		if s.dropped != nil {
   274  			s.dropped.Inc(1)
   275  		}
   276  		return ErrCapacityExceeded
   277  	}
   278  	return nil
   279  }
   280  

View as plain text