package watcher import ( "strconv" "sync" "time" "github.com/linkerd/linkerd2/controller/k8s" labels "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) type ( // OpaquePortsWatcher watches all the services in the cluster. If the // opaque ports annotation is added to a service, the watcher will update // listeners—if any—subscribed to that service. OpaquePortsWatcher struct { subscriptions map[ServiceID]*svcSubscriptions k8sAPI *k8s.API subscribersGauge *prometheus.GaugeVec log *logging.Entry defaultOpaquePorts map[uint32]struct{} sync.RWMutex } svcSubscriptions struct { opaquePorts map[uint32]struct{} listeners []OpaquePortsUpdateListener } // OpaquePortsUpdateListener is the interface that subscribers must implement. OpaquePortsUpdateListener interface { UpdateService(ports map[uint32]struct{}) } ) var opaquePortsMetrics = promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "service_subscribers", Help: "Number of subscribers to Service changes.", }, []string{"namespace", "name"}, ) // NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for // k8sAPI for service changes. func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) { opw := &OpaquePortsWatcher{ subscriptions: make(map[ServiceID]*svcSubscriptions), k8sAPI: k8sAPI, subscribersGauge: opaquePortsMetrics, log: log.WithField("component", "opaque-ports-watcher"), defaultOpaquePorts: opaquePorts, } _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: opw.addService, DeleteFunc: opw.deleteService, UpdateFunc: opw.updateService, }) if err != nil { return nil, err } return opw, nil } // Subscribe subscribes a listener to a service; each time the service // changes, the listener will be updated if the list of opaque ports // changes. func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error { opw.Lock() defer opw.Unlock() svc, _ := opw.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name) if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName { return invalidService(id.String()) } opw.log.Debugf("Starting watch on service %s", id) var numListeners float64 ss, ok := opw.subscriptions[id] if !ok { // If there is no watched service, create a subscription for the service // and no opaque ports opw.subscriptions[id] = &svcSubscriptions{ opaquePorts: opw.defaultOpaquePorts, listeners: []OpaquePortsUpdateListener{listener}, } numListeners = 1 } else { // There are subscriptions for this service, so add the listener to the // service listeners. If there are opaque ports for the service, update // the listener with that value. ss.listeners = append(ss.listeners, listener) listener.UpdateService(ss.opaquePorts) numListeners = float64(len(ss.listeners)) } opw.subscribersGauge.With(id.Labels()).Set(numListeners) return nil } // Unsubscribe unsubscribes a listener from service. func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener) { opw.Lock() defer opw.Unlock() opw.log.Debugf("Stopping watch on service %s", id) ss, ok := opw.subscriptions[id] if !ok { opw.log.Errorf("Cannot unsubscribe from unknown service %s", id) return } for i, l := range ss.listeners { if l == listener { n := len(ss.listeners) ss.listeners[i] = ss.listeners[n-1] ss.listeners[n-1] = nil ss.listeners = ss.listeners[:n-1] } } labels := id.Labels() if len(ss.listeners) > 0 { opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners))) } else { if !opw.subscribersGauge.Delete(labels) { opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels) } delete(opw.subscriptions, id) } } func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) { newSvc := newObj.(*corev1.Service) oldSvc := oldObj.(*corev1.Service) oldUpdated := latestUpdated(oldSvc.ManagedFields) updated := latestUpdated(newSvc.ManagedFields) if !updated.IsZero() && updated != oldUpdated { delta := time.Since(updated) serviceInformerLag.Observe(delta.Seconds()) } opw.addService(newObj) } func (opw *OpaquePortsWatcher) addService(obj interface{}) { opw.Lock() defer opw.Unlock() svc := obj.(*corev1.Service) id := ServiceID{ Namespace: svc.Namespace, Name: svc.Name, } opaquePorts, ok, err := getServiceOpaquePortsAnnotation(svc) if err != nil { opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err) return } // If the opaque ports annotation was not set, then set the service's // opaque ports to the default value. if !ok { opaquePorts = opw.defaultOpaquePorts } ss, ok := opw.subscriptions[id] // If there are no subscriptions for this service, create one with the // opaque ports. if !ok { opw.subscriptions[id] = &svcSubscriptions{ opaquePorts: opaquePorts, listeners: []OpaquePortsUpdateListener{}, } return } // Do not send updates if there was no change in the opaque ports; if // there was, send an update to each listener. if portsEqual(ss.opaquePorts, opaquePorts) { return } ss.opaquePorts = opaquePorts for _, listener := range ss.listeners { listener.UpdateService(ss.opaquePorts) } } func (opw *OpaquePortsWatcher) deleteService(obj interface{}) { opw.Lock() defer opw.Unlock() service, ok := obj.(*corev1.Service) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { opw.log.Errorf("could not get object from DeletedFinalStateUnknown %#v", obj) return } service, ok = tombstone.Obj.(*corev1.Service) if !ok { opw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj) return } } id := ServiceID{ Namespace: service.Namespace, Name: service.Name, } ss, ok := opw.subscriptions[id] if !ok { return } old := ss.opaquePorts ss.opaquePorts = opw.defaultOpaquePorts // Do not send an update if the service already had the default opaque ports if portsEqual(old, ss.opaquePorts) { return } for _, listener := range ss.listeners { listener.UpdateService(ss.opaquePorts) } } func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) { annotation, ok := svc.Annotations[labels.ProxyOpaquePortsAnnotation] if !ok { return nil, false, nil } opaquePorts := make(map[uint32]struct{}) if annotation != "" { for _, portStr := range parseServiceOpaquePorts(annotation, svc.Spec.Ports) { port, err := strconv.ParseUint(portStr, 10, 32) if err != nil { return nil, true, err } opaquePorts[uint32(port)] = struct{}{} } } return opaquePorts, true, nil } func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string { portRanges := util.GetPortRanges(annotation) var values []string for _, pr := range portRanges { port, named := isNamed(pr, sps) if named { values = append(values, strconv.Itoa(int(port))) } else { pr, err := util.ParsePortRange(pr) if err != nil { logging.Warnf("Invalid port range [%v]: %s", pr, err) continue } for i := pr.LowerBound; i <= pr.UpperBound; i++ { values = append(values, strconv.Itoa(i)) } } } return values } // isNamed checks if a port range is actually a service named port (e.g. // `123-456` is a valid name, but also is a valid range); all port names must // be checked before making it a list. func isNamed(pr string, sps []corev1.ServicePort) (int32, bool) { for _, sp := range sps { if sp.Name == pr { return sp.Port, true } } return 0, false } func portsEqual(x, y map[uint32]struct{}) bool { if len(x) != len(y) { return false } for port := range x { _, ok := y[port] if !ok { return false } } return true }