1
16
17
18 package reconcilers
19
20 import (
21 "net"
22 "sync"
23
24 corev1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/client-go/util/retry"
28 "k8s.io/klog/v2"
29 endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
30 )
31
32
33
34 type masterCountEndpointReconciler struct {
35 masterCount int
36 epAdapter EndpointsAdapter
37 stopReconcilingCalled bool
38 reconcilingLock sync.Mutex
39 }
40
41
42
43 func NewMasterCountEndpointReconciler(masterCount int, epAdapter EndpointsAdapter) EndpointReconciler {
44 return &masterCountEndpointReconciler{
45 masterCount: masterCount,
46 epAdapter: epAdapter,
47 }
48 }
49
50
51
52
53
54
55
56
57
58
59
60
61
62 func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
63 r.reconcilingLock.Lock()
64 defer r.reconcilingLock.Unlock()
65
66 if r.stopReconcilingCalled {
67 return nil
68 }
69
70 e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
71 if err != nil {
72 e = &corev1.Endpoints{
73 ObjectMeta: metav1.ObjectMeta{
74 Name: serviceName,
75 Namespace: metav1.NamespaceDefault,
76 },
77 }
78 }
79
80
81
82 skipMirrorChanged := setSkipMirrorTrue(e)
83
84 if errors.IsNotFound(err) {
85
86 e.Subsets = []corev1.EndpointSubset{{
87 Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
88 Ports: endpointPorts,
89 }}
90 _, err = r.epAdapter.Create(metav1.NamespaceDefault, e)
91 return err
92 }
93
94
95
96 formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
97 if !formatCorrect {
98
99 e.Subsets = []corev1.EndpointSubset{{
100 Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
101 Ports: endpointPorts,
102 }}
103 klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
104 _, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
105 return err
106 }
107
108 if !skipMirrorChanged && ipCorrect && portsCorrect {
109 return r.epAdapter.EnsureEndpointSliceFromEndpoints(metav1.NamespaceDefault, e)
110 }
111 if !ipCorrect {
112
113 e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
114
115
116 e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
117
118
119
120
121
122 if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
123
124 for i, addr := range *addrs {
125 if addr.IP == ip.String() {
126 for len(*addrs) > r.masterCount {
127
128 remove := (i + 1) % len(*addrs)
129 *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
130 }
131 break
132 }
133 }
134 }
135 }
136 if !portsCorrect {
137
138 e.Subsets[0].Ports = endpointPorts
139 }
140 klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
141 _, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
142 return err
143 }
144
145 func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
146 r.reconcilingLock.Lock()
147 defer r.reconcilingLock.Unlock()
148
149 e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
150 if err != nil {
151 if errors.IsNotFound(err) {
152
153 return nil
154 }
155 return err
156 }
157
158 if len(e.Subsets) == 0 {
159
160 return nil
161 }
162
163 new := []corev1.EndpointAddress{}
164 for _, addr := range e.Subsets[0].Addresses {
165 if addr.IP != ip.String() {
166 new = append(new, addr)
167 }
168 }
169 e.Subsets[0].Addresses = new
170 e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
171 err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
172 _, err := r.epAdapter.Update(metav1.NamespaceDefault, e)
173 return err
174 })
175 return err
176 }
177
178 func (r *masterCountEndpointReconciler) StopReconciling() {
179 r.reconcilingLock.Lock()
180 defer r.reconcilingLock.Unlock()
181 r.stopReconcilingCalled = true
182 }
183
184 func (r *masterCountEndpointReconciler) Destroy() {
185 }
186
187
188
189
190
191
192
193
194
195 func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
196 if len(e.Subsets) != 1 {
197 return false, false, false
198 }
199 sub := &e.Subsets[0]
200 portsCorrect = true
201 if reconcilePorts {
202 if len(sub.Ports) != len(ports) {
203 portsCorrect = false
204 }
205 for i, port := range ports {
206 if len(sub.Ports) <= i || port != sub.Ports[i] {
207 portsCorrect = false
208 break
209 }
210 }
211 }
212 for _, addr := range sub.Addresses {
213 if addr.IP == ip {
214 ipCorrect = len(sub.Addresses) <= count
215 break
216 }
217 }
218 return true, ipCorrect, portsCorrect
219 }
220
View as plain text