...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/profile_translator.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net"
     7  	"time"
     8  
     9  	"github.com/golang/protobuf/ptypes/duration"
    10  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    11  	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
    12  	"github.com/linkerd/linkerd2/pkg/profiles"
    13  	"github.com/linkerd/linkerd2/pkg/util"
    14  	"github.com/prometheus/client_golang/prometheus"
    15  	"github.com/prometheus/client_golang/prometheus/promauto"
    16  	logging "github.com/sirupsen/logrus"
    17  )
    18  
// millisPerDecimilli is the factor by which a weight's milli-value is
// multiplied to express it in decimillis (where 10_000 represents 100%).
// Despite the name, it is the number of decimillis in one milli.
const millisPerDecimilli = 10
    20  
// profileTranslator converts ServiceProfile updates into Proxy API
// DestinationProfile messages and sends them on a GetProfile stream. Updates
// are queued by Update and sent by the goroutine launched in Start.
//
// It implements the ProfileUpdateListener interface.
type profileTranslator struct {
	// fullyQualifiedName is copied into every DestinationProfile that is sent.
	fullyQualifiedName string
	// port is used to look up the opaque-port setting and is appended to
	// destination override authorities that do not parse as host:port.
	port               uint32

	// stream is the gRPC stream that profiles are sent on.
	stream          pb.Destination_GetProfileServer
	// endStream is closed by Update when the update queue overflows.
	endStream       chan struct{}
	// log is tagged with the "profile-translator" component field.
	log             *logging.Entry
	// overflowCounter is incremented each time an update is dropped because
	// the queue is full.
	overflowCounter prometheus.Counter

	// updates is the buffered queue between Update and the Start goroutine.
	updates chan *sp.ServiceProfile
	// stop is closed by Stop to terminate the Start goroutine.
	stop    chan struct{}
}
    34  
// profileUpdatesQueueOverflowCounter is the counter vector from which each
// profileTranslator derives its overflow counter. It is labelled with the
// fully-qualified name ("fqn") and the port ("port") of the translator.
var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "profile_updates_queue_overflow",
		Help: "A counter incremented whenever the profile updates queue overflows",
	},
	[]string{
		"fqn",
		"port",
	},
)
    45  
    46  func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
    47  	return &profileTranslator{
    48  		fullyQualifiedName: fqn,
    49  		port:               port,
    50  
    51  		stream:          stream,
    52  		endStream:       endStream,
    53  		log:             log.WithField("component", "profile-translator"),
    54  		overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
    55  		updates:         make(chan *sp.ServiceProfile, updateQueueCapacity),
    56  		stop:            make(chan struct{}),
    57  	}
    58  }
    59  
    60  // Update is called from a client-go informer callback and therefore must not
    61  // We enqueue an update in a channel so that it can be processed asyncronously.
    62  // To ensure that enqueuing does not block, we first check to see if there is
    63  // capacity in the buffered channel. If there is not, we drop the update and
    64  // signal to the stream that it has fallen too far behind and should be closed.
    65  func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
    66  	select {
    67  	case pt.updates <- profile:
    68  		// Update has been successfully enqueued.
    69  	default:
    70  		// We are unable to enqueue because the channel does not have capacity.
    71  		// The stream has fallen too far behind and should be closed.
    72  		pt.overflowCounter.Inc()
    73  		select {
    74  		case <-pt.endStream:
    75  			// The endStream channel has already been closed so no action is
    76  			// necessary.
    77  		default:
    78  			pt.log.Error("profile update queue full; aborting stream")
    79  			close(pt.endStream)
    80  		}
    81  	}
    82  }
    83  
    84  // Start initiates a goroutine which processes update events off of the
    85  // profileTranslator's internal queue and sends to the grpc stream as
    86  // appropriate. The goroutine calls non-thread-safe Send, therefore Start must
    87  // not be called more than once.
    88  func (pt *profileTranslator) Start() {
    89  	go func() {
    90  		for {
    91  			select {
    92  			case update := <-pt.updates:
    93  				pt.update(update)
    94  			case <-pt.stop:
    95  				return
    96  			}
    97  		}
    98  	}()
    99  }
   100  
