1
16
17 package ipallocator
18
19 import (
20 "fmt"
21 "net"
22 "net/netip"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 utilerrors "k8s.io/apimachinery/pkg/util/errors"
31 "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/sets"
33 "k8s.io/apimachinery/pkg/util/wait"
34 networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
35 networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
36 networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
37 "k8s.io/client-go/tools/cache"
38 "k8s.io/client-go/util/workqueue"
39 "k8s.io/klog/v2"
40 api "k8s.io/kubernetes/pkg/apis/core"
41 "k8s.io/kubernetes/pkg/util/iptree"
42 netutils "k8s.io/utils/net"
43 )
44
45
46
47
48
49
50
51
52
53 type MetaAllocator struct {
54 client networkingv1alpha1client.NetworkingV1alpha1Interface
55 serviceCIDRLister networkingv1alpha1listers.ServiceCIDRLister
56 serviceCIDRSynced cache.InformerSynced
57 ipAddressLister networkingv1alpha1listers.IPAddressLister
58 ipAddressSynced cache.InformerSynced
59 ipAddressInformer networkingv1alpha1informers.IPAddressInformer
60 queue workqueue.RateLimitingInterface
61
62 internalStopCh chan struct{}
63
64 muTree sync.Mutex
65 tree *iptree.Tree[*Allocator]
66
67 ipFamily api.IPFamily
68 }
69
70 var _ Interface = &MetaAllocator{}
71
72
73
74
75 func NewMetaAllocator(
76 client networkingv1alpha1client.NetworkingV1alpha1Interface,
77 serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
78 ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
79 isIPv6 bool,
80 ) (*MetaAllocator, error) {
81
82
83 family := api.IPv4Protocol
84 if isIPv6 {
85 family = api.IPv6Protocol
86 }
87
88 c := &MetaAllocator{
89 client: client,
90 serviceCIDRLister: serviceCIDRInformer.Lister(),
91 serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
92 ipAddressLister: ipAddressInformer.Lister(),
93 ipAddressSynced: ipAddressInformer.Informer().HasSynced,
94 ipAddressInformer: ipAddressInformer,
95 queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: ControllerName}),
96 internalStopCh: make(chan struct{}),
97 tree: iptree.New[*Allocator](),
98 ipFamily: family,
99 }
100
101 _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
102 AddFunc: c.addServiceCIDR,
103 UpdateFunc: c.updateServiceCIDR,
104 DeleteFunc: c.deleteServiceCIDR,
105 })
106
107 go c.run()
108
109 return c, nil
110 }
111
112 func (c *MetaAllocator) addServiceCIDR(obj interface{}) {
113 key, err := cache.MetaNamespaceKeyFunc(obj)
114 if err == nil {
115 c.queue.Add(key)
116 }
117 }
118 func (c *MetaAllocator) updateServiceCIDR(old, new interface{}) {
119 key, err := cache.MetaNamespaceKeyFunc(new)
120 if err == nil {
121 c.queue.Add(key)
122 }
123 }
124
125 func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) {
126 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
127 if err == nil {
128 c.queue.Add(key)
129 }
130 }
131
132 func (c *MetaAllocator) run() {
133 defer runtime.HandleCrash()
134 defer c.queue.ShutDown()
135 klog.Info("Starting ServiceCIDR Allocator Controller")
136 defer klog.Info("Stopping ServiceCIDR Allocator Controllerr")
137
138
139 if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) {
140 runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
141 return
142 }
143
144
145 go wait.Until(c.runWorker, time.Second, c.internalStopCh)
146
147 <-c.internalStopCh
148 }
149
150 func (c *MetaAllocator) runWorker() {
151 for c.processNextItem() {
152 }
153 }
154
155 func (c *MetaAllocator) processNextItem() bool {
156
157 key, quit := c.queue.Get()
158 if quit {
159 return false
160 }
161 defer c.queue.Done(key)
162
163 err := c.syncTree()
164
165 if err != nil {
166 if c.queue.NumRequeues(key) < 5 {
167 klog.Infof("Error syncing cidr %v: %v", key, err)
168 c.queue.AddRateLimited(key)
169 return true
170 }
171 }
172 c.queue.Forget(key)
173 return true
174 }
175
176
177
178 func (c *MetaAllocator) syncTree() error {
179 now := time.Now()
180 defer func() {
181 klog.V(2).Infof("Finished sync for CIDRs took %v", time.Since(now))
182 }()
183
184 serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything())
185 if err != nil {
186 return err
187 }
188
189 cidrsSet := sets.New[string]()
190 cidrReady := map[string]bool{}
191 for _, serviceCIDR := range serviceCIDRs {
192 ready := true
193 if !isReady(serviceCIDR) || !serviceCIDR.DeletionTimestamp.IsZero() {
194 ready = false
195 }
196
197 for _, cidr := range serviceCIDR.Spec.CIDRs {
198 if c.ipFamily == api.IPFamily(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))) {
199 cidrsSet.Insert(cidr)
200 cidrReady[cidr] = ready
201 }
202 }
203 }
204
205
206 treeSet := sets.New[string]()
207 c.muTree.Lock()
208 c.tree.DepthFirstWalk(c.ipFamily == api.IPv6Protocol, func(k netip.Prefix, v *Allocator) bool {
209 v.ready.Store(cidrReady[k.String()])
210 treeSet.Insert(k.String())
211 return false
212 })
213 c.muTree.Unlock()
214 cidrsToRemove := treeSet.Difference(cidrsSet)
215 cidrsToAdd := cidrsSet.Difference(treeSet)
216
217 errs := []error{}
218
219 for _, cidr := range cidrsToAdd.UnsortedList() {
220 _, ipnet, err := netutils.ParseCIDRSloppy(cidr)
221 if err != nil {
222 return err
223 }
224
225 allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer)
226 if err != nil {
227 errs = append(errs, err)
228 continue
229 }
230 allocator.ready.Store(cidrReady[cidr])
231 prefix, err := netip.ParsePrefix(cidr)
232 if err != nil {
233 return err
234 }
235 c.addAllocator(prefix, allocator)
236 klog.Infof("Created ClusterIP allocator for Service CIDR %s", cidr)
237 }
238
239 for _, cidr := range cidrsToRemove.UnsortedList() {
240 prefix, err := netip.ParsePrefix(cidr)
241 if err != nil {
242 return err
243 }
244 c.deleteAllocator(prefix)
245 }
246
247 return utilerrors.NewAggregate(errs)
248 }
249
250 func (c *MetaAllocator) getAllocator(ip net.IP) (*Allocator, error) {
251 c.muTree.Lock()
252 defer c.muTree.Unlock()
253
254 address := ipToAddr(ip)
255 prefix := netip.PrefixFrom(address, address.BitLen())
256
257
258 _, allocator, ok := c.tree.ShortestPrefixMatch(prefix)
259 if !ok {
260 klog.V(2).Infof("Could not get allocator for IP %s", ip.String())
261 return nil, ErrMismatchedNetwork
262 }
263 return allocator, nil
264 }
265
266 func (c *MetaAllocator) addAllocator(cidr netip.Prefix, allocator *Allocator) {
267 c.muTree.Lock()
268 defer c.muTree.Unlock()
269 c.tree.InsertPrefix(cidr, allocator)
270 }
271
272 func (c *MetaAllocator) deleteAllocator(cidr netip.Prefix) {
273 c.muTree.Lock()
274 defer c.muTree.Unlock()
275 ok := c.tree.DeletePrefix(cidr)
276 if ok {
277 klog.V(3).Infof("CIDR %s deleted", cidr)
278 }
279 }
280
281 func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
282 allocator, err := c.getAllocator(ip)
283 if err != nil {
284 return err
285 }
286 return allocator.AllocateService(service, ip)
287 }
288
289 func (c *MetaAllocator) Allocate(ip net.IP) error {
290 allocator, err := c.getAllocator(ip)
291 if err != nil {
292 return err
293 }
294 return allocator.Allocate(ip)
295 }
296
297 func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
298 c.muTree.Lock()
299 defer c.muTree.Unlock()
300
301
302
303
304
305
306
307 isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
308 for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
309 ip, err := allocator.AllocateNextService(service)
310 if err == nil {
311 return ip, nil
312 }
313 }
314 return nil, ErrFull
315 }
316
317 func (c *MetaAllocator) AllocateNext() (net.IP, error) {
318 c.muTree.Lock()
319 defer c.muTree.Unlock()
320
321
322
323
324
325
326
327 isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
328 for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
329 ip, err := allocator.AllocateNext()
330 if err == nil {
331 return ip, nil
332 }
333 }
334 return nil, ErrFull
335 }
336
337 func (c *MetaAllocator) Release(ip net.IP) error {
338 allocator, err := c.getAllocator(ip)
339 if err != nil {
340 return err
341 }
342 return allocator.Release(ip)
343
344 }
345 func (c *MetaAllocator) ForEach(f func(ip net.IP)) {
346 ipLabelSelector := labels.Set(map[string]string{
347 networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
348 networkingv1alpha1.LabelManagedBy: ControllerName,
349 }).AsSelectorPreValidated()
350 ips, err := c.ipAddressLister.List(ipLabelSelector)
351 if err != nil {
352 return
353 }
354 for _, ip := range ips {
355 f(netutils.ParseIPSloppy(ip.Name))
356 }
357 }
358
359 func (c *MetaAllocator) CIDR() net.IPNet {
360 return net.IPNet{}
361
362 }
363 func (c *MetaAllocator) IPFamily() api.IPFamily {
364 return c.ipFamily
365 }
366 func (c *MetaAllocator) Has(ip net.IP) bool {
367 allocator, err := c.getAllocator(ip)
368 if err != nil {
369 return false
370 }
371 return allocator.Has(ip)
372 }
373 func (c *MetaAllocator) Destroy() {
374 select {
375 case <-c.internalStopCh:
376 default:
377 close(c.internalStopCh)
378 }
379 }
380
381
382 func (c *MetaAllocator) Used() int {
383 ipLabelSelector := labels.Set(map[string]string{
384 networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
385 networkingv1alpha1.LabelManagedBy: ControllerName,
386 }).AsSelectorPreValidated()
387 ips, err := c.ipAddressLister.List(ipLabelSelector)
388 if err != nil {
389 return 0
390 }
391 return len(ips)
392 }
393
394
395 func (c *MetaAllocator) Free() int {
396 c.muTree.Lock()
397 defer c.muTree.Unlock()
398
399 size := 0
400 isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
401 for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
402 size += int(allocator.size)
403 }
404 return size - c.Used()
405 }
406
407 func (c *MetaAllocator) EnableMetrics() {}
408
409
410 func (c *MetaAllocator) DryRun() Interface {
411 c.muTree.Lock()
412 defer c.muTree.Unlock()
413 isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
414 for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
415 return allocator.DryRun()
416 }
417 return &Allocator{}
418 }
419
420 func isReady(serviceCIDR *networkingv1alpha1.ServiceCIDR) bool {
421 if serviceCIDR == nil {
422 return false
423 }
424
425 for _, condition := range serviceCIDR.Status.Conditions {
426 if condition.Type == networkingv1alpha1.ServiceCIDRConditionReady {
427 return condition.Status == metav1.ConditionStatus(metav1.ConditionTrue)
428 }
429 }
430
431 return true
432 }
433
434
435
// ipToAddr converts ip to a netip.Addr. Addresses that have a 4-byte form,
// including IPv4-mapped IPv6 addresses, become IPv4 addresses; everything
// else is converted from its 16-byte form. A nil or malformed ip yields the
// zero (invalid) Addr.
func ipToAddr(ip net.IP) netip.Addr {
	raw := ip.To16()
	if v4 := ip.To4(); v4 != nil {
		raw = v4
	}

	// The ok result is dropped on purpose: on failure the zero Addr is returned.
	addr, _ := netip.AddrFromSlice(raw)
	return addr
}
449
450
451
452
453 func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
454 switch ipFamily {
455 case netutils.IPv4:
456 return v1.IPv4Protocol
457 case netutils.IPv6:
458 return v1.IPv6Protocol
459 }
460
461 return v1.IPFamilyUnknown
462 }
463
View as plain text