...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/endpoint_profile_translator.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"fmt"
     5  
     6  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
     7  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
     8  	"github.com/prometheus/client_golang/prometheus"
     9  	"github.com/prometheus/client_golang/prometheus/promauto"
    10  	logging "github.com/sirupsen/logrus"
    11  	"google.golang.org/protobuf/proto"
    12  )
    13  
    14  type endpointProfileTranslator struct {
    15  	enableH2Upgrade     bool
    16  	controllerNS        string
    17  	identityTrustDomain string
    18  	defaultOpaquePorts  map[uint32]struct{}
    19  
    20  	meshedHttp2ClientParams *pb.Http2ClientParams
    21  
    22  	stream    pb.Destination_GetProfileServer
    23  	endStream chan struct{}
    24  
    25  	updates chan *watcher.Address
    26  	stop    chan struct{}
    27  
    28  	current *pb.DestinationProfile
    29  
    30  	log *logging.Entry
    31  }
    32  
    33  // endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented
    34  // whenever the profile updates queue overflows.
    35  //
    36  // We omit ip and port labels because they are high cardinality.
    37  var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
    38  	prometheus.CounterOpts{
    39  		Name: "endpoint_profile_updates_queue_overflow",
    40  		Help: "A counter incremented whenever the endpoint profile updates queue overflows",
    41  	},
    42  )
    43  
    44  // newEndpointProfileTranslator translates pod updates and profile updates to
    45  // DestinationProfiles for endpoints
    46  func newEndpointProfileTranslator(
    47  	enableH2Upgrade bool,
    48  	controllerNS,
    49  	identityTrustDomain string,
    50  	defaultOpaquePorts map[uint32]struct{},
    51  	meshedHTTP2ClientParams *pb.Http2ClientParams,
    52  	stream pb.Destination_GetProfileServer,
    53  	endStream chan struct{},
    54  	log *logging.Entry,
    55  ) *endpointProfileTranslator {
    56  	return &endpointProfileTranslator{
    57  		enableH2Upgrade:     enableH2Upgrade,
    58  		controllerNS:        controllerNS,
    59  		identityTrustDomain: identityTrustDomain,
    60  		defaultOpaquePorts:  defaultOpaquePorts,
    61  
    62  		meshedHttp2ClientParams: meshedHTTP2ClientParams,
    63  
    64  		stream:    stream,
    65  		endStream: endStream,
    66  		updates:   make(chan *watcher.Address, updateQueueCapacity),
    67  		stop:      make(chan struct{}),
    68  
    69  		log: log.WithField("component", "endpoint-profile-translator"),
    70  	}
    71  }
    72  
    73  // Start initiates a goroutine which processes update events off of the
    74  // endpointProfileTranslator's internal queue and sends to the grpc stream as
    75  // appropriate. The goroutine calls non-thread-safe Send, therefore Start must
    76  // not be called more than once.
    77  func (ept *endpointProfileTranslator) Start() {
    78  	go func() {
    79  		for {
    80  			select {
    81  			case update := <-ept.updates:
    82  				ept.update(update)
    83  			case <-ept.stop:
    84  				return
    85  			}
    86  		}
    87  	}()
    88  }
    89  
    90  // Stop terminates the goroutine started by Start.
    91  func (ept *endpointProfileTranslator) Stop() {
    92  	close(ept.stop)
    93  }
    94  
    95  // Update enqueues an address update to be translated into a DestinationProfile.
    96  // An error is returned if the update cannot be enqueued.
    97  func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
    98  	select {
    99  	case ept.updates <- address:
   100  		// Update has been successfully enqueued.
   101  		return nil
   102  	default:
   103  		select {
   104  		case <-ept.endStream:
   105  			// The endStream channel has already been closed so no action is
   106  			// necessary.
   107  			return fmt.Errorf("profile update stream closed")
   108  		default:
   109  			// We are unable to enqueue because the channel does not have capacity.
   110  			// The stream has fallen too far behind and should be closed.
   111  			endpointProfileUpdatesQueueOverflowCounter.Inc()
   112  			close(ept.endStream)
   113  			return fmt.Errorf("profile update queue full; aborting stream")
   114  		}
   115  	}
   116  }
   117  
   118  func (ept *endpointProfileTranslator) queueLen() int {
   119  	return len(ept.updates)
   120  }
   121  
   122  func (ept *endpointProfileTranslator) update(address *watcher.Address) {
   123  	var opaquePorts map[uint32]struct{}
   124  	if address.Pod != nil {
   125  		opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
   126  	} else {
   127  		opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, ept.defaultOpaquePorts)
   128  	}
   129  	endpoint, err := ept.createEndpoint(*address, opaquePorts)
   130  	if err != nil {
   131  		ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
   132  			address.IP, address.Port, err)
   133  		return
   134  	}
   135  	ept.log.Debugf("Created endpoint: %+v", endpoint)
   136  
   137  	_, opaqueProtocol := opaquePorts[address.Port]
   138  	profile := &pb.DestinationProfile{
   139  		RetryBudget:    defaultRetryBudget(),
   140  		Endpoint:       endpoint,
   141  		OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol,
   142  	}
   143  	if proto.Equal(profile, ept.current) {
   144  		ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
   145  		return
   146  	}
   147  
   148  	ept.log.Debugf("Sending profile update: %+v", profile)
   149  	if err := ept.stream.Send(profile); err != nil {
   150  		ept.log.Errorf("failed to send profile update: %s", err)
   151  		return
   152  	}
   153  
   154  	ept.current = profile
   155  }
   156  
   157  func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
   158  	var weightedAddr *pb.WeightedAddr
   159  	var err error
   160  	if address.ExternalWorkload != nil {
   161  		weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts, ept.meshedHttp2ClientParams)
   162  	} else {
   163  		weightedAddr, err = createWeightedAddr(address, opaquePorts,
   164  			ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
   165  	}
   166  	if err != nil {
   167  		return nil, err
   168  	}
   169  
   170  	// `Get` doesn't include the namespace in the per-endpoint
   171  	// metadata, so it needs to be special-cased.
   172  	if address.Pod != nil {
   173  		weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
   174  	} else if address.ExternalWorkload != nil {
   175  		weightedAddr.MetricLabels["namespace"] = address.ExternalWorkload.Namespace
   176  	}
   177  
   178  	return weightedAddr, err
   179  }
   180  

View as plain text