1
16
17 package reconcilers
18
19
23
24 import (
25 "fmt"
26 "net"
27 "path"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "k8s.io/klog/v2"
33
34 corev1 "k8s.io/api/core/v1"
35 "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 kruntime "k8s.io/apimachinery/pkg/runtime"
38 apirequest "k8s.io/apiserver/pkg/endpoints/request"
39 "k8s.io/apiserver/pkg/registry/rest"
40 "k8s.io/apiserver/pkg/storage"
41 "k8s.io/apiserver/pkg/storage/storagebackend"
42 storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
43 endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
44 )
45
46
47 type Leases interface {
48
49 ListLeases() ([]string, error)
50
51
52 UpdateLease(ip string) error
53
54
55 RemoveLease(ip string) error
56
57
58 Destroy()
59 }
60
61 type storageLeases struct {
62 storage storage.Interface
63 destroyFn func()
64 baseKey string
65 leaseTime time.Duration
66 }
67
68 var _ Leases = &storageLeases{}
69
70
71 func (s *storageLeases) ListLeases() ([]string, error) {
72 ipInfoList := &corev1.EndpointsList{}
73 storageOpts := storage.ListOptions{
74 ResourceVersion: "0",
75 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
76 Predicate: storage.Everything,
77 Recursive: true,
78 }
79 if err := s.storage.GetList(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
80 return nil, err
81 }
82
83 ipList := make([]string, 0, len(ipInfoList.Items))
84 for _, ip := range ipInfoList.Items {
85 if len(ip.Subsets) > 0 && len(ip.Subsets[0].Addresses) > 0 && len(ip.Subsets[0].Addresses[0].IP) > 0 {
86 ipList = append(ipList, ip.Subsets[0].Addresses[0].IP)
87 }
88 }
89
90 klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
91
92 return ipList, nil
93 }
94
95
96
97 func (s *storageLeases) UpdateLease(ip string) error {
98 key := path.Join(s.baseKey, ip)
99 return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
100
101 existing := input.(*corev1.Endpoints)
102 existing.Subsets = []corev1.EndpointSubset{
103 {
104 Addresses: []corev1.EndpointAddress{{IP: ip}},
105 },
106 }
107
108
109 leaseTime := uint64(s.leaseTime / time.Second)
110
111
112
113
114
115 existing.Generation++
116
117 klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
118
119 return existing, &leaseTime, nil
120 }, nil)
121 }
122
123
124 func (s *storageLeases) RemoveLease(ip string) error {
125 key := path.Join(s.baseKey, ip)
126 return s.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
127 }
128
129 func (s *storageLeases) Destroy() {
130 s.destroyFn()
131 }
132
133
134 func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) {
135
136
137 leaseStorage, destroyFn, err := storagefactory.Create(*config, nil, nil, "")
138 if err != nil {
139 return nil, fmt.Errorf("error creating storage factory: %v", err)
140 }
141 var once sync.Once
142 return &storageLeases{
143 storage: leaseStorage,
144 destroyFn: func() { once.Do(destroyFn) },
145 baseKey: baseKey,
146 leaseTime: leaseTime,
147 }, nil
148 }
149
150 type leaseEndpointReconciler struct {
151 epAdapter EndpointsAdapter
152 masterLeases Leases
153 stopReconcilingCalled atomic.Bool
154 reconcilingLock sync.Mutex
155 }
156
157
158 func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler {
159 return &leaseEndpointReconciler{
160 epAdapter: epAdapter,
161 masterLeases: masterLeases,
162 }
163 }
164
165
166
167
168
169
170
171
172 func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
173
174 if r.stopReconcilingCalled.Load() {
175 return nil
176 }
177
178
179 r.reconcilingLock.Lock()
180 defer r.reconcilingLock.Unlock()
181
182
183
184
185 if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
186 return err
187 }
188
189 return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
190 }
191
192
193
194 func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
195 e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
196 shouldCreate := false
197 if err != nil {
198 if !errors.IsNotFound(err) {
199 return err
200 }
201
202
203 if r.stopReconcilingCalled.Load() {
204 return nil
205 }
206
207 shouldCreate = true
208 e = &corev1.Endpoints{
209 ObjectMeta: metav1.ObjectMeta{
210 Name: serviceName,
211 Namespace: corev1.NamespaceDefault,
212 },
213 }
214 }
215
216
217 masterIPs, err := r.masterLeases.ListLeases()
218 if err != nil {
219 return err
220 }
221
222
223
224
225
226
227 if !r.stopReconcilingCalled.Load() && len(masterIPs) == 0 {
228 return fmt.Errorf("no API server IP addresses were listed in storage, refusing to erase all endpoints for the kubernetes Service")
229 }
230
231
232
233 skipMirrorChanged := setSkipMirrorTrue(e)
234
235
236 formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
237 if !skipMirrorChanged && formatCorrect && ipCorrect && portsCorrect {
238 return r.epAdapter.EnsureEndpointSliceFromEndpoints(corev1.NamespaceDefault, e)
239 }
240
241 if !formatCorrect {
242
243 e.Subsets = []corev1.EndpointSubset{{
244 Addresses: []corev1.EndpointAddress{},
245 Ports: endpointPorts,
246 }}
247 }
248
249 if !formatCorrect || !ipCorrect {
250
251 e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
252 for ind, ip := range masterIPs {
253 e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
254 }
255
256
257 e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
258 }
259
260 if len(e.Subsets) != 0 && !portsCorrect {
261
262 e.Subsets[0].Ports = endpointPorts
263 }
264
265 klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
266 if shouldCreate {
267 if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
268 err = nil
269 }
270 } else {
271 _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
272 }
273 return err
274 }
275
276
277
278
279
280
281
282
283
284 func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
285 if len(e.Subsets) != 1 {
286 return false, false, false
287 }
288 sub := &e.Subsets[0]
289 portsCorrect = true
290 if reconcilePorts {
291 if len(sub.Ports) != len(ports) {
292 portsCorrect = false
293 } else {
294 for i, port := range ports {
295 if port != sub.Ports[i] {
296 portsCorrect = false
297 break
298 }
299 }
300 }
301 }
302
303 ipsCorrect = true
304 if len(sub.Addresses) != len(expectedIPs) {
305 ipsCorrect = false
306 } else {
307
308
309
310 presentAddrs := make(map[string]bool, len(expectedIPs))
311 for _, ip := range expectedIPs {
312 presentAddrs[ip] = false
313 }
314
315
316 for _, addr := range sub.Addresses {
317 if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
318 ipsCorrect = false
319 break
320 }
321
322 presentAddrs[addr.IP] = true
323 }
324 }
325
326 return true, ipsCorrect, portsCorrect
327 }
328
329 func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
330
331 r.reconcilingLock.Lock()
332 defer r.reconcilingLock.Unlock()
333
334 if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
335 return err
336 }
337
338 return r.doReconcile(serviceName, endpointPorts, true)
339 }
340
341 func (r *leaseEndpointReconciler) StopReconciling() {
342 r.stopReconcilingCalled.Store(true)
343 }
344
345 func (r *leaseEndpointReconciler) Destroy() {
346 r.masterLeases.Destroy()
347 }
348
View as plain text