...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/receivers.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector

     1  package server
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"sync/atomic"
     8  	"time"
     9  
    10  	"edge-infra.dev/pkg/lib/fog"
    11  
    12  	"cloud.google.com/go/pubsub"
    13  	"github.com/go-logr/logr"
    14  )
    15  
// ReceiverHandler processes a single Pub/Sub message on behalf of a Receiver.
//
// The Receiver acks the message when the handler returns nil and nacks it when
// the handler returns an error, so the handler must not ack or nack on its own.
type ReceiverHandler func(context.Context, *pubsub.Message) error
    17  
// Receiver pulls messages from one Pub/Sub subscription and passes each message to its handler.
//
// Create a Receiver with NewReceiver, run it with Start, and stop it with Close.
// HealthCheck can be called at any time to read the receiver's counters.
type Receiver struct {
	// projectID is attached to the logger as "project_id" when Start is called.
	projectID string
	// handler is invoked once per received message; its result decides ack vs nack.
	handler      ReceiverHandler
	subscription *pubsub.Subscription
	// started is closed by Start after `shutdown` has been assigned.
	started chan struct{}
	// log is assigned in Start, so it is only usable after Start has been called.
	log logr.Logger

	// used to wait for subscriptions to exist, and used to poll for subscriptions that have been deleted.
	pollSubscriptionExistsPeriod time.Duration

	/*
		shutdown is created using the Start function's context.

		You must check that `started` is closed before calling `shutdown`
	*/
	shutdown context.CancelFunc

	/*
		--- health check fields ---

		Read the comments and code at the bottom of this file to see how they're used.

		`healthMu` guards `health` and `receiveStart`.
		`messagesAcked` and `messagesNacked` count messages for the `receive` call in progress,
		and are swapped back to zero when that call returns.
		`receiveStart` is the zero time whenever `receive` is not running.

		TODO evaluate replacing this stuff with prometheus.
	*/
	healthMu       sync.Mutex
	health         ReceiverHealthCheck
	receiveStart   time.Time
	messagesAcked  atomic.Int64
	messagesNacked atomic.Int64
}
    48  
    49  func NewReceiver(projectID string, subscription *pubsub.Subscription, handler ReceiverHandler, pollPeriod time.Duration) *Receiver {
    50  	return &Receiver{
    51  		projectID:                    projectID,
    52  		handler:                      handler,
    53  		subscription:                 subscription,
    54  		started:                      make(chan struct{}),
    55  		pollSubscriptionExistsPeriod: pollPeriod,
    56  	}
    57  }
    58  
    59  // Close is non-blocking and asynchronous. An error is returned if the Receiver hasn't been started.
    60  func (r *Receiver) Close() error {
    61  	select {
    62  	case <-r.started:
    63  		r.shutdown()
    64  		return nil
    65  	default:
    66  		return fmt.Errorf("close requires the receiver to be started")
    67  	}
    68  }
    69  
    70  // Start receives from the subscription.  Start is self-healing and retries when errors occur. It only returns if the provided context is canceled or Close is called.
    71  func (r *Receiver) Start(ctx context.Context) {
    72  	r.log = fog.New().WithName("receiver").WithValues("project_id", r.projectID)
    73  	ctx, r.shutdown = context.WithCancel(ctx)
    74  	close(r.started)
    75  
    76  	for {
    77  		if !r.waitUntilExists(ctx) {
    78  			return // the context is canceled.
    79  		}
    80  
    81  		if err := r.receive(ctx); err != nil {
    82  			// errors are not emitted by `pubsub.Subscription.Receive` when contexts are canceled.
    83  			r.log.Error(err, "receiver quit unexpectedly")
    84  		}
    85  
    86  		select {
    87  		case <-ctx.Done():
    88  			return
    89  		default:
    90  		}
    91  	}
    92  }
    93  func (r *Receiver) receive(ctx context.Context) error {
    94  	r.updateHealthy()
    95  	defer r.updateHealthTotals()
    96  
    97  	ctx, cancel := r.contextWhileSubscriptionExists(ctx)
    98  	defer cancel()
    99  
   100  	return r.subscription.Receive(ctx, r.handle)
   101  }
   102  
   103  func (r *Receiver) handle(ctx context.Context, msg *pubsub.Message) {
   104  	if err := r.handler(ctx, msg); err != nil {
   105  		r.messagesNacked.Add(1)
   106  		msg.Nack()
   107  		return
   108  	}
   109  	r.messagesAcked.Add(1)
   110  	msg.Ack()
   111  }
   112  
   113  // This async poll is needed to force Receive to return when a subscription is deleted.
   114  // Without a periodic check, the Receive function will never return.
   115  func (r *Receiver) contextWhileSubscriptionExists(ctx context.Context) (context.Context, context.CancelFunc) {
   116  	ctx, cancel := context.WithCancel(ctx)
   117  
   118  	var quietErrs bool
   119  
   120  	// TODO evaluate moving to a worker thread architecture to reduce goroutine count and resource usage.
   121  	go func() {
   122  		defer cancel()
   123  		for {
   124  			select {
   125  			case <-ctx.Done():
   126  				return
   127  			case <-time.After(r.pollSubscriptionExistsPeriod):
   128  			}
   129  
   130  			exists, err := r.subscription.Exists(ctx)
   131  			if err != nil {
   132  				if !quietErrs {
   133  					r.log.Error(err, "failed to probe if subscription exists")
   134  
   135  					// this prevents a flood of identical error logs from being emitted.
   136  					quietErrs = true
   137  				}
   138  				continue
   139  			}
   140  			// this allows new errors to be logged.
   141  			quietErrs = false
   142  
   143  			if !exists {
   144  				r.log.Info("receiver subscription no longer exists")
   145  				return
   146  			}
   147  		}
   148  	}()
   149  
   150  	return ctx, cancel
   151  }
   152  
   153  func (r *Receiver) waitUntilExists(ctx context.Context) bool {
   154  	// fast exit so happy path tests don't have to wait.
   155  	if exists, err := r.subscription.Exists(ctx); err != nil && exists {
   156  		return true
   157  	}
   158  
   159  	var quietInfo, quietErrs bool
   160  
   161  	for {
   162  		select {
   163  		case <-time.After(r.pollSubscriptionExistsPeriod):
   164  		case <-ctx.Done():
   165  			return false
   166  		}
   167  
   168  		exists, err := r.subscription.Exists(ctx)
   169  
   170  		if err != nil {
   171  			if !quietErrs {
   172  				r.log.Error(err, "failed to check if subscription exists at startup")
   173  
   174  				// this prevents a flood of identical error logs from being emitted.
   175  				quietErrs = true
   176  			}
   177  			continue
   178  		}
   179  		// this allows new errors to be logged.
   180  		quietErrs = false
   181  
   182  		if !exists {
   183  			if !quietInfo {
   184  				r.log.Info("subscription does not exist")
   185  
   186  				// this ensures the "subscription does not exist" log is only printed once.
   187  				quietInfo = true
   188  			}
   189  			continue
   190  		}
   191  
   192  		// done waiting for the subscription to exist!
   193  		return true
   194  	}
   195  }
   196  