// Stop terminates the goroutine started by Start by closing the stop channel.
// It must be called at most once: closing an already-closed channel panics.
func (pt *profileTranslator) Stop() {
	close(pt.stop)
}
   105  
   106  func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
   107  	if profile == nil {
   108  		pt.log.Debugf("Sending default profile")
   109  		if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {
   110  			pt.log.Errorf("failed to send default service profile: %s", err)
   111  		}
   112  		return
   113  	}
   114  
   115  	destinationProfile, err := pt.createDestinationProfile(profile)
   116  	if err != nil {
   117  		pt.log.Error(err)
   118  		return
   119  	}
   120  	pt.log.Debugf("Sending profile update: %+v", destinationProfile)
   121  	if err := pt.stream.Send(destinationProfile); err != nil {
   122  		pt.log.Errorf("failed to send profile update: %s", err)
   123  	}
   124  }
   125  
   126  func (pt *profileTranslator) defaultServiceProfile() *pb.DestinationProfile {
   127  	return &pb.DestinationProfile{
   128  		Routes:             []*pb.Route{},
   129  		RetryBudget:        defaultRetryBudget(),
   130  		FullyQualifiedName: pt.fullyQualifiedName,
   131  	}
   132  }
   133  
   134  func defaultRetryBudget() *pb.RetryBudget {
   135  	return &pb.RetryBudget{
   136  		MinRetriesPerSecond: 10,
   137  		RetryRatio:          0.2,
   138  		Ttl: &duration.Duration{
   139  			Seconds: 10,
   140  		},
   141  	}
   142  }
   143  
   144  func toDuration(d time.Duration) *duration.Duration {
   145  	if d == 0 {
   146  		return nil
   147  	}
   148  	return &duration.Duration{
   149  		Seconds: int64(d / time.Second),
   150  		Nanos:   int32(d % time.Second),
   151  	}
   152  }
   153  
   154  // createDestinationProfile returns a Proxy API DestinationProfile, given a
   155  // ServiceProfile.
   156  func (pt *profileTranslator) createDestinationProfile(profile *sp.ServiceProfile) (*pb.DestinationProfile, error) {
   157  	routes := make([]*pb.Route, 0)
   158  	for _, route := range profile.Spec.Routes {
   159  		pbRoute, err := toRoute(profile, route)
   160  		if err != nil {
   161  			return nil, err
   162  		}
   163  		routes = append(routes, pbRoute)
   164  	}
   165  	budget := defaultRetryBudget()
   166  	if profile.Spec.RetryBudget != nil {
   167  		budget.MinRetriesPerSecond = profile.Spec.RetryBudget.MinRetriesPerSecond
   168  		budget.RetryRatio = profile.Spec.RetryBudget.RetryRatio
   169  		ttl, err := time.ParseDuration(profile.Spec.RetryBudget.TTL)
   170  		if err != nil {
   171  			return nil, err
   172  		}
   173  		budget.Ttl = toDuration(ttl)
   174  	}
   175  	var opaqueProtocol bool
   176  	if profile.Spec.OpaquePorts != nil {
   177  		_, opaqueProtocol = profile.Spec.OpaquePorts[pt.port]
   178  	}
   179  	return &pb.DestinationProfile{
   180  		Routes:             routes,
   181  		RetryBudget:        budget,
   182  		DstOverrides:       toDstOverrides(profile.Spec.DstOverrides, pt.port),
   183  		FullyQualifiedName: pt.fullyQualifiedName,
   184  		OpaqueProtocol:     opaqueProtocol,
   185  	}, nil
   186  }
   187  
   188  func toDstOverrides(dsts []*sp.WeightedDst, port uint32) []*pb.WeightedDst {
   189  	pbDsts := []*pb.WeightedDst{}
   190  	for _, dst := range dsts {
   191  		authority := dst.Authority
   192  		// If the authority does not parse as a host:port, add the port provided by `GetProfile`.
   193  		if _, _, err := net.SplitHostPort(authority); err != nil {
   194  			authority = net.JoinHostPort(authority, fmt.Sprintf("%d", port))
   195  		}
   196  
   197  		pbDst := &pb.WeightedDst{
   198  			Authority: authority,
   199  			// Weights are expressed in decimillis: 10_000 represents 100%
   200  			Weight: uint32(dst.Weight.MilliValue() * millisPerDecimilli),
   201  		}
   202  		pbDsts = append(pbDsts, pbDst)
   203  	}
   204  	return pbDsts
   205  }
   206  
   207  // toRoute returns a Proxy API Route, given a ServiceProfile Route.
   208  func toRoute(profile *sp.ServiceProfile, route *sp.RouteSpec) (*pb.Route, error) {
   209  	cond, err := toRequestMatch(route.Condition)
   210  	if err != nil {
   211  		return nil, err
   212  	}
   213  	rcs := make([]*pb.ResponseClass, 0)
   214  	for _, rc := range route.ResponseClasses {
   215  		pbRc, err := toResponseClass(rc)
   216  		if err != nil {
   217  			return nil, err
   218  		}
   219  		rcs = append(rcs, pbRc)
   220  	}
   221  	var timeout time.Duration // No default timeout
   222  	if route.Timeout != "" {
   223  		timeout, err = time.ParseDuration(route.Timeout)
   224  		if err != nil {
   225  			logging.Errorf(
   226  				"failed to parse duration for route '%s' in service profile '%s' in namespace '%s': %s",
   227  				route.Name,
   228  				profile.Name,
   229  				profile.Namespace,
   230  				err,
   231  			)
   232  		}
   233  	}
   234  	return &pb.Route{
   235  		Condition:       cond,
   236  		ResponseClasses: rcs,
   237  		MetricsLabels:   map[string]string{"route": route.Name},
   238  		IsRetryable:     route.IsRetryable,
   239  		Timeout:         toDuration(timeout),
   240  	}, nil
   241  }
   242  
   243  // toResponseClass returns a Proxy API ResponseClass, given a ServiceProfile
   244  // ResponseClass.
   245  func toResponseClass(rc *sp.ResponseClass) (*pb.ResponseClass, error) {
   246  	cond, err := toResponseMatch(rc.Condition)
   247  	if err != nil {
   248  		return nil, err
   249  	}
   250  	return &pb.ResponseClass{
   251  		Condition: cond,
   252  		IsFailure: rc.IsFailure,
   253  	}, nil
   254  }
   255  
   256  // toResponseMatch returns a Proxy API ResponseMatch, given a ServiceProfile
   257  // ResponseMatch.
   258  func toResponseMatch(rspMatch *sp.ResponseMatch) (*pb.ResponseMatch, error) {
   259  	if rspMatch == nil {
   260  		return nil, errors.New("missing response match")
   261  	}
   262  	err := profiles.ValidateResponseMatch(rspMatch)
   263  	if err != nil {
   264  		return nil, err
   265  	}
   266  
   267  	matches := make([]*pb.ResponseMatch, 0)
   268  
   269  	if rspMatch.All != nil {
   270  		all := make([]*pb.ResponseMatch, 0)
   271  		for _, m := range rspMatch.All {
   272  			pbM, err := toResponseMatch(m)
   273  			if err != nil {
   274  				return nil, err
   275  			}
   276  			all = append(all, pbM)
   277  		}
   278  		matches = append(matches, &pb.ResponseMatch{
   279  			Match: &pb.ResponseMatch_All{
   280  				All: &pb.ResponseMatch_Seq{
   281  					Matches: all,
   282  				},
   283  			},
   284  		})
   285  	}
   286  
   287  	if rspMatch.Any != nil {
   288  		any := make([]*pb.ResponseMatch, 0)
   289  		for _, m := range rspMatch.Any {
   290  			pbM, err := toResponseMatch(m)
   291  			if err != nil {
   292  				return nil, err
   293  			}
   294  			any = append(any, pbM)
   295  		}
   296  		matches = append(matches, &pb.ResponseMatch{
   297  			Match: &pb.ResponseMatch_Any{
   298  				Any: &pb.ResponseMatch_Seq{
   299  					Matches: any,
   300  				},
   301  			},
   302  		})
   303  	}
   304  
   305  	if rspMatch.Status != nil {
   306  		matches = append(matches, &pb.ResponseMatch{
   307  			Match: &pb.ResponseMatch_Status{
   308  				Status: &pb.HttpStatusRange{
   309  					Max: rspMatch.Status.Max,
   310  					Min: rspMatch.Status.Min,
   311  				},
   312  			},
   313  		})
   314  	}
   315  
   316  	if rspMatch.Not != nil {
   317  		not, err := toResponseMatch(rspMatch.Not)
   318  		if err != nil {
   319  			return nil, err
   320  		}
   321  		matches = append(matches, &pb.ResponseMatch{
   322  			Match: &pb.ResponseMatch_Not{
   323  				Not: not,
   324  			},
   325  		})
   326  	}
   327  
   328  	if len(matches) == 0 {
   329  		return nil, errors.New("a response match must have a field set")
   330  	}
   331  	if len(matches) == 1 {
   332  		return matches[0], nil
   333  	}
   334  	return &pb.ResponseMatch{
   335  		Match: &pb.ResponseMatch_All{
   336  			All: &pb.ResponseMatch_Seq{
   337  				Matches: matches,
   338  			},
   339  		},
   340  	}, nil
   341  }
   342  
   343  // toRequestMatch returns a Proxy API RequestMatch, given a ServiceProfile
   344  // RequestMatch.
   345  func toRequestMatch(reqMatch *sp.RequestMatch) (*pb.RequestMatch, error) {
   346  	if reqMatch == nil {
   347  		return nil, errors.New("missing request match")
   348  	}
   349  	err := profiles.ValidateRequestMatch(reqMatch)
   350  	if err != nil {
   351  		return nil, err
   352  	}
   353  
   354  	matches := make([]*pb.RequestMatch, 0)
   355  
   356  	if reqMatch.All != nil {
   357  		all := make([]*pb.RequestMatch, 0)
   358  		for _, m := range reqMatch.All {
   359  			pbM, err := toRequestMatch(m)
   360  			if err != nil {
   361  				return nil, err
   362  			}
   363  			all = append(all, pbM)
   364  		}
   365  		matches = append(matches, &pb.RequestMatch{
   366  			Match: &pb.RequestMatch_All{
   367  				All: &pb.RequestMatch_Seq{
   368  					Matches: all,
   369  				},
   370  			},
   371  		})
   372  	}
   373  
   374  	if reqMatch.Any != nil {
   375  		any := make([]*pb.RequestMatch, 0)
   376  		for _, m := range reqMatch.Any {
   377  			pbM, err := toRequestMatch(m)
   378  			if err != nil {
   379  				return nil, err
   380  			}
   381  			any = append(any, pbM)
   382  		}
   383  		matches = append(matches, &pb.RequestMatch{
   384  			Match: &pb.RequestMatch_Any{
   385  				Any: &pb.RequestMatch_Seq{
   386  					Matches: any,
   387  				},
   388  			},
   389  		})
   390  	}
   391  
   392  	if reqMatch.Method != "" {
   393  		matches = append(matches, &pb.RequestMatch{
   394  			Match: &pb.RequestMatch_Method{
   395  				Method: util.ParseMethod(reqMatch.Method),
   396  			},
   397  		})
   398  	}
   399  
   400  	if reqMatch.Not != nil {
   401  		not, err := toRequestMatch(reqMatch.Not)
   402  		if err != nil {
   403  			return nil, err
   404  		}
   405  		matches = append(matches, &pb.RequestMatch{
   406  			Match: &pb.RequestMatch_Not{
   407  				Not: not,
   408  			},
   409  		})
   410  	}
   411  
   412  	if reqMatch.PathRegex != "" {
   413  		matches = append(matches, &pb.RequestMatch{
   414  			Match: &pb.RequestMatch_Path{
   415  				Path: &pb.PathMatch{
   416  					Regex: reqMatch.PathRegex,
   417  				},
   418  			},
   419  		})
   420  	}
   421  
   422  	if len(matches) == 0 {
   423  		return nil, errors.New("a request match must have a field set")
   424  	}
   425  	if len(matches) == 1 {
   426  		return matches[0], nil
   427  	}
   428  	return &pb.RequestMatch{
   429  		Match: &pb.RequestMatch_All{
   430  			All: &pb.RequestMatch_Seq{
   431  				Matches: matches,
   432  			},
   433  		},
   434  	}, nil
   435  }
   436  

View as plain text