1 package externalworkload
2
3 import (
4 "reflect"
5
6 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
7 discoveryv1 "k8s.io/api/discovery/v1"
8 "k8s.io/apimachinery/pkg/labels"
9 "k8s.io/apimachinery/pkg/util/sets"
10 "k8s.io/client-go/tools/cache"
11 )
12
13 func (ec *EndpointsController) getServicesToUpdateOnExternalWorkloadChange(old, cur interface{}) sets.Set[string] {
14 newEw, newEwOk := cur.(*ewv1beta1.ExternalWorkload)
15 oldEw, oldEwOk := old.(*ewv1beta1.ExternalWorkload)
16
17 if !oldEwOk {
18 ec.log.Errorf("Expected (cur) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", cur)
19 return sets.Set[string]{}
20 }
21
22 if !newEwOk {
23 ec.log.Errorf("Expected (old) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", old)
24 return sets.Set[string]{}
25 }
26
27 if newEw.ResourceVersion == oldEw.ResourceVersion {
28
29
30 return sets.Set[string]{}
31 }
32
33 ewChanged, labelsChanged := ewEndpointsChanged(oldEw, newEw)
34 if !ewChanged && !labelsChanged {
35 ec.log.Errorf("skipping update; nothing has changed between old rv %s and new rv %s", oldEw.ResourceVersion, newEw.ResourceVersion)
36 return sets.Set[string]{}
37 }
38
39 services, err := ec.getExternalWorkloadSvcMembership(newEw)
40 if err != nil {
41 ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", newEw.Namespace, newEw.Name, err)
42 return sets.Set[string]{}
43 }
44
45 if labelsChanged {
46 oldServices, err := ec.getExternalWorkloadSvcMembership(oldEw)
47 if err != nil {
48 ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", oldEw.Namespace, oldEw.Name, err)
49 }
50 services = determineNeededServiceUpdates(oldServices, services, ewChanged)
51 }
52
53 return services
54 }
55
56 func determineNeededServiceUpdates(oldServices, services sets.Set[string], specChanged bool) sets.Set[string] {
57 if specChanged {
58
59 services = services.Union(oldServices)
60 } else {
61
62
63 services = services.Difference(oldServices).Union(oldServices.Difference(services))
64 }
65 return services
66 }
67
68
69
70
71 func (ec *EndpointsController) getExternalWorkloadSvcMembership(workload *ewv1beta1.ExternalWorkload) (sets.Set[string], error) {
72 keys := sets.Set[string]{}
73 services, err := ec.k8sAPI.Svc().Lister().Services(workload.Namespace).List(labels.Everything())
74 if err != nil {
75 return keys, err
76 }
77
78 for _, svc := range services {
79 if svc.Spec.Selector == nil {
80 continue
81 }
82
83
84
85
86
87 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(svc)
88 if err != nil {
89 return sets.Set[string]{}, err
90 }
91
92
93 if labels.ValidatedSetSelector(svc.Spec.Selector).Matches(labels.Set(workload.Labels)) {
94 keys.Insert(key)
95 }
96 }
97
98 return keys, nil
99 }
100
101
102 func (ec *EndpointsController) getEndpointSliceFromDeleteAction(obj interface{}) *discoveryv1.EndpointSlice {
103 if endpointSlice, ok := obj.(*discoveryv1.EndpointSlice); ok {
104
105
106 return endpointSlice
107 }
108
109 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
110 if !ok {
111 ec.log.Errorf("Couldn't get object from tombstone")
112 return nil
113 }
114 endpointSlice, ok := tombstone.Obj.(*discoveryv1.EndpointSlice)
115 if !ok {
116 ec.log.Errorf("Tombstone contained object that is not a EndpointSlice")
117 return nil
118 }
119 return endpointSlice
120 }
121
122
123 func (ec *EndpointsController) getExternalWorkloadFromDeleteAction(obj interface{}) *ewv1beta1.ExternalWorkload {
124 if ew, ok := obj.(*ewv1beta1.ExternalWorkload); ok {
125 return ew
126 }
127
128
129 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
130 if !ok {
131 ec.log.Errorf("couldn't get object from tombstone %#v", obj)
132 return nil
133 }
134
135 ew, ok := tombstone.Obj.(*ewv1beta1.ExternalWorkload)
136 if !ok {
137 ec.log.Errorf("tombstone contained object that is not a ExternalWorkload: %#v", obj)
138 return nil
139 }
140 return ew
141 }
142
143
144
145
146 func ewEndpointsChanged(oldEw, newEw *ewv1beta1.ExternalWorkload) (bool, bool) {
147
148
149 labelsChanged := false
150 if !reflect.DeepEqual(newEw.Labels, oldEw.Labels) {
151 labelsChanged = true
152 }
153
154
155 if newEw.DeletionTimestamp != oldEw.DeletionTimestamp {
156 return true, labelsChanged
157 }
158
159
160
161
162 if IsEwReady(oldEw) != IsEwReady(newEw) {
163 return true, labelsChanged
164 }
165
166
167 if len(oldEw.Spec.WorkloadIPs) != len(newEw.Spec.WorkloadIPs) {
168 return true, labelsChanged
169 }
170 for i := range oldEw.Spec.WorkloadIPs {
171 if oldEw.Spec.WorkloadIPs[i].Ip != newEw.Spec.WorkloadIPs[i].Ip {
172 return true, labelsChanged
173 }
174 }
175
176
177 if len(oldEw.Spec.Ports) != len(newEw.Spec.Ports) {
178 return true, labelsChanged
179 }
180
181
182 portSet := make(map[int32]ewv1beta1.PortSpec)
183 for _, ps := range newEw.Spec.Ports {
184 portSet[ps.Port] = ps
185 }
186
187 for _, oldPs := range oldEw.Spec.Ports {
188
189
190 newPs, ok := portSet[oldPs.Port]
191 if !ok {
192 return true, labelsChanged
193 }
194
195
196
197 if newPs.Name != oldPs.Name || newPs.Protocol != oldPs.Protocol {
198 return true, labelsChanged
199 }
200 }
201
202 return false, labelsChanged
203 }
204
205 func managedByController(es *discoveryv1.EndpointSlice) bool {
206 esManagedBy := es.Labels[discoveryv1.LabelManagedBy]
207 return managedBy == esManagedBy
208 }
209
210 func managedByChanged(endpointSlice1, endpointSlice2 *discoveryv1.EndpointSlice) bool {
211 return managedByController(endpointSlice1) != managedByController(endpointSlice2)
212 }
213
214 func IsEwReady(ew *ewv1beta1.ExternalWorkload) bool {
215 if len(ew.Status.Conditions) == 0 {
216 return false
217 }
218
219
220
221 for i := range ew.Status.Conditions {
222 cond := ew.Status.Conditions[i]
223
224
225 if cond.Type == ewv1beta1.WorkloadReady && cond.Status == ewv1beta1.ConditionTrue {
226 return true
227 }
228 }
229
230 return false
231 }
232
View as plain text