// ReceiverHealthCheck provides metrics for each Receiver since it was added to the BannerMux.
//
// It is a plain value snapshot: HealthCheck returns a copy, so reading it needs no locking.
//
// NOTE:
// There can be a delay for the Receiver to report itself as unhealthy.
// This is due to the nature of polling.
type ReceiverHealthCheck struct {
	/*
		`Total*` and `RestartCount` track info since `Start` was called.

		In the value returned by `HealthCheck`, the `Total*` fields also include the
		`receive` call that is currently in progress.
	*/

	// RestartCount is incremented every time the `receive` function returns.
	RestartCount           int
	TotalMessagesProcessed int
	TotalReceiveDuration   time.Duration
	TotalMessagesAcked     int
	TotalMessagesNacked    int

	/*
		`Current*` and `Healthy` track info since `receive` was called.

		When the `receive` function returns, these values are added to their respective `Total*`, and then set to zero.
	*/

	Healthy                  bool // Healthy indicates the Receiver is currently receiving messages
	// CurrentReceiveDuration is zero when `receive` is not running.
	CurrentReceiveDuration   time.Duration
	// CurrentMessagesProcessed is CurrentMessagesAcked plus CurrentMessagesNacked.
	CurrentMessagesProcessed int
	CurrentMessagesAcked     int
	CurrentMessagesNacked    int
}
   225  
   226  /*
   227  	The following code is ugly, but it's tested.
   228  */
   229  
   230  // HealthCheck calculates up-to-date metrics for the receiver.
   231  func (r *Receiver) HealthCheck() ReceiverHealthCheck {
   232  	r.healthMu.Lock()
   233  	defer r.healthMu.Unlock()
   234  
   235  	var dur time.Duration
   236  	if !r.receiveStart.IsZero() {
   237  		dur = time.Since(r.receiveStart)
   238  	}
   239  
   240  	var current = struct {
   241  		Acked    int
   242  		Nacked   int
   243  		Duration time.Duration
   244  	}{
   245  		Acked:    int(r.messagesAcked.Load()),
   246  		Nacked:   int(r.messagesNacked.Load()),
   247  		Duration: dur,
   248  	}
   249  
   250  	// this is a value not a pointer!!!
   251  	var hc = r.health
   252  
   253  	hc.TotalMessagesAcked += current.Acked
   254  	hc.TotalMessagesNacked += current.Nacked
   255  	hc.TotalMessagesProcessed += current.Acked + current.Nacked
   256  	hc.TotalReceiveDuration += current.Duration
   257  
   258  	return ReceiverHealthCheck{
   259  		Healthy:      hc.Healthy,
   260  		RestartCount: hc.RestartCount,
   261  
   262  		TotalMessagesAcked:     hc.TotalMessagesAcked,
   263  		TotalMessagesNacked:    hc.TotalMessagesNacked,
   264  		TotalMessagesProcessed: hc.TotalMessagesProcessed,
   265  		TotalReceiveDuration:   hc.TotalReceiveDuration,
   266  
   267  		CurrentReceiveDuration:   current.Duration,
   268  		CurrentMessagesAcked:     current.Acked,
   269  		CurrentMessagesNacked:    current.Nacked,
   270  		CurrentMessagesProcessed: current.Acked + current.Nacked,
   271  	}
   272  }
   273  
   274  func (r *Receiver) updateHealthy() {
   275  	r.healthMu.Lock()
   276  	defer r.healthMu.Unlock()
   277  
   278  	r.receiveStart = time.Now()
   279  	r.health.Healthy = true
   280  }
   281  
   282  func (r *Receiver) updateHealthTotals() {
   283  	r.healthMu.Lock()
   284  	defer r.healthMu.Unlock()
   285  
   286  	r.health.TotalMessagesAcked += int(r.messagesAcked.Swap(0))
   287  	r.health.TotalMessagesNacked += int(r.messagesNacked.Swap(0))
   288  	r.health.TotalMessagesProcessed += r.health.TotalMessagesAcked + r.health.TotalMessagesNacked
   289  
   290  	r.health.RestartCount++
   291  	r.health.Healthy = false
   292  
   293  	r.health.TotalReceiveDuration += time.Since(r.receiveStart)
   294  	r.receiveStart = time.Time{}
   295  }
   296  

View as plain text