...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/opaque_ports_watcher.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"strconv"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/linkerd/linkerd2/controller/k8s"
     9  	labels "github.com/linkerd/linkerd2/pkg/k8s"
    10  	"github.com/linkerd/linkerd2/pkg/util"
    11  	"github.com/prometheus/client_golang/prometheus"
    12  	"github.com/prometheus/client_golang/prometheus/promauto"
    13  	logging "github.com/sirupsen/logrus"
    14  	corev1 "k8s.io/api/core/v1"
    15  	"k8s.io/client-go/tools/cache"
    16  )
    17  
    18  type (
    19  	// OpaquePortsWatcher watches all the services in the cluster. If the
    20  	// opaque ports annotation is added to a service, the watcher will update
    21  	// listeners—if any—subscribed to that service.
    22  	OpaquePortsWatcher struct {
    23  		subscriptions      map[ServiceID]*svcSubscriptions
    24  		k8sAPI             *k8s.API
    25  		subscribersGauge   *prometheus.GaugeVec
    26  		log                *logging.Entry
    27  		defaultOpaquePorts map[uint32]struct{}
    28  		sync.RWMutex
    29  	}
    30  
    31  	svcSubscriptions struct {
    32  		opaquePorts map[uint32]struct{}
    33  		listeners   []OpaquePortsUpdateListener
    34  	}
    35  
    36  	// OpaquePortsUpdateListener is the interface that subscribers must implement.
    37  	OpaquePortsUpdateListener interface {
    38  		UpdateService(ports map[uint32]struct{})
    39  	}
    40  )
    41  
    42  var opaquePortsMetrics = promauto.NewGaugeVec(
    43  	prometheus.GaugeOpts{
    44  		Name: "service_subscribers",
    45  		Help: "Number of subscribers to Service changes.",
    46  	},
    47  	[]string{"namespace", "name"},
    48  )
    49  
    50  // NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for
    51  // k8sAPI for service changes.
    52  func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
    53  	opw := &OpaquePortsWatcher{
    54  		subscriptions:      make(map[ServiceID]*svcSubscriptions),
    55  		k8sAPI:             k8sAPI,
    56  		subscribersGauge:   opaquePortsMetrics,
    57  		log:                log.WithField("component", "opaque-ports-watcher"),
    58  		defaultOpaquePorts: opaquePorts,
    59  	}
    60  	_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    61  		AddFunc:    opw.addService,
    62  		DeleteFunc: opw.deleteService,
    63  		UpdateFunc: opw.updateService,
    64  	})
    65  	if err != nil {
    66  		return nil, err
    67  	}
    68  
    69  	return opw, nil
    70  }
    71  
    72  // Subscribe subscribes a listener to a service; each time the service
    73  // changes, the listener will be updated if the list of opaque ports
    74  // changes.
    75  func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error {
    76  	opw.Lock()
    77  	defer opw.Unlock()
    78  	svc, _ := opw.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
    79  	if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
    80  		return invalidService(id.String())
    81  	}
    82  	opw.log.Debugf("Starting watch on service %s", id)
    83  	var numListeners float64
    84  	ss, ok := opw.subscriptions[id]
    85  	if !ok {
    86  		// If there is no watched service, create a subscription for the service
    87  		// and no opaque ports
    88  		opw.subscriptions[id] = &svcSubscriptions{
    89  			opaquePorts: opw.defaultOpaquePorts,
    90  			listeners:   []OpaquePortsUpdateListener{listener},
    91  		}
    92  		numListeners = 1
    93  	} else {
    94  		// There are subscriptions for this service, so add the listener to the
    95  		// service listeners. If there are opaque ports for the service, update
    96  		// the listener with that value.
    97  		ss.listeners = append(ss.listeners, listener)
    98  		listener.UpdateService(ss.opaquePorts)
    99  		numListeners = float64(len(ss.listeners))
   100  	}
   101  
   102  	opw.subscribersGauge.With(id.Labels()).Set(numListeners)
   103  
   104  	return nil
   105  }
   106  
   107  // Unsubscribe unsubscribes a listener from service.
   108  func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener) {
   109  	opw.Lock()
   110  	defer opw.Unlock()
   111  	opw.log.Debugf("Stopping watch on service %s", id)
   112  	ss, ok := opw.subscriptions[id]
   113  	if !ok {
   114  		opw.log.Errorf("Cannot unsubscribe from unknown service %s", id)
   115  		return
   116  	}
   117  	for i, l := range ss.listeners {
   118  		if l == listener {
   119  			n := len(ss.listeners)
   120  			ss.listeners[i] = ss.listeners[n-1]
   121  			ss.listeners[n-1] = nil
   122  			ss.listeners = ss.listeners[:n-1]
   123  		}
   124  	}
   125  
   126  	labels := id.Labels()
   127  	if len(ss.listeners) > 0 {
   128  		opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
   129  	} else {
   130  		if !opw.subscribersGauge.Delete(labels) {
   131  			opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)
   132  		}
   133  		delete(opw.subscriptions, id)
   134  	}
   135  }
   136  
   137  func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) {
   138  	newSvc := newObj.(*corev1.Service)
   139  	oldSvc := oldObj.(*corev1.Service)
   140  
   141  	oldUpdated := latestUpdated(oldSvc.ManagedFields)
   142  	updated := latestUpdated(newSvc.ManagedFields)
   143  	if !updated.IsZero() && updated != oldUpdated {
   144  		delta := time.Since(updated)
   145  		serviceInformerLag.Observe(delta.Seconds())
   146  	}
   147  	opw.addService(newObj)
   148  }
   149  
   150  func (opw *OpaquePortsWatcher) addService(obj interface{}) {
   151  	opw.Lock()
   152  	defer opw.Unlock()
   153  	svc := obj.(*corev1.Service)
   154  	id := ServiceID{
   155  		Namespace: svc.Namespace,
   156  		Name:      svc.Name,
   157  	}
   158  	opaquePorts, ok, err := getServiceOpaquePortsAnnotation(svc)
   159  	if err != nil {
   160  		opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err)
   161  		return
   162  	}
   163  	// If the opaque ports annotation was not set, then set the service's
   164  	// opaque ports to the default value.
   165  	if !ok {
   166  		opaquePorts = opw.defaultOpaquePorts
   167  	}
   168  	ss, ok := opw.subscriptions[id]
   169  	// If there are no subscriptions for this service, create one with the
   170  	// opaque ports.
   171  	if !ok {
   172  		opw.subscriptions[id] = &svcSubscriptions{
   173  			opaquePorts: opaquePorts,
   174  			listeners:   []OpaquePortsUpdateListener{},
   175  		}
   176  		return
   177  	}
   178  	// Do not send updates if there was no change in the opaque ports; if
   179  	// there was, send an update to each listener.
   180  	if portsEqual(ss.opaquePorts, opaquePorts) {
   181  		return
   182  	}
   183  	ss.opaquePorts = opaquePorts
   184  	for _, listener := range ss.listeners {
   185  		listener.UpdateService(ss.opaquePorts)
   186  	}
   187  }
   188  
   189  func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
   190  	opw.Lock()
   191  	defer opw.Unlock()
   192  	service, ok := obj.(*corev1.Service)
   193  	if !ok {
   194  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   195  		if !ok {
   196  			opw.log.Errorf("could not get object from DeletedFinalStateUnknown %#v", obj)
   197  			return
   198  		}
   199  		service, ok = tombstone.Obj.(*corev1.Service)
   200  		if !ok {
   201  			opw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
   202  			return
   203  		}
   204  	}
   205  	id := ServiceID{
   206  		Namespace: service.Namespace,
   207  		Name:      service.Name,
   208  	}
   209  	ss, ok := opw.subscriptions[id]
   210  	if !ok {
   211  		return
   212  	}
   213  	old := ss.opaquePorts
   214  	ss.opaquePorts = opw.defaultOpaquePorts
   215  	// Do not send an update if the service already had the default opaque ports
   216  	if portsEqual(old, ss.opaquePorts) {
   217  		return
   218  	}
   219  	for _, listener := range ss.listeners {
   220  		listener.UpdateService(ss.opaquePorts)
   221  	}
   222  }
   223  
   224  func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) {
   225  	annotation, ok := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
   226  	if !ok {
   227  		return nil, false, nil
   228  	}
   229  	opaquePorts := make(map[uint32]struct{})
   230  	if annotation != "" {
   231  		for _, portStr := range parseServiceOpaquePorts(annotation, svc.Spec.Ports) {
   232  			port, err := strconv.ParseUint(portStr, 10, 32)
   233  			if err != nil {
   234  				return nil, true, err
   235  			}
   236  			opaquePorts[uint32(port)] = struct{}{}
   237  		}
   238  	}
   239  	return opaquePorts, true, nil
   240  }
   241  
   242  func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string {
   243  	portRanges := util.GetPortRanges(annotation)
   244  	var values []string
   245  	for _, pr := range portRanges {
   246  		port, named := isNamed(pr, sps)
   247  		if named {
   248  			values = append(values, strconv.Itoa(int(port)))
   249  		} else {
   250  			pr, err := util.ParsePortRange(pr)
   251  			if err != nil {
   252  				logging.Warnf("Invalid port range [%v]: %s", pr, err)
   253  				continue
   254  			}
   255  			for i := pr.LowerBound; i <= pr.UpperBound; i++ {
   256  				values = append(values, strconv.Itoa(i))
   257  			}
   258  		}
   259  	}
   260  	return values
   261  }
   262  
   263  // isNamed checks if a port range is actually a service named port (e.g.
   264  // `123-456` is a valid name, but also is a valid range); all port names must
   265  // be checked before making it a list.
   266  func isNamed(pr string, sps []corev1.ServicePort) (int32, bool) {
   267  	for _, sp := range sps {
   268  		if sp.Name == pr {
   269  			return sp.Port, true
   270  		}
   271  	}
   272  	return 0, false
   273  }
   274  
   275  func portsEqual(x, y map[uint32]struct{}) bool {
   276  	if len(x) != len(y) {
   277  		return false
   278  	}
   279  	for port := range x {
   280  		_, ok := y[port]
   281  		if !ok {
   282  			return false
   283  		}
   284  	}
   285  	return true
   286  }
   287  

View as plain text