1
2
3
4
19
20 package ipam
21
22 import (
23 "context"
24 "fmt"
25 "net"
26 "sync"
27 "time"
28
29 "k8s.io/klog/v2"
30 netutils "k8s.io/utils/net"
31
32 v1 "k8s.io/api/core/v1"
33 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34 informers "k8s.io/client-go/informers/core/v1"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/tools/cache"
37 cloudprovider "k8s.io/cloud-provider"
38 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
39 nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
40 controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
41 "k8s.io/legacy-cloud-providers/gce"
42 )
43
44
45 type Config struct {
46
47 Resync time.Duration
48
49 MaxBackoff time.Duration
50
51 InitialRetry time.Duration
52
53 Mode nodesync.NodeSyncMode
54 }
55
56
57
58 type Controller struct {
59 config *Config
60 adapter *adapter
61
62 lock sync.Mutex
63 syncers map[string]*nodesync.NodeSync
64
65 set *cidrset.CidrSet
66 }
67
68
69 func NewController(
70 ctx context.Context,
71 config *Config,
72 kubeClient clientset.Interface,
73 cloud cloudprovider.Interface,
74 clusterCIDR, serviceCIDR *net.IPNet,
75 nodeCIDRMaskSize int) (*Controller, error) {
76
77 if !nodesync.IsValidMode(config.Mode) {
78 return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode)
79 }
80
81 gceCloud, ok := cloud.(*gce.Cloud)
82 if !ok {
83 return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName())
84 }
85
86 set, err := cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize)
87 if err != nil {
88 return nil, err
89 }
90
91 c := &Controller{
92 config: config,
93 adapter: newAdapter(ctx, kubeClient, gceCloud),
94 syncers: make(map[string]*nodesync.NodeSync),
95 set: set,
96 }
97
98 if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil {
99 return nil, err
100 }
101
102
103 cidr, err := c.set.AllocateNext()
104 switch err {
105 case cidrset.ErrCIDRRangeNoCIDRsRemaining:
106 return nil, fmt.Errorf("failed after occupy serviceCIDR: %v", err)
107 case nil:
108 err := c.set.Release(cidr)
109 return c, err
110 default:
111 return nil, fmt.Errorf("unexpected error when check remaining CIDR range: %v", err)
112 }
113 }
114
115
116
117
118 func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInformer) error {
119 logger.Info("Starting IPAM controller", "config", c.config)
120
121 nodes, err := listNodes(logger, c.adapter.k8s)
122 if err != nil {
123 return err
124 }
125 for _, node := range nodes.Items {
126 if node.Spec.PodCIDR != "" {
127 _, cidrRange, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
128 if err == nil {
129 c.set.Occupy(cidrRange)
130 logger.V(3).Info("Occupying CIDR for node", "CIDR", node.Spec.PodCIDR, "node", klog.KObj(&node))
131 } else {
132 logger.Error(err, "Node has an invalid CIDR", "node", klog.KObj(&node), "CIDR", node.Spec.PodCIDR)
133 }
134 }
135
136 func() {
137 c.lock.Lock()
138 defer c.lock.Unlock()
139
140
141 syncer := c.newSyncer(node.Name)
142 c.syncers[node.Name] = syncer
143 go syncer.Loop(logger, nil)
144 }()
145 }
146
147 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
148 AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
149 return c.onAdd(logger, node)
150 }),
151 UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
152 return c.onUpdate(logger, newNode)
153 }),
154 DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
155 return c.onDelete(logger, node)
156 }),
157 })
158
159 return nil
160 }
161
162 func (c *Controller) Run(ctx context.Context) {
163 defer utilruntime.HandleCrash()
164
165 go c.adapter.Run(ctx)
166 <-ctx.Done()
167 }
168
169 type nodeState struct {
170 t Timeout
171 }
172
173 func (ns *nodeState) ReportResult(err error) {
174 ns.t.Update(err == nil)
175 }
176
177 func (ns *nodeState) ResyncTimeout() time.Duration {
178 return ns.t.Next()
179 }
180
181 func (c *Controller) newSyncer(name string) *nodesync.NodeSync {
182 ns := &nodeState{
183 Timeout{
184 Resync: c.config.Resync,
185 MaxBackoff: c.config.MaxBackoff,
186 InitialRetry: c.config.InitialRetry,
187 },
188 }
189 return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set)
190 }
191
192 func (c *Controller) onAdd(logger klog.Logger, node *v1.Node) error {
193 c.lock.Lock()
194 defer c.lock.Unlock()
195
196 syncer, ok := c.syncers[node.Name]
197 if !ok {
198 syncer = c.newSyncer(node.Name)
199 c.syncers[node.Name] = syncer
200 go syncer.Loop(logger, nil)
201 } else {
202 logger.Info("Add for node that already exists", "node", klog.KObj(node))
203 }
204 syncer.Update(node)
205
206 return nil
207 }
208
209 func (c *Controller) onUpdate(logger klog.Logger, node *v1.Node) error {
210 c.lock.Lock()
211 defer c.lock.Unlock()
212
213 if sync, ok := c.syncers[node.Name]; ok {
214 sync.Update(node)
215 } else {
216 logger.Error(nil, "Received update for non-existent node", "node", klog.KObj(node))
217 return fmt.Errorf("unknown node %q", node.Name)
218 }
219
220 return nil
221 }
222
223 func (c *Controller) onDelete(logger klog.Logger, node *v1.Node) error {
224 c.lock.Lock()
225 defer c.lock.Unlock()
226
227 if syncer, ok := c.syncers[node.Name]; ok {
228 syncer.Delete(node)
229 delete(c.syncers, node.Name)
230 } else {
231 logger.Info("Node was already deleted", "node", klog.KObj(node))
232 }
233
234 return nil
235 }
236
View as plain text