1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package dbus
16
17 import (
18 "errors"
19 "log"
20 "time"
21
22 "github.com/godbus/dbus/v5"
23 )
24
// Both intervals are expressed in nanoseconds so they can be added to and
// compared against time.Now().UnixNano() values.
const (
	// ignoreInterval is how long a unit path stays on the ignore list after
	// updateIgnore puts it there.
	ignoreInterval int64 = 30 * int64(time.Millisecond)

	// cleanIgnoreInterval is the minimum spacing between two sweeps of the
	// ignore list performed by cleanIgnore.
	cleanIgnoreInterval int64 = 10 * int64(time.Second)
)
29
30
31
32
33
34 func (c *Conn) Subscribe() error {
35 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
36 "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
37 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
38 "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
39
40 return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
41 }
42
43
44 func (c *Conn) Unsubscribe() error {
45 return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
46 }
47
48 func (c *Conn) dispatch() {
49 ch := make(chan *dbus.Signal, signalBuffer)
50
51 c.sigconn.Signal(ch)
52
53 go func() {
54 for {
55 signal, ok := <-ch
56 if !ok {
57 return
58 }
59
60 if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
61 c.jobComplete(signal)
62 }
63
64 if c.subStateSubscriber.updateCh == nil &&
65 c.propertiesSubscriber.updateCh == nil {
66 continue
67 }
68
69 var unitPath dbus.ObjectPath
70 switch signal.Name {
71 case "org.freedesktop.systemd1.Manager.JobRemoved":
72 unitName := signal.Body[2].(string)
73 c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
74 case "org.freedesktop.systemd1.Manager.UnitNew":
75 unitPath = signal.Body[1].(dbus.ObjectPath)
76 case "org.freedesktop.DBus.Properties.PropertiesChanged":
77 if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
78 unitPath = signal.Path
79
80 if len(signal.Body) >= 2 {
81 if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
82 c.sendPropertiesUpdate(unitPath, changed)
83 }
84 }
85 }
86 }
87
88 if unitPath == dbus.ObjectPath("") {
89 continue
90 }
91
92 c.sendSubStateUpdate(unitPath)
93 }
94 }()
95 }
96
97
98
99 func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
100 return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
101 }
102
103
104
105
106 func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
107 old := make(map[string]*UnitStatus)
108 statusChan := make(chan map[string]*UnitStatus, buffer)
109 errChan := make(chan error, buffer)
110
111 go func() {
112 for {
113 timerChan := time.After(interval)
114
115 units, err := c.ListUnits()
116 if err == nil {
117 cur := make(map[string]*UnitStatus)
118 for i := range units {
119 if filterUnit != nil && filterUnit(units[i].Name) {
120 continue
121 }
122 cur[units[i].Name] = &units[i]
123 }
124
125
126 changed := make(map[string]*UnitStatus)
127 for n, u := range cur {
128 if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
129 changed[n] = u
130 }
131 delete(old, n)
132 }
133
134
135 for oldN := range old {
136 changed[oldN] = nil
137 }
138
139 old = cur
140
141 if len(changed) != 0 {
142 statusChan <- changed
143 }
144 } else {
145 errChan <- err
146 }
147
148 <-timerChan
149 }
150 }()
151
152 return statusChan, errChan
153 }
154
// SubStateUpdate carries the name of a unit together with its sub-state, as
// delivered to the channel registered with SetSubStateSubscriber.
type SubStateUpdate struct {
	// UnitName is taken from the unit's "Id" property; SubState from its
	// "SubState" property.
	UnitName, SubState string
}
159
160
161
162
163
164
165
166
167
168
169 func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
170 if c == nil {
171 msg := "nil receiver"
172 select {
173 case errCh <- errors.New(msg):
174 default:
175 log.Printf("full error channel while reporting: %s\n", msg)
176 }
177 return
178 }
179
180 c.subStateSubscriber.Lock()
181 defer c.subStateSubscriber.Unlock()
182 c.subStateSubscriber.updateCh = updateCh
183 c.subStateSubscriber.errCh = errCh
184 }
185
186 func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
187 c.subStateSubscriber.Lock()
188 defer c.subStateSubscriber.Unlock()
189
190 if c.subStateSubscriber.updateCh == nil {
191 return
192 }
193
194 isIgnored := c.shouldIgnore(unitPath)
195 defer c.cleanIgnore()
196 if isIgnored {
197 return
198 }
199
200 info, err := c.GetUnitPathProperties(unitPath)
201 if err != nil {
202 select {
203 case c.subStateSubscriber.errCh <- err:
204 default:
205 log.Printf("full error channel while reporting: %s\n", err)
206 }
207 return
208 }
209 defer c.updateIgnore(unitPath, info)
210
211 name, ok := info["Id"].(string)
212 if !ok {
213 msg := "failed to cast info.Id"
214 select {
215 case c.subStateSubscriber.errCh <- errors.New(msg):
216 default:
217 log.Printf("full error channel while reporting: %s\n", err)
218 }
219 return
220 }
221 substate, ok := info["SubState"].(string)
222 if !ok {
223 msg := "failed to cast info.SubState"
224 select {
225 case c.subStateSubscriber.errCh <- errors.New(msg):
226 default:
227 log.Printf("full error channel while reporting: %s\n", msg)
228 }
229 return
230 }
231
232 update := &SubStateUpdate{name, substate}
233 select {
234 case c.subStateSubscriber.updateCh <- update:
235 default:
236 msg := "update channel is full"
237 select {
238 case c.subStateSubscriber.errCh <- errors.New(msg):
239 default:
240 log.Printf("full error channel while reporting: %s\n", msg)
241 }
242 return
243 }
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
261 t, ok := c.subStateSubscriber.ignore[path]
262 return ok && t >= time.Now().UnixNano()
263 }
264
265 func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
266 loadState, ok := info["LoadState"].(string)
267 if !ok {
268 return
269 }
270
271
272 if loadState == "not-found" {
273 c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
274 }
275 }
276
277
278 func (c *Conn) cleanIgnore() {
279 now := time.Now().UnixNano()
280 if c.subStateSubscriber.cleanIgnore < now {
281 c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval
282
283 for p, t := range c.subStateSubscriber.ignore {
284 if t < now {
285 delete(c.subStateSubscriber.ignore, p)
286 }
287 }
288 }
289 }
290
291
// PropertiesUpdate describes a set of changed properties for one unit, as
// delivered to the channel registered with SetPropertiesSubscriber.
type PropertiesUpdate struct {
	// UnitName is derived from the object path of the signal via unitName.
	UnitName string
	// Changed holds the changed properties, keyed by property name, taken
	// from the body of a PropertiesChanged signal.
	Changed map[string]dbus.Variant
}
296
297
298
299
300
301
302
303 func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
304 c.propertiesSubscriber.Lock()
305 defer c.propertiesSubscriber.Unlock()
306 c.propertiesSubscriber.updateCh = updateCh
307 c.propertiesSubscriber.errCh = errCh
308 }
309
310
311
312 func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
313 c.propertiesSubscriber.Lock()
314 defer c.propertiesSubscriber.Unlock()
315
316 if c.propertiesSubscriber.updateCh == nil {
317 return
318 }
319
320 update := &PropertiesUpdate{unitName(unitPath), changedProps}
321
322 select {
323 case c.propertiesSubscriber.updateCh <- update:
324 default:
325 msg := "update channel is full"
326 select {
327 case c.propertiesSubscriber.errCh <- errors.New(msg):
328 default:
329 log.Printf("full error channel while reporting: %s\n", msg)
330 }
331 return
332 }
333 }
334
View as plain text