1 package watcher
2
3 import (
4 "sync"
5 "time"
6
7 sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
8 splisters "github.com/linkerd/linkerd2/controller/gen/client/listers/serviceprofile/v1alpha2"
9 "github.com/linkerd/linkerd2/controller/k8s"
10 "github.com/prometheus/client_golang/prometheus"
11 logging "github.com/sirupsen/logrus"
12 apierrors "k8s.io/apimachinery/pkg/api/errors"
13 "k8s.io/client-go/tools/cache"
14 )
15
type (
	// ProfileWatcher receives ServiceProfile add/update/delete events and
	// forwards them to listeners that subscribed to a given ProfileID. It keeps
	// one profilePublisher per ProfileID in the profiles map.
	ProfileWatcher struct {
		// profileLister is consulted when a publisher is created without a
		// profile in hand (e.g. on Subscribe).
		profileLister splisters.ServiceProfileLister
		// profiles maps each known ProfileID to its publisher; access is
		// guarded by the embedded RWMutex.
		profiles map[ProfileID]*profilePublisher

		log *logging.Entry
		sync.RWMutex
	}

	// profilePublisher holds the most recent ServiceProfile seen for a single
	// ProfileID together with the listeners interested in it.
	profilePublisher struct {
		// profile is the last value passed to update; it is nil when no
		// profile was found or after the profile has been deleted.
		profile *sp.ServiceProfile
		// listeners are notified, in slice order, on every update.
		listeners []ProfileUpdateListener

		log *logging.Entry
		// profileMetrics is fed the listener count on subscribe/unsubscribe
		// and is bumped on every update.
		profileMetrics metrics

		// The embedded Mutex guards profile and listeners.
		sync.Mutex
	}

	// ProfileUpdateListener is implemented by anything that wants to be told
	// about changes to a ServiceProfile.
	ProfileUpdateListener interface {
		// Update is called once with the current profile when the listener
		// subscribes and again on every subsequent change. profile may be nil,
		// which is what listeners receive when the profile does not exist or
		// has been deleted.
		Update(profile *sp.ServiceProfile)
	}
)
43
// profileVecs is the package-wide set of metric vectors for profile
// publishers, created with the "profile" name and the "namespace"/"profile"
// label names. Each profilePublisher derives its own metrics from it via
// newMetrics with concrete label values.
var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"})
45
46
47
48 func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error) {
49 watcher := &ProfileWatcher{
50 profileLister: k8sAPI.SP().Lister(),
51 profiles: make(map[ProfileID]*profilePublisher),
52 log: log.WithField("component", "profile-watcher"),
53 }
54
55 _, err := k8sAPI.SP().Informer().AddEventHandler(
56 cache.ResourceEventHandlerFuncs{
57 AddFunc: watcher.addProfile,
58 UpdateFunc: watcher.updateProfile,
59 DeleteFunc: watcher.deleteProfile,
60 },
61 )
62 if err != nil {
63 return nil, err
64 }
65
66 return watcher, nil
67 }
68
69
70
71
72
73
74
75
76 func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error {
77 pw.log.Debugf("Establishing watch on profile %s", id)
78
79 publisher := pw.getOrNewProfilePublisher(id, nil)
80
81 publisher.subscribe(listener)
82 return nil
83 }
84
85
86 func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener) {
87 pw.log.Debugf("Stopping watch on profile %s", id)
88
89 publisher, ok := pw.getProfilePublisher(id)
90 if !ok {
91 pw.log.Errorf("cannot unsubscribe from unknown service [%s]", id)
92 }
93 publisher.unsubscribe(listener)
94 }
95
96 func (pw *ProfileWatcher) addProfile(obj interface{}) {
97 profile := obj.(*sp.ServiceProfile)
98 id := ProfileID{
99 Namespace: profile.Namespace,
100 Name: profile.Name,
101 }
102
103 publisher := pw.getOrNewProfilePublisher(id, profile)
104
105 publisher.update(profile)
106 }
107
108 func (pw *ProfileWatcher) updateProfile(old interface{}, new interface{}) {
109 oldProfile := old.(*sp.ServiceProfile)
110 newProfile := new.(*sp.ServiceProfile)
111
112 oldUpdated := latestUpdated(oldProfile.ManagedFields)
113 updated := latestUpdated(newProfile.ManagedFields)
114 if !updated.IsZero() && updated != oldUpdated {
115 delta := time.Since(updated)
116 serviceProfileInformerLag.Observe(delta.Seconds())
117 }
118
119 pw.addProfile(new)
120 }
121
122 func (pw *ProfileWatcher) deleteProfile(obj interface{}) {
123 profile, ok := obj.(*sp.ServiceProfile)
124 if !ok {
125 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
126 if !ok {
127 pw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
128 return
129 }
130 profile, ok = tombstone.Obj.(*sp.ServiceProfile)
131 if !ok {
132 pw.log.Errorf("DeletedFinalStateUnknown contained object that is not a ServiceProfile %#v", obj)
133 return
134 }
135 }
136
137 id := ProfileID{
138 Namespace: profile.Namespace,
139 Name: profile.Name,
140 }
141
142 publisher, ok := pw.getProfilePublisher(id)
143 if ok {
144 publisher.update(nil)
145 }
146 }
147
148 func (pw *ProfileWatcher) getOrNewProfilePublisher(id ProfileID, profile *sp.ServiceProfile) *profilePublisher {
149 pw.Lock()
150 defer pw.Unlock()
151
152 publisher, ok := pw.profiles[id]
153 if !ok {
154 if profile == nil {
155 var err error
156 profile, err = pw.profileLister.ServiceProfiles(id.Namespace).Get(id.Name)
157 if err != nil && !apierrors.IsNotFound(err) {
158 pw.log.Errorf("error getting service profile: %s", err)
159 }
160 if err != nil {
161 profile = nil
162 }
163 }
164
165 publisher = &profilePublisher{
166 profile: profile,
167 listeners: make([]ProfileUpdateListener, 0),
168 log: pw.log.WithFields(logging.Fields{
169 "component": "profile-publisher",
170 "ns": id.Namespace,
171 "profile": id.Name,
172 }),
173 profileMetrics: profileVecs.newMetrics(prometheus.Labels{
174 "namespace": id.Namespace,
175 "profile": id.Name,
176 }),
177 }
178 pw.profiles[id] = publisher
179 }
180
181 return publisher
182 }
183
184 func (pw *ProfileWatcher) getProfilePublisher(id ProfileID) (publisher *profilePublisher, ok bool) {
185 pw.RLock()
186 defer pw.RUnlock()
187 publisher, ok = pw.profiles[id]
188 return
189 }
190
191
192
193
194
195 func (pp *profilePublisher) subscribe(listener ProfileUpdateListener) {
196 pp.Lock()
197 defer pp.Unlock()
198
199 pp.listeners = append(pp.listeners, listener)
200 listener.Update(pp.profile)
201
202 pp.profileMetrics.setSubscribers(len(pp.listeners))
203 }
204
205
206
207 func (pp *profilePublisher) unsubscribe(listener ProfileUpdateListener) {
208 pp.Lock()
209 defer pp.Unlock()
210
211 for i, item := range pp.listeners {
212 if item == listener {
213
214 n := len(pp.listeners)
215 pp.listeners[i] = pp.listeners[n-1]
216 pp.listeners[n-1] = nil
217 pp.listeners = pp.listeners[:n-1]
218 break
219 }
220 }
221
222 pp.profileMetrics.setSubscribers(len(pp.listeners))
223 }
224
225 func (pp *profilePublisher) update(profile *sp.ServiceProfile) {
226 pp.Lock()
227 defer pp.Unlock()
228 pp.log.Debug("Updating profile")
229
230 pp.profile = profile
231 for _, listener := range pp.listeners {
232 listener.Update(profile)
233 }
234
235 pp.profileMetrics.incUpdates()
236 }
237
View as plain text