package ttl

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"

	"k8s.io/klog/v2"
)

// Controller sets the ttl annotation on nodes, based on cluster size.
type Controller struct {
	kubeClient clientset.Interface

	// nodeStore is a local cache of nodes.
	nodeStore listers.NodeLister

	// Nodes that need to be synced.
	queue workqueue.RateLimitingInterface

	// Returns true if all underlying informers are synced.
	hasSynced func() bool

	lock sync.RWMutex

	// Number of nodes in the cluster.
	nodeCount int

	// Desired TTL for all nodes in the cluster.
	desiredTTLSeconds int

	// Index of the cluster-size boundary we are currently in.
	boundaryStep int
}

// NewTTLController creates a new TTL controller and registers its event
// handlers on the given node informer.
func NewTTLController(ctx context.Context, nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *Controller {
	ttlc := &Controller{
		kubeClient: kubeClient,
		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
	}
	logger := klog.FromContext(ctx)
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ttlc.addNode(logger, obj)
		},
		UpdateFunc: func(old, newObj interface{}) {
			ttlc.updateNode(logger, old, newObj)
		},
		DeleteFunc: ttlc.deleteNode,
	})

	ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
	ttlc.hasSynced = nodeInformer.Informer().HasSynced

	return ttlc
}
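
// A minimal sketch of how this controller is typically wired up, assuming a
// shared informer factory built from the same clientset. The factory and the
// worker count are illustrative, not part of this package, and "informers"
// here refers to k8s.io/client-go/informers rather than the core/v1 alias
// used in this file:
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	ttlController := ttl.NewTTLController(ctx, factory.Core().V1().Nodes(), kubeClient)
//	factory.Start(ctx.Done())
//	go ttlController.Run(ctx, 5)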

// ttlBoundary maps a range of cluster sizes to the TTL (in seconds) that
// should be set in the nodes' ttl annotation.
type ttlBoundary struct {
	sizeMin    int
	sizeMax    int
	ttlSeconds int
}
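
// The overlapping size ranges below provide hysteresis, so the desired TTL
// does not flap when the node count oscillates around a threshold. Worked
// example, derived from the table and the addNode/deleteNode logic: growing
// past 100 nodes raises the TTL to 15s, but the count must then drop below
// 90 before the TTL goes back to 0s.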
var (
	ttlBoundaries = []ttlBoundary{
		{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
		{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
		{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
		{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
		{sizeMin: 1800, sizeMax: math.MaxInt32, ttlSeconds: 300},
	}
)

// Run begins watching and syncing.
func (ttlc *Controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	defer ttlc.queue.ShutDown()
	logger := klog.FromContext(ctx)
	logger.Info("Starting TTL controller")
	defer logger.Info("Shutting down TTL controller")

	if !cache.WaitForNamedCacheSync("TTL", ctx.Done(), ttlc.hasSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, ttlc.worker, time.Second)
	}

	<-ctx.Done()
}

func (ttlc *Controller) addNode(logger klog.Logger, obj interface{}) {
	node, ok := obj.(*v1.Node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
		return
	}

	func() {
		ttlc.lock.Lock()
		defer ttlc.lock.Unlock()
		ttlc.nodeCount++
		// Move to the next boundary (and its larger TTL) once the cluster
		// outgrows the current one.
		if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
			ttlc.boundaryStep++
			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
		}
	}()
	ttlc.enqueueNode(logger, node)
}

func (ttlc *Controller) updateNode(logger klog.Logger, _, newObj interface{}) {
	node, ok := newObj.(*v1.Node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
		return
	}

	// Processing all node updates guarantees that the ttl annotation is
	// refreshed whenever the cluster size (and thus the desired TTL) changes.
	// Since the kubelet updates node status periodically, any required change
	// to the annotation is picked up within that status-update period.
	ttlc.enqueueNode(logger, node)
}

func (ttlc *Controller) deleteNode(obj interface{}) {
	_, ok := obj.(*v1.Node)
	if !ok {
		// The object may be a tombstone carrying the last known state of a
		// node deleted while the watch was disconnected.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
			return
		}
		_, ok = tombstone.Obj.(*v1.Node)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("unexpected object type in tombstone: %v", obj))
			return
		}
	}

	func() {
		ttlc.lock.Lock()
		defer ttlc.lock.Unlock()
		ttlc.nodeCount--
		// Step back to the previous boundary (and its smaller TTL) once the
		// cluster shrinks below the current boundary's minimum.
		if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
			ttlc.boundaryStep--
			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
		}
	}()
	// The deleted node itself is not enqueued, as it no longer exists.
}

func (ttlc *Controller) enqueueNode(logger klog.Logger, node *v1.Node) {
	key, err := controller.KeyFunc(node)
	if err != nil {
		logger.Error(err, "Couldn't get key for object", "object", klog.KObj(node))
		return
	}
	ttlc.queue.Add(key)
}

func (ttlc *Controller) worker(ctx context.Context) {
	for ttlc.processItem(ctx) {
	}
}

func (ttlc *Controller) processItem(ctx context.Context) bool {
	key, quit := ttlc.queue.Get()
	if quit {
		return false
	}
	defer ttlc.queue.Done(key)

	err := ttlc.updateNodeIfNeeded(ctx, key.(string))
	if err == nil {
		ttlc.queue.Forget(key)
		return true
	}

	ttlc.queue.AddRateLimited(key)
	utilruntime.HandleError(err)
	return true
}

func (ttlc *Controller) getDesiredTTLSeconds() int {
	ttlc.lock.RLock()
	defer ttlc.lock.RUnlock()
	return ttlc.desiredTTLSeconds
}

func getIntFromAnnotation(ctx context.Context, node *v1.Node, annotationKey string) (int, bool) {
	if node.Annotations == nil {
		return 0, false
	}
	annotationValue, ok := node.Annotations[annotationKey]
	if !ok {
		return 0, false
	}
	intValue, err := strconv.Atoi(annotationValue)
	if err != nil {
		logger := klog.FromContext(ctx)
		logger.Info("Could not convert annotation value to integer", "annotationValue",
			annotationValue, "annotationKey", annotationKey, "node", klog.KObj(node))
		return 0, false
	}
	return intValue, true
}

func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
	if node.Annotations == nil {
		node.Annotations = make(map[string]string)
	}
	node.Annotations[annotationKey] = strconv.Itoa(value)
}
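
// For reference, a sketch of the patch body this helper should end up sending
// when the desired TTL becomes 15s. The annotation key shown is the value of
// v1.ObjectTTLAnnotationKey at the time of writing, and the concrete TTL is
// only an example:
//
//	{"metadata":{"annotations":{"node.alpha.kubernetes.io/ttl":"15"}}}
//
// Because only the annotation differs between the old and new objects, the
// two-way strategic merge patch should reduce to exactly this change.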
func (ttlc *Controller) patchNodeWithAnnotation(ctx context.Context, node *v1.Node, annotationKey string, value int) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}
	setIntAnnotation(node, annotationKey, value)
	newData, err := json.Marshal(node)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		return err
	}
	_, err = ttlc.kubeClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	logger := klog.FromContext(ctx)
	if err != nil {
		logger.V(2).Info("Failed to change ttl annotation for node", "node", klog.KObj(node), "err", err)
		return err
	}
	logger.V(2).Info("Changed ttl annotation", "node", klog.KObj(node), "TTL", time.Duration(value)*time.Second)
	return nil
}

func (ttlc *Controller) updateNodeIfNeeded(ctx context.Context, key string) error {
	node, err := ttlc.nodeStore.Get(key)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	desiredTTL := ttlc.getDesiredTTLSeconds()
	currentTTL, ok := getIntFromAnnotation(ctx, node, v1.ObjectTTLAnnotationKey)
	if ok && currentTTL == desiredTTL {
		return nil
	}

	return ttlc.patchNodeWithAnnotation(ctx, node.DeepCopy(), v1.ObjectTTLAnnotationKey, desiredTTL)
}