1 package destination
2
3 import (
4 "fmt"
5
6 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
7 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10 logging "github.com/sirupsen/logrus"
11 "google.golang.org/protobuf/proto"
12 )
13
// endpointProfileTranslator turns watcher.Address updates for an endpoint into
// DestinationProfile messages and sends them on a GetProfile stream. Updates
// are queued by Update and drained by the goroutine launched in Start.
type endpointProfileTranslator struct {
	// Settings handed through to createWeightedAddr /
	// createWeightedAddrForExternalWorkload when building the endpoint.
	// defaultOpaquePorts is the default set passed to the watcher's
	// annotated-opaque-ports helpers.
	enableH2Upgrade     bool
	controllerNS        string
	identityTrustDomain string
	defaultOpaquePorts  map[uint32]struct{}

	// Also forwarded to the weighted-address constructors.
	meshedHttp2ClientParams *pb.Http2ClientParams

	// stream receives the profiles; endStream is closed by Update when the
	// updates queue overflows.
	// NOTE(review): presumably the stream's owner watches endStream to tear
	// the stream down — confirm against the caller.
	stream    pb.Destination_GetProfileServer
	endStream chan struct{}

	// updates is the buffered queue of pending addresses; stop is closed by
	// Stop to terminate the processing goroutine.
	updates chan *watcher.Address
	stop    chan struct{}

	// current is the last profile successfully sent; it is compared against
	// each new profile to suppress redundant sends.
	current *pb.DestinationProfile

	// log is the translator's logger.
	log *logging.Entry
}
32
33
34
35
36
37 var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
38 prometheus.CounterOpts{
39 Name: "endpoint_profile_updates_queue_overflow",
40 Help: "A counter incremented whenever the endpoint profile updates queue overflows",
41 },
42 )
43
44
45
46 func newEndpointProfileTranslator(
47 enableH2Upgrade bool,
48 controllerNS,
49 identityTrustDomain string,
50 defaultOpaquePorts map[uint32]struct{},
51 meshedHTTP2ClientParams *pb.Http2ClientParams,
52 stream pb.Destination_GetProfileServer,
53 endStream chan struct{},
54 log *logging.Entry,
55 ) *endpointProfileTranslator {
56 return &endpointProfileTranslator{
57 enableH2Upgrade: enableH2Upgrade,
58 controllerNS: controllerNS,
59 identityTrustDomain: identityTrustDomain,
60 defaultOpaquePorts: defaultOpaquePorts,
61
62 meshedHttp2ClientParams: meshedHTTP2ClientParams,
63
64 stream: stream,
65 endStream: endStream,
66 updates: make(chan *watcher.Address, updateQueueCapacity),
67 stop: make(chan struct{}),
68
69 log: log.WithField("component", "endpoint-profile-translator"),
70 }
71 }
72
73
74
75
76
77 func (ept *endpointProfileTranslator) Start() {
78 go func() {
79 for {
80 select {
81 case update := <-ept.updates:
82 ept.update(update)
83 case <-ept.stop:
84 return
85 }
86 }
87 }()
88 }
89
90
// Stop closes the stop channel, which makes the goroutine launched by Start
// return. Closing an already-closed channel panics, so Stop must be called at
// most once per translator.
func (ept *endpointProfileTranslator) Stop() {
	close(ept.stop)
}
94
95
96
97 func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
98 select {
99 case ept.updates <- address:
100
101 return nil
102 default:
103 select {
104 case <-ept.endStream:
105
106
107 return fmt.Errorf("profile update stream closed")
108 default:
109
110
111 endpointProfileUpdatesQueueOverflowCounter.Inc()
112 close(ept.endStream)
113 return fmt.Errorf("profile update queue full; aborting stream")
114 }
115 }
116 }
117
// queueLen returns the number of address updates currently buffered in the
// updates queue.
func (ept *endpointProfileTranslator) queueLen() int {
	return len(ept.updates)
}
121
122 func (ept *endpointProfileTranslator) update(address *watcher.Address) {
123 var opaquePorts map[uint32]struct{}
124 if address.Pod != nil {
125 opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
126 } else {
127 opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, ept.defaultOpaquePorts)
128 }
129 endpoint, err := ept.createEndpoint(*address, opaquePorts)
130 if err != nil {
131 ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
132 address.IP, address.Port, err)
133 return
134 }
135 ept.log.Debugf("Created endpoint: %+v", endpoint)
136
137 _, opaqueProtocol := opaquePorts[address.Port]
138 profile := &pb.DestinationProfile{
139 RetryBudget: defaultRetryBudget(),
140 Endpoint: endpoint,
141 OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol,
142 }
143 if proto.Equal(profile, ept.current) {
144 ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
145 return
146 }
147
148 ept.log.Debugf("Sending profile update: %+v", profile)
149 if err := ept.stream.Send(profile); err != nil {
150 ept.log.Errorf("failed to send profile update: %s", err)
151 return
152 }
153
154 ept.current = profile
155 }
156
157 func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
158 var weightedAddr *pb.WeightedAddr
159 var err error
160 if address.ExternalWorkload != nil {
161 weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts, ept.meshedHttp2ClientParams)
162 } else {
163 weightedAddr, err = createWeightedAddr(address, opaquePorts,
164 ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
165 }
166 if err != nil {
167 return nil, err
168 }
169
170
171
172 if address.Pod != nil {
173 weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
174 } else if address.ExternalWorkload != nil {
175 weightedAddr.MetricLabels["namespace"] = address.ExternalWorkload.Namespace
176 }
177
178 return weightedAddr, err
179 }
180
View as plain text