1
16
17 package proxy
18
19 import (
20 "reflect"
21 "sync"
22
23 v1 "k8s.io/api/core/v1"
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/apimachinery/pkg/util/sets"
26 "k8s.io/client-go/tools/events"
27 "k8s.io/klog/v2"
28 "k8s.io/kubernetes/pkg/proxy/metrics"
29 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
30 )
31
32
33
34 type ServiceChangeTracker struct {
35
36 lock sync.Mutex
37
38 items map[types.NamespacedName]*serviceChange
39
40
41
42 makeServiceInfo makeServicePortFunc
43
44
45
46 processServiceMapChange processServiceMapChangeFunc
47
48 ipFamily v1.IPFamily
49 recorder events.EventRecorder
50 }
51
52 type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
53 type processServiceMapChangeFunc func(previous, current ServicePortMap)
54
55
56
57
58 type serviceChange struct {
59 previous ServicePortMap
60 current ServicePortMap
61 }
62
63
64 func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
65 return &ServiceChangeTracker{
66 items: make(map[types.NamespacedName]*serviceChange),
67 makeServiceInfo: makeServiceInfo,
68 recorder: recorder,
69 ipFamily: ipFamily,
70 processServiceMapChange: processServiceMapChange,
71 }
72 }
73
74
75
76
77
78
79 func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
80
81 if previous == nil && current == nil {
82 return false
83 }
84
85 svc := current
86 if svc == nil {
87 svc = previous
88 }
89 metrics.ServiceChangesTotal.Inc()
90 namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
91
92 sct.lock.Lock()
93 defer sct.lock.Unlock()
94
95 change, exists := sct.items[namespacedName]
96 if !exists {
97 change = &serviceChange{}
98 change.previous = sct.serviceToServiceMap(previous)
99 sct.items[namespacedName] = change
100 }
101 change.current = sct.serviceToServiceMap(current)
102
103 if reflect.DeepEqual(change.previous, change.current) {
104 delete(sct.items, namespacedName)
105 } else {
106 klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
107 }
108 metrics.ServiceChangesPending.Set(float64(len(sct.items)))
109 return len(sct.items) > 0
110 }
111
112
113 type ServicePortMap map[ServicePortName]ServicePort
114
115
116 type UpdateServiceMapResult struct {
117
118
119 UpdatedServices sets.Set[types.NamespacedName]
120
121
122
123
124 DeletedUDPClusterIPs sets.Set[string]
125 }
126
127
128
129 func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
130
131
132 ports := make(map[types.NamespacedName]uint16)
133 for svcPortName, info := range sm {
134 if info.HealthCheckNodePort() != 0 {
135 ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
136 }
137 }
138 return ports
139 }
140
141
142
143
144 func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
145 if service == nil {
146 return nil
147 }
148
149 if proxyutil.ShouldSkipService(service) {
150 return nil
151 }
152
153 clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
154 if clusterIP == "" {
155 return nil
156 }
157
158 svcPortMap := make(ServicePortMap)
159 svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
160 for i := range service.Spec.Ports {
161 servicePort := &service.Spec.Ports[i]
162 svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
163 baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort)
164 if sct.makeServiceInfo != nil {
165 svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
166 } else {
167 svcPortMap[svcPortName] = baseSvcInfo
168 }
169 }
170 return svcPortMap
171 }
172
173
174
175
176 func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
177 sct.lock.Lock()
178 defer sct.lock.Unlock()
179
180 result := UpdateServiceMapResult{
181 UpdatedServices: sets.New[types.NamespacedName](),
182 DeletedUDPClusterIPs: sets.New[string](),
183 }
184
185 for nn, change := range sct.items {
186 if sct.processServiceMapChange != nil {
187 sct.processServiceMapChange(change.previous, change.current)
188 }
189 result.UpdatedServices.Insert(nn)
190
191 sm.merge(change.current)
192
193
194 change.previous.filter(change.current)
195 sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
196 }
197
198 sct.items = make(map[types.NamespacedName]*serviceChange)
199 metrics.ServiceChangesPending.Set(0)
200
201 return result
202 }
203
204
205
206
207 func (sm *ServicePortMap) merge(other ServicePortMap) {
208 for svcPortName, info := range other {
209 _, exists := (*sm)[svcPortName]
210 if !exists {
211 klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
212 } else {
213 klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
214 }
215 (*sm)[svcPortName] = info
216 }
217 }
218
219
220 func (sm *ServicePortMap) filter(other ServicePortMap) {
221 for svcPortName := range *sm {
222
223 if _, ok := other[svcPortName]; ok {
224 delete(*sm, svcPortName)
225 }
226 }
227 }
228
229
230
231 func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) {
232 for svcPortName := range other {
233 info, exists := (*sm)[svcPortName]
234 if exists {
235 klog.V(4).InfoS("Removing service port", "portName", svcPortName)
236 if info.Protocol() == v1.ProtocolUDP {
237 deletedUDPClusterIPs.Insert(info.ClusterIP().String())
238 }
239 delete(*sm, svcPortName)
240 } else {
241 klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
242 }
243 }
244 }
245
View as plain text