1 package watcher
2
3 import (
4 "strconv"
5 "sync"
6 "time"
7
8 "github.com/linkerd/linkerd2/controller/k8s"
9 labels "github.com/linkerd/linkerd2/pkg/k8s"
10 "github.com/linkerd/linkerd2/pkg/util"
11 "github.com/prometheus/client_golang/prometheus"
12 "github.com/prometheus/client_golang/prometheus/promauto"
13 logging "github.com/sirupsen/logrus"
14 corev1 "k8s.io/api/core/v1"
15 "k8s.io/client-go/tools/cache"
16 )
17
18 type (
19
20
21
22 OpaquePortsWatcher struct {
23 subscriptions map[ServiceID]*svcSubscriptions
24 k8sAPI *k8s.API
25 subscribersGauge *prometheus.GaugeVec
26 log *logging.Entry
27 defaultOpaquePorts map[uint32]struct{}
28 sync.RWMutex
29 }
30
31 svcSubscriptions struct {
32 opaquePorts map[uint32]struct{}
33 listeners []OpaquePortsUpdateListener
34 }
35
36
37 OpaquePortsUpdateListener interface {
38 UpdateService(ports map[uint32]struct{})
39 }
40 )
41
42 var opaquePortsMetrics = promauto.NewGaugeVec(
43 prometheus.GaugeOpts{
44 Name: "service_subscribers",
45 Help: "Number of subscribers to Service changes.",
46 },
47 []string{"namespace", "name"},
48 )
49
50
51
52 func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
53 opw := &OpaquePortsWatcher{
54 subscriptions: make(map[ServiceID]*svcSubscriptions),
55 k8sAPI: k8sAPI,
56 subscribersGauge: opaquePortsMetrics,
57 log: log.WithField("component", "opaque-ports-watcher"),
58 defaultOpaquePorts: opaquePorts,
59 }
60 _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
61 AddFunc: opw.addService,
62 DeleteFunc: opw.deleteService,
63 UpdateFunc: opw.updateService,
64 })
65 if err != nil {
66 return nil, err
67 }
68
69 return opw, nil
70 }
71
72
73
74
75 func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error {
76 opw.Lock()
77 defer opw.Unlock()
78 svc, _ := opw.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
79 if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
80 return invalidService(id.String())
81 }
82 opw.log.Debugf("Starting watch on service %s", id)
83 var numListeners float64
84 ss, ok := opw.subscriptions[id]
85 if !ok {
86
87
88 opw.subscriptions[id] = &svcSubscriptions{
89 opaquePorts: opw.defaultOpaquePorts,
90 listeners: []OpaquePortsUpdateListener{listener},
91 }
92 numListeners = 1
93 } else {
94
95
96
97 ss.listeners = append(ss.listeners, listener)
98 listener.UpdateService(ss.opaquePorts)
99 numListeners = float64(len(ss.listeners))
100 }
101
102 opw.subscribersGauge.With(id.Labels()).Set(numListeners)
103
104 return nil
105 }
106
107
108 func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener) {
109 opw.Lock()
110 defer opw.Unlock()
111 opw.log.Debugf("Stopping watch on service %s", id)
112 ss, ok := opw.subscriptions[id]
113 if !ok {
114 opw.log.Errorf("Cannot unsubscribe from unknown service %s", id)
115 return
116 }
117 for i, l := range ss.listeners {
118 if l == listener {
119 n := len(ss.listeners)
120 ss.listeners[i] = ss.listeners[n-1]
121 ss.listeners[n-1] = nil
122 ss.listeners = ss.listeners[:n-1]
123 }
124 }
125
126 labels := id.Labels()
127 if len(ss.listeners) > 0 {
128 opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
129 } else {
130 if !opw.subscribersGauge.Delete(labels) {
131 opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)
132 }
133 delete(opw.subscriptions, id)
134 }
135 }
136
137 func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) {
138 newSvc := newObj.(*corev1.Service)
139 oldSvc := oldObj.(*corev1.Service)
140
141 oldUpdated := latestUpdated(oldSvc.ManagedFields)
142 updated := latestUpdated(newSvc.ManagedFields)
143 if !updated.IsZero() && updated != oldUpdated {
144 delta := time.Since(updated)
145 serviceInformerLag.Observe(delta.Seconds())
146 }
147 opw.addService(newObj)
148 }
149
150 func (opw *OpaquePortsWatcher) addService(obj interface{}) {
151 opw.Lock()
152 defer opw.Unlock()
153 svc := obj.(*corev1.Service)
154 id := ServiceID{
155 Namespace: svc.Namespace,
156 Name: svc.Name,
157 }
158 opaquePorts, ok, err := getServiceOpaquePortsAnnotation(svc)
159 if err != nil {
160 opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err)
161 return
162 }
163
164
165 if !ok {
166 opaquePorts = opw.defaultOpaquePorts
167 }
168 ss, ok := opw.subscriptions[id]
169
170
171 if !ok {
172 opw.subscriptions[id] = &svcSubscriptions{
173 opaquePorts: opaquePorts,
174 listeners: []OpaquePortsUpdateListener{},
175 }
176 return
177 }
178
179
180 if portsEqual(ss.opaquePorts, opaquePorts) {
181 return
182 }
183 ss.opaquePorts = opaquePorts
184 for _, listener := range ss.listeners {
185 listener.UpdateService(ss.opaquePorts)
186 }
187 }
188
189 func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
190 opw.Lock()
191 defer opw.Unlock()
192 service, ok := obj.(*corev1.Service)
193 if !ok {
194 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
195 if !ok {
196 opw.log.Errorf("could not get object from DeletedFinalStateUnknown %#v", obj)
197 return
198 }
199 service, ok = tombstone.Obj.(*corev1.Service)
200 if !ok {
201 opw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
202 return
203 }
204 }
205 id := ServiceID{
206 Namespace: service.Namespace,
207 Name: service.Name,
208 }
209 ss, ok := opw.subscriptions[id]
210 if !ok {
211 return
212 }
213 old := ss.opaquePorts
214 ss.opaquePorts = opw.defaultOpaquePorts
215
216 if portsEqual(old, ss.opaquePorts) {
217 return
218 }
219 for _, listener := range ss.listeners {
220 listener.UpdateService(ss.opaquePorts)
221 }
222 }
223
224 func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) {
225 annotation, ok := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
226 if !ok {
227 return nil, false, nil
228 }
229 opaquePorts := make(map[uint32]struct{})
230 if annotation != "" {
231 for _, portStr := range parseServiceOpaquePorts(annotation, svc.Spec.Ports) {
232 port, err := strconv.ParseUint(portStr, 10, 32)
233 if err != nil {
234 return nil, true, err
235 }
236 opaquePorts[uint32(port)] = struct{}{}
237 }
238 }
239 return opaquePorts, true, nil
240 }
241
242 func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string {
243 portRanges := util.GetPortRanges(annotation)
244 var values []string
245 for _, pr := range portRanges {
246 port, named := isNamed(pr, sps)
247 if named {
248 values = append(values, strconv.Itoa(int(port)))
249 } else {
250 pr, err := util.ParsePortRange(pr)
251 if err != nil {
252 logging.Warnf("Invalid port range [%v]: %s", pr, err)
253 continue
254 }
255 for i := pr.LowerBound; i <= pr.UpperBound; i++ {
256 values = append(values, strconv.Itoa(i))
257 }
258 }
259 }
260 return values
261 }
262
263
264
265
266 func isNamed(pr string, sps []corev1.ServicePort) (int32, bool) {
267 for _, sp := range sps {
268 if sp.Name == pr {
269 return sp.Port, true
270 }
271 }
272 return 0, false
273 }
274
// portsEqual reports whether x and y contain exactly the same ports. A nil
// map and an empty map are considered equal.
func portsEqual(x, y map[uint32]struct{}) bool {
	if len(x) != len(y) {
		return false
	}
	// Same size, so x ⊆ y is enough to establish equality.
	for port := range x {
		if _, present := y[port]; !present {
			return false
		}
	}
	return true
}
287
View as plain text