...
1
16
17 package apiserverleasegc
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 v1 "k8s.io/api/coordination/v1"
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/labels"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/apimachinery/pkg/util/wait"
30 informers "k8s.io/client-go/informers/coordination/v1"
31 "k8s.io/client-go/kubernetes"
32 listers "k8s.io/client-go/listers/coordination/v1"
33 "k8s.io/client-go/tools/cache"
34
35 "k8s.io/klog/v2"
36 )
37
38
39 type Controller struct {
40 kubeclientset kubernetes.Interface
41
42 leaseLister listers.LeaseLister
43 leaseInformer cache.SharedIndexInformer
44 leasesSynced cache.InformerSynced
45
46 leaseNamespace string
47
48 gcCheckPeriod time.Duration
49 }
50
51
52 func NewAPIServerLeaseGC(clientset kubernetes.Interface, gcCheckPeriod time.Duration, leaseNamespace, leaseLabelSelector string) *Controller {
53
54
55 leaseInformer := informers.NewFilteredLeaseInformer(
56 clientset,
57 leaseNamespace,
58 0,
59 cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
60 func(listOptions *metav1.ListOptions) {
61 listOptions.LabelSelector = leaseLabelSelector
62 })
63 return &Controller{
64 kubeclientset: clientset,
65 leaseLister: listers.NewLeaseLister(leaseInformer.GetIndexer()),
66 leaseInformer: leaseInformer,
67 leasesSynced: leaseInformer.HasSynced,
68 leaseNamespace: leaseNamespace,
69 gcCheckPeriod: gcCheckPeriod,
70 }
71 }
72
73
74 func (c *Controller) Run(stopCh <-chan struct{}) {
75 defer utilruntime.HandleCrash()
76 defer klog.Infof("Shutting down apiserver lease garbage collector")
77
78 klog.Infof("Starting apiserver lease garbage collector")
79
80
81 go c.leaseInformer.Run(stopCh)
82
83 if !cache.WaitForCacheSync(stopCh, c.leasesSynced) {
84 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
85 return
86 }
87
88 go wait.Until(c.gc, c.gcCheckPeriod, stopCh)
89
90 <-stopCh
91 }
92
93 func (c *Controller) gc() {
94 leases, err := c.leaseLister.Leases(c.leaseNamespace).List(labels.Everything())
95 if err != nil {
96 klog.ErrorS(err, "Error while listing apiserver leases")
97 return
98 }
99 for _, lease := range leases {
100
101 if !isLeaseExpired(lease) {
102 continue
103 }
104
105 lease, err := c.kubeclientset.CoordinationV1().Leases(c.leaseNamespace).Get(context.TODO(), lease.Name, metav1.GetOptions{})
106 if err != nil && !errors.IsNotFound(err) {
107 klog.ErrorS(err, "Error getting lease")
108 continue
109 }
110 if errors.IsNotFound(err) || lease == nil {
111
112
113
114 klog.V(4).InfoS("Cannot find apiserver lease", "err", err)
115 continue
116 }
117
118 if !isLeaseExpired(lease) {
119 continue
120 }
121 if err := c.kubeclientset.CoordinationV1().Leases(c.leaseNamespace).Delete(
122 context.TODO(), lease.Name, metav1.DeleteOptions{}); err != nil {
123 if errors.IsNotFound(err) {
124
125
126
127 klog.V(4).InfoS("Apiserver lease is gone already", "err", err)
128 } else {
129 klog.ErrorS(err, "Error deleting lease")
130 }
131 }
132 }
133 }
134
135 func isLeaseExpired(lease *v1.Lease) bool {
136 currentTime := time.Now()
137
138
139
140 return lease.Spec.RenewTime == nil ||
141 lease.Spec.LeaseDurationSeconds == nil ||
142 lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime)
143 }
144
View as plain text