...
1 package dbus
2
3 import (
4 "sync"
5 )
6
7
8
9
10
11 func NewSequentialSignalHandler() SignalHandler {
12 return &sequentialSignalHandler{}
13 }
14
15 type sequentialSignalHandler struct {
16 mu sync.RWMutex
17 closed bool
18 signals []*sequentialSignalChannelData
19 }
20
21 func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
22 sh.mu.RLock()
23 defer sh.mu.RUnlock()
24 if sh.closed {
25 return
26 }
27 for _, scd := range sh.signals {
28 scd.deliver(signal)
29 }
30 }
31
32 func (sh *sequentialSignalHandler) Terminate() {
33 sh.mu.Lock()
34 defer sh.mu.Unlock()
35 if sh.closed {
36 return
37 }
38
39 for _, scd := range sh.signals {
40 scd.close()
41 close(scd.ch)
42 }
43 sh.closed = true
44 sh.signals = nil
45 }
46
47 func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
48 sh.mu.Lock()
49 defer sh.mu.Unlock()
50 if sh.closed {
51 return
52 }
53 sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
54 }
55
56 func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
57 sh.mu.Lock()
58 defer sh.mu.Unlock()
59 if sh.closed {
60 return
61 }
62 for i := len(sh.signals) - 1; i >= 0; i-- {
63 if ch == sh.signals[i].ch {
64 sh.signals[i].close()
65 copy(sh.signals[i:], sh.signals[i+1:])
66 sh.signals[len(sh.signals)-1] = nil
67 sh.signals = sh.signals[:len(sh.signals)-1]
68 }
69 }
70 }
71
72 type sequentialSignalChannelData struct {
73 ch chan<- *Signal
74 in chan *Signal
75 done chan struct{}
76 }
77
78 func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
79 scd := &sequentialSignalChannelData{
80 ch: ch,
81 in: make(chan *Signal),
82 done: make(chan struct{}),
83 }
84 go scd.bufferSignals()
85 return scd
86 }
87
88 func (scd *sequentialSignalChannelData) bufferSignals() {
89 defer close(scd.done)
90
91
92
93 var queue []*Signal
94 for {
95 if len(queue) == 0 {
96 signal, ok := <- scd.in
97 if !ok {
98 return
99 }
100 queue = append(queue, signal)
101 }
102 select {
103 case scd.ch <- queue[0]:
104 copy(queue, queue[1:])
105 queue[len(queue)-1] = nil
106 queue = queue[:len(queue)-1]
107 case signal, ok := <-scd.in:
108 if !ok {
109 return
110 }
111 queue = append(queue, signal)
112 }
113 }
114 }
115
116 func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
117 scd.in <- signal
118 }
119
120 func (scd *sequentialSignalChannelData) close() {
121 close(scd.in)
122
123
124 <-scd.done
125 }
126
View as plain text