1
16
17 package controller
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
32 eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
33 "k8s.io/client-go/tools/events"
34 "k8s.io/client-go/util/retry"
35 "k8s.io/kubernetes/pkg/api/legacyscheme"
36 api "k8s.io/kubernetes/pkg/apis/core"
37 "k8s.io/kubernetes/pkg/apis/core/v1/helper"
38 "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
39 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
40 netutils "k8s.io/utils/net"
41 )
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 type Repair struct {
59 interval time.Duration
60 serviceClient corev1client.ServicesGetter
61
62 networkByFamily map[v1.IPFamily]*net.IPNet
63 allocatorByFamily map[v1.IPFamily]rangeallocation.RangeRegistry
64
65 leaksByFamily map[v1.IPFamily]map[string]int
66 broadcaster events.EventBroadcaster
67 recorder events.EventRecorder
68 }
69
70
71
const (
	// numRepairsBeforeLeakCleanup is the number of repair passes in which an IP
	// must be seen allocated in storage without a matching service before the
	// repair loop releases it. The grace period presumably exists to tolerate
	// races with in-flight allocations — confirm against the allocator callers.
	numRepairsBeforeLeakCleanup = 3
)
73
74
75
76 func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
77 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
78 recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
79
80
81 networkByFamily := make(map[v1.IPFamily]*net.IPNet)
82 allocatorByFamily := make(map[v1.IPFamily]rangeallocation.RangeRegistry)
83 leaksByFamily := make(map[v1.IPFamily]map[string]int)
84
85 primary := v1.IPv4Protocol
86 secondary := v1.IPv6Protocol
87 if netutils.IsIPv6(network.IP) {
88 primary = v1.IPv6Protocol
89 }
90
91 networkByFamily[primary] = network
92 allocatorByFamily[primary] = alloc
93 leaksByFamily[primary] = make(map[string]int)
94
95 if secondaryNetwork != nil && secondaryNetwork.IP != nil {
96 if primary == v1.IPv6Protocol {
97 secondary = v1.IPv4Protocol
98 }
99 networkByFamily[secondary] = secondaryNetwork
100 allocatorByFamily[secondary] = secondaryAlloc
101 leaksByFamily[secondary] = make(map[string]int)
102 }
103
104 registerMetrics()
105
106 return &Repair{
107 interval: interval,
108 serviceClient: serviceClient,
109
110 networkByFamily: networkByFamily,
111 allocatorByFamily: allocatorByFamily,
112
113 leaksByFamily: leaksByFamily,
114 broadcaster: eventBroadcaster,
115 recorder: recorder,
116 }
117 }
118
119
120 func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
121 c.broadcaster.StartRecordingToSink(stopCh)
122 defer c.broadcaster.Shutdown()
123
124 var once sync.Once
125 wait.Until(func() {
126 if err := c.runOnce(); err != nil {
127 runtime.HandleError(err)
128 return
129 }
130 once.Do(onFirstSuccess)
131 }, c.interval, stopCh)
132 }
133
134
135 func (c *Repair) runOnce() error {
136 return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
137 err := c.doRunOnce()
138 if err != nil {
139 clusterIPRepairReconcileErrors.Inc()
140 }
141 return err
142 })
143 }
144
145
146 func (c *Repair) doRunOnce() error {
147
148
149
150
151
152
153
154
155
156 snapshotByFamily := make(map[v1.IPFamily]*api.RangeAllocation)
157 storedByFamily := make(map[v1.IPFamily]ipallocator.Interface)
158
159 err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
160 for family, allocator := range c.allocatorByFamily {
161
162 if _, ok := snapshotByFamily[family]; !ok {
163 snapshot, err := allocator.Get()
164 if err != nil {
165 return false, err
166 }
167
168 snapshotByFamily[family] = snapshot
169 }
170 }
171 return true, nil
172 })
173
174 if err != nil {
175 return fmt.Errorf("unable to refresh the service IP block: %v", err)
176 }
177
178
179 for family, snapshot := range snapshotByFamily {
180 if snapshot.Range == "" {
181 snapshot.Range = c.networkByFamily[family].String()
182 }
183 }
184
185
186 for family, snapshot := range snapshotByFamily {
187 stored, err := ipallocator.NewFromSnapshot(snapshot)
188 if err != nil {
189 return fmt.Errorf("unable to rebuild allocator from snapshots for family:%v with error:%v", family, err)
190 }
191
192 storedByFamily[family] = stored
193 }
194
195 rebuiltByFamily := make(map[v1.IPFamily]*ipallocator.Range)
196
197 for family, network := range c.networkByFamily {
198 rebuilt, err := ipallocator.NewInMemory(network)
199 if err != nil {
200 return fmt.Errorf("unable to create CIDR range for family %v: %v", family, err)
201 }
202
203 rebuiltByFamily[family] = rebuilt
204 }
205
206
207
208
209
210 list, err := c.serviceClient.Services(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
211 if err != nil {
212 return fmt.Errorf("unable to refresh the service IP block: %v", err)
213 }
214
215 getFamilyByIP := func(ip net.IP) v1.IPFamily {
216 if netutils.IsIPv6(ip) {
217 return v1.IPv6Protocol
218 }
219 return v1.IPv4Protocol
220 }
221
222
223 for _, svc := range list.Items {
224 if !helper.IsServiceIPSet(&svc) {
225
226 continue
227 }
228
229 for _, ip := range svc.Spec.ClusterIPs {
230 ip := netutils.ParseIPSloppy(ip)
231 if ip == nil {
232
233 clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
234 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
235 runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
236 continue
237 }
238
239 family := getFamilyByIP(ip)
240 if _, ok := rebuiltByFamily[family]; !ok {
241
242 clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
243 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
244 runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
245 continue
246 }
247
248
249 actualAlloc := rebuiltByFamily[family]
250 switch err := actualAlloc.Allocate(ip); err {
251 case nil:
252 actualStored := storedByFamily[family]
253 if actualStored.Has(ip) {
254
255 actualStored.Release(ip)
256 } else {
257
258 clusterIPRepairIPErrors.WithLabelValues("repair").Inc()
259 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
260 runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
261 }
262 delete(c.leaksByFamily[family], ip.String())
263 case ipallocator.ErrAllocated:
264
265 clusterIPRepairIPErrors.WithLabelValues("duplicate").Inc()
266 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
267 runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
268 case err.(*ipallocator.ErrNotInRange):
269
270 clusterIPRepairIPErrors.WithLabelValues("outOfRange").Inc()
271 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
272 runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
273 case ipallocator.ErrFull:
274
275 clusterIPRepairIPErrors.WithLabelValues("full").Inc()
276 cidr := actualAlloc.CIDR()
277 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
278 return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
279 default:
280 clusterIPRepairIPErrors.WithLabelValues("unknown").Inc()
281 c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
282 return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
283 }
284 }
285 }
286
287
288 for family, leaks := range c.leaksByFamily {
289 c.checkLeaked(leaks, storedByFamily[family], rebuiltByFamily[family])
290 }
291
292
293
294 for family, rebuilt := range rebuiltByFamily {
295 err = c.saveSnapShot(rebuilt, c.allocatorByFamily[family], snapshotByFamily[family])
296 if err != nil {
297 return err
298 }
299 }
300
301 return nil
302 }
303
304 func (c *Repair) saveSnapShot(rebuilt *ipallocator.Range, alloc rangeallocation.RangeRegistry, snapshot *api.RangeAllocation) error {
305 if err := rebuilt.Snapshot(snapshot); err != nil {
306 return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
307 }
308 if err := alloc.CreateOrUpdate(snapshot); err != nil {
309 if errors.IsConflict(err) {
310 return err
311 }
312 return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
313 }
314
315 return nil
316 }
317
318 func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface, rebuilt *ipallocator.Range) {
319
320 stored.ForEach(func(ip net.IP) {
321 count, found := leaks[ip.String()]
322 switch {
323 case !found:
324
325 runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
326 count = numRepairsBeforeLeakCleanup - 1
327 fallthrough
328 case count > 0:
329
330 leaks[ip.String()] = count - 1
331 if err := rebuilt.Allocate(ip); err != nil {
332
333 runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
334 }
335 default:
336 clusterIPRepairIPErrors.WithLabelValues("leak").Inc()
337
338 runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
339 }
340 })
341 }
342
View as plain text