...

Source file src/github.com/godbus/dbus/v5/sequential_handler.go

Documentation: github.com/godbus/dbus/v5

     1  package dbus
     2  
     3  import (
     4  	"sync"
     5  )
     6  
     7  // NewSequentialSignalHandler returns an instance of a new
     8  // signal handler that guarantees sequential processing of signals. It is a
     9  // guarantee of this signal handler that signals will be written to
    10  // channels in the order they are received on the DBus connection.
    11  func NewSequentialSignalHandler() SignalHandler {
    12  	return &sequentialSignalHandler{}
    13  }
    14  
    15  type sequentialSignalHandler struct {
    16  	mu      sync.RWMutex
    17  	closed  bool
    18  	signals []*sequentialSignalChannelData
    19  }
    20  
    21  func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
    22  	sh.mu.RLock()
    23  	defer sh.mu.RUnlock()
    24  	if sh.closed {
    25  		return
    26  	}
    27  	for _, scd := range sh.signals {
    28  		scd.deliver(signal)
    29  	}
    30  }
    31  
    32  func (sh *sequentialSignalHandler) Terminate() {
    33  	sh.mu.Lock()
    34  	defer sh.mu.Unlock()
    35  	if sh.closed {
    36  		return
    37  	}
    38  
    39  	for _, scd := range sh.signals {
    40  		scd.close()
    41  		close(scd.ch)
    42  	}
    43  	sh.closed = true
    44  	sh.signals = nil
    45  }
    46  
    47  func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
    48  	sh.mu.Lock()
    49  	defer sh.mu.Unlock()
    50  	if sh.closed {
    51  		return
    52  	}
    53  	sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
    54  }
    55  
    56  func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
    57  	sh.mu.Lock()
    58  	defer sh.mu.Unlock()
    59  	if sh.closed {
    60  		return
    61  	}
    62  	for i := len(sh.signals) - 1; i >= 0; i-- {
    63  		if ch == sh.signals[i].ch {
    64  			sh.signals[i].close()
    65  			copy(sh.signals[i:], sh.signals[i+1:])
    66  			sh.signals[len(sh.signals)-1] = nil
    67  			sh.signals = sh.signals[:len(sh.signals)-1]
    68  		}
    69  	}
    70  }
    71  
    72  type sequentialSignalChannelData struct {
    73  	ch   chan<- *Signal
    74  	in   chan *Signal
    75  	done chan struct{}
    76  }
    77  
    78  func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
    79  	scd := &sequentialSignalChannelData{
    80  		ch:   ch,
    81  		in:   make(chan *Signal),
    82  		done: make(chan struct{}),
    83  	}
    84  	go scd.bufferSignals()
    85  	return scd
    86  }
    87  
    88  func (scd *sequentialSignalChannelData) bufferSignals() {
    89  	defer close(scd.done)
    90  
    91  	// Ensure that signals are delivered to scd.ch in the same
    92  	// order they are received from scd.in.
    93  	var queue []*Signal
    94  	for {
    95  		if len(queue) == 0 {
    96  			signal, ok := <- scd.in
    97  			if !ok {
    98  				return
    99  			}
   100  			queue = append(queue, signal)
   101  		}
   102  		select {
   103  		case scd.ch <- queue[0]:
   104  			copy(queue, queue[1:])
   105  			queue[len(queue)-1] = nil
   106  			queue = queue[:len(queue)-1]
   107  		case signal, ok := <-scd.in:
   108  			if !ok {
   109  				return
   110  			}
   111  			queue = append(queue, signal)
   112  		}
   113  	}
   114  }
   115  
   116  func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
   117  	scd.in <- signal
   118  }
   119  
   120  func (scd *sequentialSignalChannelData) close() {
   121  	close(scd.in)
   122  	// Ensure that bufferSignals() has exited and won't attempt
   123  	// any future sends on scd.ch
   124  	<-scd.done
   125  }
   126  

View as plain text