...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/testutil_fake_notifier_test.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"sync"
     5  )
     6  
     7  // The Notifier struct buffers up notifications to multiple listeners. This is used as plumbing to
     8  // wire up watchers for the K8sStore and ConsulStore. A monotonically increasing changeCount field
     9  // functions as a logical clock tracking how many changes have occured. The notifyCount field tracks
    10  // how many of these changes are to be communicated to listeners. Each listener also tracks its own
    11  // count which starts at zero. This ensures that new listeners are always notified of changes that
    12  // have happened prior to the listener being created.
    13  type Notifier struct {
    14  	cond        *sync.Cond
    15  	autoNotify  bool
    16  	changeCount int // How many total changes have occurred.
    17  	notifyCount int // How many total changes are to be communicated to listeners. This must be <= changeCount.
    18  }
    19  
    20  // NewNotifier constructs a new notifier struct that is ready for use.
    21  func NewNotifier() *Notifier {
    22  	return &Notifier{
    23  		cond: sync.NewCond(&sync.Mutex{}),
    24  	}
    25  }
    26  
    27  // Changed signals that a change has occured that will eventually need to be communicated to all
    28  // listeners.
    29  func (n *Notifier) Changed() {
    30  	callNotify := false
    31  	func() {
    32  		n.cond.L.Lock()
    33  		defer n.cond.L.Unlock()
    34  		n.changeCount += 1
    35  		if n.autoNotify {
    36  			callNotify = true
    37  		}
    38  	}()
    39  
    40  	if callNotify {
    41  		n.Notify()
    42  	}
    43  }
    44  
    45  // AutoNotify controls the notification mode.
    46  func (n *Notifier) AutoNotify(enabled bool) {
    47  	func() {
    48  		n.cond.L.Lock()
    49  		defer n.cond.L.Unlock()
    50  		n.autoNotify = enabled
    51  	}()
    52  
    53  	if enabled {
    54  		n.Notify()
    55  	}
    56  }
    57  
    58  // Notify listeners of an and all outstanding changes.
    59  func (n *Notifier) Notify() {
    60  	n.cond.L.Lock()
    61  	defer n.cond.L.Unlock()
    62  	n.notifyCount = n.changeCount
    63  	n.cond.Broadcast()
    64  }
    65  
    66  type StopFunc func()
    67  
    68  // Listen will invoke the supplied function whenever a change is signaled. Changes will be coalesced
    69  // if they happen quickly enough. A stop function is returned that when invoked will prevent future
    70  // changes from notifying the Listener.
    71  func (n *Notifier) Listen(onChange func()) StopFunc {
    72  	stopped := false
    73  	go func() {
    74  		n.cond.L.Lock()
    75  		defer n.cond.L.Unlock()
    76  		count := 0
    77  		for {
    78  			if stopped {
    79  				return
    80  			}
    81  			if count < n.notifyCount {
    82  				onChange()
    83  				count = n.notifyCount
    84  			}
    85  			n.cond.Wait()
    86  		}
    87  	}()
    88  
    89  	return func() {
    90  		n.cond.L.Lock()
    91  		defer n.cond.L.Unlock()
    92  		stopped = true
    93  		n.cond.Broadcast()
    94  	}
    95  }
    96  

View as plain text