1
16
17 package pvprotection
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/apimachinery/pkg/util/wait"
29 coreinformers "k8s.io/client-go/informers/core/v1"
30 clientset "k8s.io/client-go/kubernetes"
31 corelisters "k8s.io/client-go/listers/core/v1"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/util/workqueue"
34 "k8s.io/klog/v2"
35 "k8s.io/kubernetes/pkg/controller/volume/protectionutil"
36 "k8s.io/kubernetes/pkg/util/slice"
37 volumeutil "k8s.io/kubernetes/pkg/volume/util"
38 )
39
40
41
42 type Controller struct {
43 client clientset.Interface
44
45 pvLister corelisters.PersistentVolumeLister
46 pvListerSynced cache.InformerSynced
47
48 queue workqueue.RateLimitingInterface
49 }
50
51
52 func NewPVProtectionController(logger klog.Logger, pvInformer coreinformers.PersistentVolumeInformer, cl clientset.Interface) *Controller {
53 e := &Controller{
54 client: cl,
55 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvprotection"),
56 }
57
58 e.pvLister = pvInformer.Lister()
59 e.pvListerSynced = pvInformer.Informer().HasSynced
60 pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
61 AddFunc: func(obj interface{}) {
62 e.pvAddedUpdated(logger, obj)
63 },
64 UpdateFunc: func(old, new interface{}) {
65 e.pvAddedUpdated(logger, new)
66 },
67 })
68
69 return e
70 }
71
72
73 func (c *Controller) Run(ctx context.Context, workers int) {
74 defer utilruntime.HandleCrash()
75 defer c.queue.ShutDown()
76
77 logger := klog.FromContext(ctx)
78 logger.Info("Starting PV protection controller")
79 defer logger.Info("Shutting down PV protection controller")
80
81 if !cache.WaitForNamedCacheSync("PV protection", ctx.Done(), c.pvListerSynced) {
82 return
83 }
84
85 for i := 0; i < workers; i++ {
86 go wait.UntilWithContext(ctx, c.runWorker, time.Second)
87 }
88
89 <-ctx.Done()
90 }
91
92 func (c *Controller) runWorker(ctx context.Context) {
93 for c.processNextWorkItem(ctx) {
94 }
95 }
96
97
98 func (c *Controller) processNextWorkItem(ctx context.Context) bool {
99 pvKey, quit := c.queue.Get()
100 if quit {
101 return false
102 }
103 defer c.queue.Done(pvKey)
104
105 pvName := pvKey.(string)
106
107 err := c.processPV(ctx, pvName)
108 if err == nil {
109 c.queue.Forget(pvKey)
110 return true
111 }
112
113 utilruntime.HandleError(fmt.Errorf("PV %v failed with : %v", pvKey, err))
114 c.queue.AddRateLimited(pvKey)
115
116 return true
117 }
118
119 func (c *Controller) processPV(ctx context.Context, pvName string) error {
120 logger := klog.FromContext(ctx)
121 logger.V(4).Info("Processing PV", "PV", klog.KRef("", pvName))
122 startTime := time.Now()
123 defer func() {
124 logger.V(4).Info("Finished processing PV", "PV", klog.KRef("", pvName), "cost", time.Since(startTime))
125 }()
126
127 pv, err := c.pvLister.Get(pvName)
128 if apierrors.IsNotFound(err) {
129 logger.V(4).Info("PV not found, ignoring", "PV", klog.KRef("", pvName))
130 return nil
131 }
132 if err != nil {
133 return err
134 }
135
136 if protectionutil.IsDeletionCandidate(pv, volumeutil.PVProtectionFinalizer) {
137
138
139 isUsed := c.isBeingUsed(pv)
140 if !isUsed {
141 return c.removeFinalizer(ctx, pv)
142 }
143 logger.V(4).Info("Keeping PV because it is being used", "PV", klog.KRef("", pvName))
144 }
145
146 if protectionutil.NeedToAddFinalizer(pv, volumeutil.PVProtectionFinalizer) {
147
148
149
150
151 return c.addFinalizer(ctx, pv)
152 }
153 return nil
154 }
155
156 func (c *Controller) addFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
157 pvClone := pv.DeepCopy()
158 pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer)
159 _, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
160 logger := klog.FromContext(ctx)
161 if err != nil {
162 logger.V(3).Info("Error adding protection finalizer to PV", "PV", klog.KObj(pv), "err", err)
163 return err
164 }
165 logger.V(3).Info("Added protection finalizer to PV", "PV", klog.KObj(pv))
166 return nil
167 }
168
169 func (c *Controller) removeFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
170 pvClone := pv.DeepCopy()
171 pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
172 _, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
173 logger := klog.FromContext(ctx)
174 if err != nil {
175 logger.V(3).Info("Error removing protection finalizer from PV", "PV", klog.KObj(pv), "err", err)
176 return err
177 }
178 logger.V(3).Info("Removed protection finalizer from PV", "PV", klog.KObj(pv))
179 return nil
180 }
181
182 func (c *Controller) isBeingUsed(pv *v1.PersistentVolume) bool {
183
184
185 if pv.Status.Phase == v1.VolumeBound {
186
187 return true
188 }
189
190 return false
191 }
192
193
194 func (c *Controller) pvAddedUpdated(logger klog.Logger, obj interface{}) {
195 pv, ok := obj.(*v1.PersistentVolume)
196 if !ok {
197 utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj))
198 return
199 }
200 logger.V(4).Info("Got event on PV", "PV", klog.KObj(pv))
201
202 if protectionutil.NeedToAddFinalizer(pv, volumeutil.PVProtectionFinalizer) || protectionutil.IsDeletionCandidate(pv, volumeutil.PVProtectionFinalizer) {
203 c.queue.Add(pv.Name)
204 }
205 }
206
View as plain text