...

Source file src/github.com/coreos/go-systemd/v22/dbus/subscription.go

Documentation: github.com/coreos/go-systemd/v22/dbus

     1  // Copyright 2015 CoreOS, Inc.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package dbus
    16  
    17  import (
    18  	"errors"
    19  	"log"
    20  	"time"
    21  
    22  	"github.com/godbus/dbus/v5"
    23  )
    24  
    25  const (
    26  	cleanIgnoreInterval = int64(10 * time.Second)
    27  	ignoreInterval      = int64(30 * time.Millisecond)
    28  )
    29  
    30  // Subscribe sets up this connection to subscribe to all systemd dbus events.
    31  // This is required before calling SubscribeUnits. When the connection closes
    32  // systemd will automatically stop sending signals so there is no need to
    33  // explicitly call Unsubscribe().
    34  func (c *Conn) Subscribe() error {
    35  	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
    36  		"type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
    37  	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
    38  		"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
    39  
    40  	return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
    41  }
    42  
    43  // Unsubscribe this connection from systemd dbus events.
    44  func (c *Conn) Unsubscribe() error {
    45  	return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
    46  }
    47  
    48  func (c *Conn) dispatch() {
    49  	ch := make(chan *dbus.Signal, signalBuffer)
    50  
    51  	c.sigconn.Signal(ch)
    52  
    53  	go func() {
    54  		for {
    55  			signal, ok := <-ch
    56  			if !ok {
    57  				return
    58  			}
    59  
    60  			if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
    61  				c.jobComplete(signal)
    62  			}
    63  
    64  			if c.subStateSubscriber.updateCh == nil &&
    65  				c.propertiesSubscriber.updateCh == nil {
    66  				continue
    67  			}
    68  
    69  			var unitPath dbus.ObjectPath
    70  			switch signal.Name {
    71  			case "org.freedesktop.systemd1.Manager.JobRemoved":
    72  				unitName := signal.Body[2].(string)
    73  				c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
    74  			case "org.freedesktop.systemd1.Manager.UnitNew":
    75  				unitPath = signal.Body[1].(dbus.ObjectPath)
    76  			case "org.freedesktop.DBus.Properties.PropertiesChanged":
    77  				if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
    78  					unitPath = signal.Path
    79  
    80  					if len(signal.Body) >= 2 {
    81  						if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
    82  							c.sendPropertiesUpdate(unitPath, changed)
    83  						}
    84  					}
    85  				}
    86  			}
    87  
    88  			if unitPath == dbus.ObjectPath("") {
    89  				continue
    90  			}
    91  
    92  			c.sendSubStateUpdate(unitPath)
    93  		}
    94  	}()
    95  }
    96  
    97  // SubscribeUnits returns two unbuffered channels which will receive all changed units every
    98  // interval.  Deleted units are sent as nil.
    99  func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
   100  	return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
   101  }
   102  
   103  // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
   104  // size of the channels, the comparison function for detecting changes and a filter
   105  // function for cutting down on the noise that your channel receives.
   106  func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
   107  	old := make(map[string]*UnitStatus)
   108  	statusChan := make(chan map[string]*UnitStatus, buffer)
   109  	errChan := make(chan error, buffer)
   110  
   111  	go func() {
   112  		for {
   113  			timerChan := time.After(interval)
   114  
   115  			units, err := c.ListUnits()
   116  			if err == nil {
   117  				cur := make(map[string]*UnitStatus)
   118  				for i := range units {
   119  					if filterUnit != nil && filterUnit(units[i].Name) {
   120  						continue
   121  					}
   122  					cur[units[i].Name] = &units[i]
   123  				}
   124  
   125  				// add all new or changed units
   126  				changed := make(map[string]*UnitStatus)
   127  				for n, u := range cur {
   128  					if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
   129  						changed[n] = u
   130  					}
   131  					delete(old, n)
   132  				}
   133  
   134  				// add all deleted units
   135  				for oldN := range old {
   136  					changed[oldN] = nil
   137  				}
   138  
   139  				old = cur
   140  
   141  				if len(changed) != 0 {
   142  					statusChan <- changed
   143  				}
   144  			} else {
   145  				errChan <- err
   146  			}
   147  
   148  			<-timerChan
   149  		}
   150  	}()
   151  
   152  	return statusChan, errChan
   153  }
   154  
   155  type SubStateUpdate struct {
   156  	UnitName string
   157  	SubState string
   158  }
   159  
   160  // SetSubStateSubscriber writes to updateCh when any unit's substate changes.
   161  // Although this writes to updateCh on every state change, the reported state
   162  // may be more recent than the change that generated it (due to an unavoidable
   163  // race in the systemd dbus interface).  That is, this method provides a good
   164  // way to keep a current view of all units' states, but is not guaranteed to
   165  // show every state transition they go through.  Furthermore, state changes
   166  // will only be written to the channel with non-blocking writes.  If updateCh
   167  // is full, it attempts to write an error to errCh; if errCh is full, the error
   168  // passes silently.
   169  func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
   170  	if c == nil {
   171  		msg := "nil receiver"
   172  		select {
   173  		case errCh <- errors.New(msg):
   174  		default:
   175  			log.Printf("full error channel while reporting: %s\n", msg)
   176  		}
   177  		return
   178  	}
   179  
   180  	c.subStateSubscriber.Lock()
   181  	defer c.subStateSubscriber.Unlock()
   182  	c.subStateSubscriber.updateCh = updateCh
   183  	c.subStateSubscriber.errCh = errCh
   184  }
   185  
   186  func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
   187  	c.subStateSubscriber.Lock()
   188  	defer c.subStateSubscriber.Unlock()
   189  
   190  	if c.subStateSubscriber.updateCh == nil {
   191  		return
   192  	}
   193  
   194  	isIgnored := c.shouldIgnore(unitPath)
   195  	defer c.cleanIgnore()
   196  	if isIgnored {
   197  		return
   198  	}
   199  
   200  	info, err := c.GetUnitPathProperties(unitPath)
   201  	if err != nil {
   202  		select {
   203  		case c.subStateSubscriber.errCh <- err:
   204  		default:
   205  			log.Printf("full error channel while reporting: %s\n", err)
   206  		}
   207  		return
   208  	}
   209  	defer c.updateIgnore(unitPath, info)
   210  
   211  	name, ok := info["Id"].(string)
   212  	if !ok {
   213  		msg := "failed to cast info.Id"
   214  		select {
   215  		case c.subStateSubscriber.errCh <- errors.New(msg):
   216  		default:
   217  			log.Printf("full error channel while reporting: %s\n", err)
   218  		}
   219  		return
   220  	}
   221  	substate, ok := info["SubState"].(string)
   222  	if !ok {
   223  		msg := "failed to cast info.SubState"
   224  		select {
   225  		case c.subStateSubscriber.errCh <- errors.New(msg):
   226  		default:
   227  			log.Printf("full error channel while reporting: %s\n", msg)
   228  		}
   229  		return
   230  	}
   231  
   232  	update := &SubStateUpdate{name, substate}
   233  	select {
   234  	case c.subStateSubscriber.updateCh <- update:
   235  	default:
   236  		msg := "update channel is full"
   237  		select {
   238  		case c.subStateSubscriber.errCh <- errors.New(msg):
   239  		default:
   240  			log.Printf("full error channel while reporting: %s\n", msg)
   241  		}
   242  		return
   243  	}
   244  }
   245  
   246  // The ignore functions work around a wart in the systemd dbus interface.
   247  // Requesting the properties of an unloaded unit will cause systemd to send a
   248  // pair of UnitNew/UnitRemoved signals.  Because we need to get a unit's
   249  // properties on UnitNew (as that's the only indication of a new unit coming up
   250  // for the first time), we would enter an infinite loop if we did not attempt
   251  // to detect and ignore these spurious signals.  The signal themselves are
   252  // indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
   253  // unloaded unit's signals for a short time after requesting its properties.
   254  // This means that we will miss e.g. a transient unit being restarted
   255  // *immediately* upon failure and also a transient unit being started
   256  // immediately after requesting its status (with systemctl status, for example,
   257  // because this causes a UnitNew signal to be sent which then causes us to fetch
   258  // the properties).
   259  
   260  func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
   261  	t, ok := c.subStateSubscriber.ignore[path]
   262  	return ok && t >= time.Now().UnixNano()
   263  }
   264  
   265  func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
   266  	loadState, ok := info["LoadState"].(string)
   267  	if !ok {
   268  		return
   269  	}
   270  
   271  	// unit is unloaded - it will trigger bad systemd dbus behavior
   272  	if loadState == "not-found" {
   273  		c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
   274  	}
   275  }
   276  
   277  // without this, ignore would grow unboundedly over time
   278  func (c *Conn) cleanIgnore() {
   279  	now := time.Now().UnixNano()
   280  	if c.subStateSubscriber.cleanIgnore < now {
   281  		c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval
   282  
   283  		for p, t := range c.subStateSubscriber.ignore {
   284  			if t < now {
   285  				delete(c.subStateSubscriber.ignore, p)
   286  			}
   287  		}
   288  	}
   289  }
   290  
   291  // PropertiesUpdate holds a map of a unit's changed properties
   292  type PropertiesUpdate struct {
   293  	UnitName string
   294  	Changed  map[string]dbus.Variant
   295  }
   296  
   297  // SetPropertiesSubscriber writes to updateCh when any unit's properties
   298  // change. Every property change reported by systemd will be sent; that is, no
   299  // transitions will be "missed" (as they might be with SetSubStateSubscriber).
   300  // However, state changes will only be written to the channel with non-blocking
   301  // writes.  If updateCh is full, it attempts to write an error to errCh; if
   302  // errCh is full, the error passes silently.
   303  func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
   304  	c.propertiesSubscriber.Lock()
   305  	defer c.propertiesSubscriber.Unlock()
   306  	c.propertiesSubscriber.updateCh = updateCh
   307  	c.propertiesSubscriber.errCh = errCh
   308  }
   309  
   310  // we don't need to worry about shouldIgnore() here because
   311  // sendPropertiesUpdate doesn't call GetProperties()
   312  func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
   313  	c.propertiesSubscriber.Lock()
   314  	defer c.propertiesSubscriber.Unlock()
   315  
   316  	if c.propertiesSubscriber.updateCh == nil {
   317  		return
   318  	}
   319  
   320  	update := &PropertiesUpdate{unitName(unitPath), changedProps}
   321  
   322  	select {
   323  	case c.propertiesSubscriber.updateCh <- update:
   324  	default:
   325  		msg := "update channel is full"
   326  		select {
   327  		case c.propertiesSubscriber.errCh <- errors.New(msg):
   328  		default:
   329  			log.Printf("full error channel while reporting: %s\n", msg)
   330  		}
   331  		return
   332  	}
   333  }
   334  

View as plain text