...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/profile_watcher.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"sync"
     5  	"time"
     6  
     7  	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
     8  	splisters "github.com/linkerd/linkerd2/controller/gen/client/listers/serviceprofile/v1alpha2"
     9  	"github.com/linkerd/linkerd2/controller/k8s"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  	logging "github.com/sirupsen/logrus"
    12  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    13  	"k8s.io/client-go/tools/cache"
    14  )
    15  
    16  type (
    17  	// ProfileWatcher watches all service profiles in the Kubernetes cluster.
    18  	// Listeners can subscribe to a particular profile and profileWatcher will
    19  	// publish the service profile and all future changes for that profile.
    20  	ProfileWatcher struct {
    21  		profileLister splisters.ServiceProfileLister
    22  		profiles      map[ProfileID]*profilePublisher // <-- intentional formatting error to test CI
    23  
    24  		log          *logging.Entry
    25  		sync.RWMutex // This mutex protects modification of the map itself.
    26  	}
    27  
    28  	profilePublisher struct {
    29  		profile   *sp.ServiceProfile
    30  		listeners []ProfileUpdateListener
    31  
    32  		log            *logging.Entry
    33  		profileMetrics metrics
    34  		// All access to the profilePublisher is explicitly synchronized by this mutex.
    35  		sync.Mutex
    36  	}
    37  
    38  	// ProfileUpdateListener is the interface that subscribers must implement.
    39  	ProfileUpdateListener interface {
    40  		Update(profile *sp.ServiceProfile)
    41  	}
    42  )
    43  
    44  var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"})
    45  
    46  // NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for
    47  // service profile changes.
    48  func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error) {
    49  	watcher := &ProfileWatcher{
    50  		profileLister: k8sAPI.SP().Lister(),
    51  		profiles:      make(map[ProfileID]*profilePublisher),
    52  		log:           log.WithField("component", "profile-watcher"),
    53  	}
    54  
    55  	_, err := k8sAPI.SP().Informer().AddEventHandler(
    56  		cache.ResourceEventHandlerFuncs{
    57  			AddFunc:    watcher.addProfile,
    58  			UpdateFunc: watcher.updateProfile,
    59  			DeleteFunc: watcher.deleteProfile,
    60  		},
    61  	)
    62  	if err != nil {
    63  		return nil, err
    64  	}
    65  
    66  	return watcher, nil
    67  }
    68  
    69  //////////////////////
    70  /// ProfileWatcher ///
    71  //////////////////////
    72  
    73  // Subscribe to an authority.
    74  // The provided listener will be updated each time the service profile for the
    75  // given authority is changed.
    76  func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error {
    77  	pw.log.Debugf("Establishing watch on profile %s", id)
    78  
    79  	publisher := pw.getOrNewProfilePublisher(id, nil)
    80  
    81  	publisher.subscribe(listener)
    82  	return nil
    83  }
    84  
    85  // Unsubscribe removes a listener from the subscribers list for this authority.
    86  func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener) {
    87  	pw.log.Debugf("Stopping watch on profile %s", id)
    88  
    89  	publisher, ok := pw.getProfilePublisher(id)
    90  	if !ok {
    91  		pw.log.Errorf("cannot unsubscribe from unknown service [%s]", id)
    92  	}
    93  	publisher.unsubscribe(listener)
    94  }
    95  
    96  func (pw *ProfileWatcher) addProfile(obj interface{}) {
    97  	profile := obj.(*sp.ServiceProfile)
    98  	id := ProfileID{
    99  		Namespace: profile.Namespace,
   100  		Name:      profile.Name,
   101  	}
   102  
   103  	publisher := pw.getOrNewProfilePublisher(id, profile)
   104  
   105  	publisher.update(profile)
   106  }
   107  
   108  func (pw *ProfileWatcher) updateProfile(old interface{}, new interface{}) {
   109  	oldProfile := old.(*sp.ServiceProfile)
   110  	newProfile := new.(*sp.ServiceProfile)
   111  
   112  	oldUpdated := latestUpdated(oldProfile.ManagedFields)
   113  	updated := latestUpdated(newProfile.ManagedFields)
   114  	if !updated.IsZero() && updated != oldUpdated {
   115  		delta := time.Since(updated)
   116  		serviceProfileInformerLag.Observe(delta.Seconds())
   117  	}
   118  
   119  	pw.addProfile(new)
   120  }
   121  
   122  func (pw *ProfileWatcher) deleteProfile(obj interface{}) {
   123  	profile, ok := obj.(*sp.ServiceProfile)
   124  	if !ok {
   125  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   126  		if !ok {
   127  			pw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
   128  			return
   129  		}
   130  		profile, ok = tombstone.Obj.(*sp.ServiceProfile)
   131  		if !ok {
   132  			pw.log.Errorf("DeletedFinalStateUnknown contained object that is not a ServiceProfile %#v", obj)
   133  			return
   134  		}
   135  	}
   136  
   137  	id := ProfileID{
   138  		Namespace: profile.Namespace,
   139  		Name:      profile.Name,
   140  	}
   141  
   142  	publisher, ok := pw.getProfilePublisher(id)
   143  	if ok {
   144  		publisher.update(nil)
   145  	}
   146  }
   147  
   148  func (pw *ProfileWatcher) getOrNewProfilePublisher(id ProfileID, profile *sp.ServiceProfile) *profilePublisher {
   149  	pw.Lock()
   150  	defer pw.Unlock()
   151  
   152  	publisher, ok := pw.profiles[id]
   153  	if !ok {
   154  		if profile == nil {
   155  			var err error
   156  			profile, err = pw.profileLister.ServiceProfiles(id.Namespace).Get(id.Name)
   157  			if err != nil && !apierrors.IsNotFound(err) {
   158  				pw.log.Errorf("error getting service profile: %s", err)
   159  			}
   160  			if err != nil {
   161  				profile = nil
   162  			}
   163  		}
   164  
   165  		publisher = &profilePublisher{
   166  			profile:   profile,
   167  			listeners: make([]ProfileUpdateListener, 0),
   168  			log: pw.log.WithFields(logging.Fields{
   169  				"component": "profile-publisher",
   170  				"ns":        id.Namespace,
   171  				"profile":   id.Name,
   172  			}),
   173  			profileMetrics: profileVecs.newMetrics(prometheus.Labels{
   174  				"namespace": id.Namespace,
   175  				"profile":   id.Name,
   176  			}),
   177  		}
   178  		pw.profiles[id] = publisher
   179  	}
   180  
   181  	return publisher
   182  }
   183  
   184  func (pw *ProfileWatcher) getProfilePublisher(id ProfileID) (publisher *profilePublisher, ok bool) {
   185  	pw.RLock()
   186  	defer pw.RUnlock()
   187  	publisher, ok = pw.profiles[id]
   188  	return
   189  }
   190  
   191  ////////////////////////
   192  /// profilePublisher ///
   193  ////////////////////////
   194  
   195  func (pp *profilePublisher) subscribe(listener ProfileUpdateListener) {
   196  	pp.Lock()
   197  	defer pp.Unlock()
   198  
   199  	pp.listeners = append(pp.listeners, listener)
   200  	listener.Update(pp.profile)
   201  
   202  	pp.profileMetrics.setSubscribers(len(pp.listeners))
   203  }
   204  
   205  // unsubscribe returns true if and only if the listener was found and removed.
   206  // it also returns the number of listeners remaining after unsubscribing.
   207  func (pp *profilePublisher) unsubscribe(listener ProfileUpdateListener) {
   208  	pp.Lock()
   209  	defer pp.Unlock()
   210  
   211  	for i, item := range pp.listeners {
   212  		if item == listener {
   213  			// delete the item from the slice
   214  			n := len(pp.listeners)
   215  			pp.listeners[i] = pp.listeners[n-1]
   216  			pp.listeners[n-1] = nil
   217  			pp.listeners = pp.listeners[:n-1]
   218  			break
   219  		}
   220  	}
   221  
   222  	pp.profileMetrics.setSubscribers(len(pp.listeners))
   223  }
   224  
   225  func (pp *profilePublisher) update(profile *sp.ServiceProfile) {
   226  	pp.Lock()
   227  	defer pp.Unlock()
   228  	pp.log.Debug("Updating profile")
   229  
   230  	pp.profile = profile
   231  	for _, listener := range pp.listeners {
   232  		listener.Update(profile)
   233  	}
   234  
   235  	pp.profileMetrics.incUpdates()
   236  }
   237  

View as plain text