1 package destination
2
3 import (
4 "errors"
5 "fmt"
6 "net"
7 "time"
8
9 "github.com/golang/protobuf/ptypes/duration"
10 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
11 sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
12 "github.com/linkerd/linkerd2/pkg/profiles"
13 "github.com/linkerd/linkerd2/pkg/util"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/prometheus/client_golang/prometheus/promauto"
16 logging "github.com/sirupsen/logrus"
17 )
18
// millisPerDecimilli is the factor toDstOverrides multiplies a weight's
// MilliValue by to obtain the integer weight placed in pb.WeightedDst.
// NOTE(review): the name suggests the resulting unit is decimillis
// (1/10000ths) — confirm against the proxy API.
const millisPerDecimilli = 10
20
21
// profileTranslator converts ServiceProfile resources into DestinationProfile
// messages and sends them on a GetProfile stream. Profiles handed to Update
// are buffered in a channel and sent by the goroutine launched in Start.
type profileTranslator struct {
	// fullyQualifiedName is copied into every DestinationProfile that is sent.
	fullyQualifiedName string
	// port is used to look up the opaque-ports entry and is appended to
	// DstOverrides authorities that do not carry a port of their own.
	port uint32

	// stream is the server stream on which translated profiles are sent.
	stream pb.Destination_GetProfileServer
	// endStream is closed by Update when the updates queue is full.
	endStream chan struct{}
	// log is the logger, tagged with component=profile-translator.
	log *logging.Entry
	// overflowCounter is incremented whenever Update cannot enqueue a profile.
	overflowCounter prometheus.Counter

	// updates buffers profiles between Update and the goroutine run by Start.
	updates chan *sp.ServiceProfile
	// stop is closed by Stop to terminate the goroutine run by Start.
	stop chan struct{}
}
34
// profileUpdatesQueueOverflowCounter is the counter vector from which each
// profileTranslator derives its overflow counter. It is labelled by the
// translator's fully qualified name ("fqn") and port ("port"), and is created
// through promauto at package initialization.
var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "profile_updates_queue_overflow",
		Help: "A counter incremented whenever the profile updates queue overflows",
	},
	// Label names; values are supplied in newProfileTranslator.
	[]string{
		"fqn",
		"port",
	},
)
45
46 func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
47 return &profileTranslator{
48 fullyQualifiedName: fqn,
49 port: port,
50
51 stream: stream,
52 endStream: endStream,
53 log: log.WithField("component", "profile-translator"),
54 overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
55 updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
56 stop: make(chan struct{}),
57 }
58 }
59
60
61
62
63
64
65 func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
66 select {
67 case pt.updates <- profile:
68
69 default:
70
71
72 pt.overflowCounter.Inc()
73 select {
74 case <-pt.endStream:
75
76
77 default:
78 pt.log.Error("profile update queue full; aborting stream")
79 close(pt.endStream)
80 }
81 }
82 }
83
84
85
86
87
88 func (pt *profileTranslator) Start() {
89 go func() {
90 for {
91 select {
92 case update := <-pt.updates:
93 pt.update(update)
94 case <-pt.stop:
95 return
96 }
97 }
98 }()
99 }
100
101
// Stop terminates the goroutine launched by Start by closing the stop
// channel. It must be called at most once: closing an already-closed channel
// panics.
func (pt *profileTranslator) Stop() {
	close(pt.stop)
}
105
106 func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
107 if profile == nil {
108 pt.log.Debugf("Sending default profile")
109 if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {
110 pt.log.Errorf("failed to send default service profile: %s", err)
111 }
112 return
113 }
114
115 destinationProfile, err := pt.createDestinationProfile(profile)
116 if err != nil {
117 pt.log.Error(err)
118 return
119 }
120 pt.log.Debugf("Sending profile update: %+v", destinationProfile)
121 if err := pt.stream.Send(destinationProfile); err != nil {
122 pt.log.Errorf("failed to send profile update: %s", err)
123 }
124 }
125
126 func (pt *profileTranslator) defaultServiceProfile() *pb.DestinationProfile {
127 return &pb.DestinationProfile{
128 Routes: []*pb.Route{},
129 RetryBudget: defaultRetryBudget(),
130 FullyQualifiedName: pt.fullyQualifiedName,
131 }
132 }
133
134 func defaultRetryBudget() *pb.RetryBudget {
135 return &pb.RetryBudget{
136 MinRetriesPerSecond: 10,
137 RetryRatio: 0.2,
138 Ttl: &duration.Duration{
139 Seconds: 10,
140 },
141 }
142 }
143
144 func toDuration(d time.Duration) *duration.Duration {
145 if d == 0 {
146 return nil
147 }
148 return &duration.Duration{
149 Seconds: int64(d / time.Second),
150 Nanos: int32(d % time.Second),
151 }
152 }
153
154
155
156 func (pt *profileTranslator) createDestinationProfile(profile *sp.ServiceProfile) (*pb.DestinationProfile, error) {
157 routes := make([]*pb.Route, 0)
158 for _, route := range profile.Spec.Routes {
159 pbRoute, err := toRoute(profile, route)
160 if err != nil {
161 return nil, err
162 }
163 routes = append(routes, pbRoute)
164 }
165 budget := defaultRetryBudget()
166 if profile.Spec.RetryBudget != nil {
167 budget.MinRetriesPerSecond = profile.Spec.RetryBudget.MinRetriesPerSecond
168 budget.RetryRatio = profile.Spec.RetryBudget.RetryRatio
169 ttl, err := time.ParseDuration(profile.Spec.RetryBudget.TTL)
170 if err != nil {
171 return nil, err
172 }
173 budget.Ttl = toDuration(ttl)
174 }
175 var opaqueProtocol bool
176 if profile.Spec.OpaquePorts != nil {
177 _, opaqueProtocol = profile.Spec.OpaquePorts[pt.port]
178 }
179 return &pb.DestinationProfile{
180 Routes: routes,
181 RetryBudget: budget,
182 DstOverrides: toDstOverrides(profile.Spec.DstOverrides, pt.port),
183 FullyQualifiedName: pt.fullyQualifiedName,
184 OpaqueProtocol: opaqueProtocol,
185 }, nil
186 }
187
188 func toDstOverrides(dsts []*sp.WeightedDst, port uint32) []*pb.WeightedDst {
189 pbDsts := []*pb.WeightedDst{}
190 for _, dst := range dsts {
191 authority := dst.Authority
192
193 if _, _, err := net.SplitHostPort(authority); err != nil {
194 authority = net.JoinHostPort(authority, fmt.Sprintf("%d", port))
195 }
196
197 pbDst := &pb.WeightedDst{
198 Authority: authority,
199
200 Weight: uint32(dst.Weight.MilliValue() * millisPerDecimilli),
201 }
202 pbDsts = append(pbDsts, pbDst)
203 }
204 return pbDsts
205 }
206
207
208 func toRoute(profile *sp.ServiceProfile, route *sp.RouteSpec) (*pb.Route, error) {
209 cond, err := toRequestMatch(route.Condition)
210 if err != nil {
211 return nil, err
212 }
213 rcs := make([]*pb.ResponseClass, 0)
214 for _, rc := range route.ResponseClasses {
215 pbRc, err := toResponseClass(rc)
216 if err != nil {
217 return nil, err
218 }
219 rcs = append(rcs, pbRc)
220 }
221 var timeout time.Duration
222 if route.Timeout != "" {
223 timeout, err = time.ParseDuration(route.Timeout)
224 if err != nil {
225 logging.Errorf(
226 "failed to parse duration for route '%s' in service profile '%s' in namespace '%s': %s",
227 route.Name,
228 profile.Name,
229 profile.Namespace,
230 err,
231 )
232 }
233 }
234 return &pb.Route{
235 Condition: cond,
236 ResponseClasses: rcs,
237 MetricsLabels: map[string]string{"route": route.Name},
238 IsRetryable: route.IsRetryable,
239 Timeout: toDuration(timeout),
240 }, nil
241 }
242
243
244
245 func toResponseClass(rc *sp.ResponseClass) (*pb.ResponseClass, error) {
246 cond, err := toResponseMatch(rc.Condition)
247 if err != nil {
248 return nil, err
249 }
250 return &pb.ResponseClass{
251 Condition: cond,
252 IsFailure: rc.IsFailure,
253 }, nil
254 }
255
256
257
258 func toResponseMatch(rspMatch *sp.ResponseMatch) (*pb.ResponseMatch, error) {
259 if rspMatch == nil {
260 return nil, errors.New("missing response match")
261 }
262 err := profiles.ValidateResponseMatch(rspMatch)
263 if err != nil {
264 return nil, err
265 }
266
267 matches := make([]*pb.ResponseMatch, 0)
268
269 if rspMatch.All != nil {
270 all := make([]*pb.ResponseMatch, 0)
271 for _, m := range rspMatch.All {
272 pbM, err := toResponseMatch(m)
273 if err != nil {
274 return nil, err
275 }
276 all = append(all, pbM)
277 }
278 matches = append(matches, &pb.ResponseMatch{
279 Match: &pb.ResponseMatch_All{
280 All: &pb.ResponseMatch_Seq{
281 Matches: all,
282 },
283 },
284 })
285 }
286
287 if rspMatch.Any != nil {
288 any := make([]*pb.ResponseMatch, 0)
289 for _, m := range rspMatch.Any {
290 pbM, err := toResponseMatch(m)
291 if err != nil {
292 return nil, err
293 }
294 any = append(any, pbM)
295 }
296 matches = append(matches, &pb.ResponseMatch{
297 Match: &pb.ResponseMatch_Any{
298 Any: &pb.ResponseMatch_Seq{
299 Matches: any,
300 },
301 },
302 })
303 }
304
305 if rspMatch.Status != nil {
306 matches = append(matches, &pb.ResponseMatch{
307 Match: &pb.ResponseMatch_Status{
308 Status: &pb.HttpStatusRange{
309 Max: rspMatch.Status.Max,
310 Min: rspMatch.Status.Min,
311 },
312 },
313 })
314 }
315
316 if rspMatch.Not != nil {
317 not, err := toResponseMatch(rspMatch.Not)
318 if err != nil {
319 return nil, err
320 }
321 matches = append(matches, &pb.ResponseMatch{
322 Match: &pb.ResponseMatch_Not{
323 Not: not,
324 },
325 })
326 }
327
328 if len(matches) == 0 {
329 return nil, errors.New("a response match must have a field set")
330 }
331 if len(matches) == 1 {
332 return matches[0], nil
333 }
334 return &pb.ResponseMatch{
335 Match: &pb.ResponseMatch_All{
336 All: &pb.ResponseMatch_Seq{
337 Matches: matches,
338 },
339 },
340 }, nil
341 }
342
343
344
345 func toRequestMatch(reqMatch *sp.RequestMatch) (*pb.RequestMatch, error) {
346 if reqMatch == nil {
347 return nil, errors.New("missing request match")
348 }
349 err := profiles.ValidateRequestMatch(reqMatch)
350 if err != nil {
351 return nil, err
352 }
353
354 matches := make([]*pb.RequestMatch, 0)
355
356 if reqMatch.All != nil {
357 all := make([]*pb.RequestMatch, 0)
358 for _, m := range reqMatch.All {
359 pbM, err := toRequestMatch(m)
360 if err != nil {
361 return nil, err
362 }
363 all = append(all, pbM)
364 }
365 matches = append(matches, &pb.RequestMatch{
366 Match: &pb.RequestMatch_All{
367 All: &pb.RequestMatch_Seq{
368 Matches: all,
369 },
370 },
371 })
372 }
373
374 if reqMatch.Any != nil {
375 any := make([]*pb.RequestMatch, 0)
376 for _, m := range reqMatch.Any {
377 pbM, err := toRequestMatch(m)
378 if err != nil {
379 return nil, err
380 }
381 any = append(any, pbM)
382 }
383 matches = append(matches, &pb.RequestMatch{
384 Match: &pb.RequestMatch_Any{
385 Any: &pb.RequestMatch_Seq{
386 Matches: any,
387 },
388 },
389 })
390 }
391
392 if reqMatch.Method != "" {
393 matches = append(matches, &pb.RequestMatch{
394 Match: &pb.RequestMatch_Method{
395 Method: util.ParseMethod(reqMatch.Method),
396 },
397 })
398 }
399
400 if reqMatch.Not != nil {
401 not, err := toRequestMatch(reqMatch.Not)
402 if err != nil {
403 return nil, err
404 }
405 matches = append(matches, &pb.RequestMatch{
406 Match: &pb.RequestMatch_Not{
407 Not: not,
408 },
409 })
410 }
411
412 if reqMatch.PathRegex != "" {
413 matches = append(matches, &pb.RequestMatch{
414 Match: &pb.RequestMatch_Path{
415 Path: &pb.PathMatch{
416 Regex: reqMatch.PathRegex,
417 },
418 },
419 })
420 }
421
422 if len(matches) == 0 {
423 return nil, errors.New("a request match must have a field set")
424 }
425 if len(matches) == 1 {
426 return matches[0], nil
427 }
428 return &pb.RequestMatch{
429 Match: &pb.RequestMatch_All{
430 All: &pb.RequestMatch_Seq{
431 Matches: matches,
432 },
433 },
434 }, nil
435 }
436
View as plain text