1
16
17 package ephemeral
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "k8s.io/klog/v2"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 coreinformers "k8s.io/client-go/informers/core/v1"
32 clientset "k8s.io/client-go/kubernetes"
33 "k8s.io/client-go/kubernetes/scheme"
34 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
35 corelisters "k8s.io/client-go/listers/core/v1"
36 "k8s.io/client-go/tools/cache"
37 "k8s.io/client-go/tools/record"
38 "k8s.io/client-go/util/workqueue"
39 "k8s.io/component-helpers/storage/ephemeral"
40 "k8s.io/kubernetes/pkg/controller/volume/common"
41 ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
42 "k8s.io/kubernetes/pkg/controller/volume/events"
43 )
44
45
// Controller is the interface exposed by the ephemeral volume controller
// returned from NewController.
type Controller interface {
	// Run starts the controller with the requested number of workers.
	// The implementation in this file returns once ctx is done (or earlier
	// if the informer caches fail to sync).
	Run(ctx context.Context, workers int)
}
49
50 type ephemeralController struct {
51
52
53 kubeClient clientset.Interface
54
55
56
57
58 pvcLister corelisters.PersistentVolumeClaimLister
59 pvcsSynced cache.InformerSynced
60
61
62
63
64 podLister corelisters.PodLister
65 podSynced cache.InformerSynced
66
67
68
69 podIndexer cache.Indexer
70
71
72 recorder record.EventRecorder
73
74 queue workqueue.RateLimitingInterface
75 }
76
77
78 func NewController(
79 ctx context.Context,
80 kubeClient clientset.Interface,
81 podInformer coreinformers.PodInformer,
82 pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
83
84 ec := &ephemeralController{
85 kubeClient: kubeClient,
86 podLister: podInformer.Lister(),
87 podIndexer: podInformer.Informer().GetIndexer(),
88 podSynced: podInformer.Informer().HasSynced,
89 pvcLister: pvcInformer.Lister(),
90 pvcsSynced: pvcInformer.Informer().HasSynced,
91 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
92 }
93
94 ephemeralvolumemetrics.RegisterMetrics()
95
96 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
97 eventBroadcaster.StartLogging(klog.Infof)
98 eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
99 ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
100
101 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
102 AddFunc: ec.enqueuePod,
103
104
105
106
107
108 })
109 pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
110 DeleteFunc: ec.onPVCDelete,
111 })
112 if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
113 return nil, fmt.Errorf("could not initialize ephemeral volume controller: %w", err)
114 }
115
116 return ec, nil
117 }
118
119 func (ec *ephemeralController) enqueuePod(obj interface{}) {
120 pod, ok := obj.(*v1.Pod)
121 if !ok {
122 return
123 }
124
125
126 if pod.DeletionTimestamp != nil {
127 return
128 }
129
130 for _, vol := range pod.Spec.Volumes {
131 if vol.Ephemeral != nil {
132
133 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
134 if err != nil {
135 runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
136 return
137 }
138 ec.queue.Add(key)
139 break
140 }
141 }
142 }
143
144 func (ec *ephemeralController) onPVCDelete(obj interface{}) {
145 pvc, ok := obj.(*v1.PersistentVolumeClaim)
146 if !ok {
147 return
148 }
149
150
151
152
153
154
155
156 objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
157 if err != nil {
158 runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
159 return
160 }
161 for _, obj := range objs {
162 ec.enqueuePod(obj)
163 }
164 }
165
166 func (ec *ephemeralController) Run(ctx context.Context, workers int) {
167 defer runtime.HandleCrash()
168 defer ec.queue.ShutDown()
169 logger := klog.FromContext(ctx)
170 logger.Info("Starting ephemeral volume controller")
171 defer logger.Info("Shutting down ephemeral volume controller")
172
173 if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
174 return
175 }
176
177 for i := 0; i < workers; i++ {
178 go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
179 }
180
181 <-ctx.Done()
182 }
183
184 func (ec *ephemeralController) runWorker(ctx context.Context) {
185 for ec.processNextWorkItem(ctx) {
186 }
187 }
188
189 func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
190 key, shutdown := ec.queue.Get()
191 if shutdown {
192 return false
193 }
194 defer ec.queue.Done(key)
195
196 err := ec.syncHandler(ctx, key.(string))
197 if err == nil {
198 ec.queue.Forget(key)
199 return true
200 }
201
202 runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
203 ec.queue.AddRateLimited(key)
204
205 return true
206 }
207
208
209
210 func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
211 namespace, name, err := cache.SplitMetaNamespaceKey(key)
212 if err != nil {
213 return err
214 }
215 pod, err := ec.podLister.Pods(namespace).Get(name)
216 logger := klog.FromContext(ctx)
217 if err != nil {
218 if errors.IsNotFound(err) {
219 logger.V(5).Info("Ephemeral: nothing to do for pod, it is gone", "podKey", key)
220 return nil
221 }
222 logger.V(5).Info("Error getting pod from informer", "pod", klog.KObj(pod), "podUID", pod.UID, "err", err)
223 return err
224 }
225
226
227 if pod.DeletionTimestamp != nil {
228 logger.V(5).Info("Ephemeral: nothing to do for pod, it is marked for deletion", "podKey", key)
229 return nil
230 }
231
232 for _, vol := range pod.Spec.Volumes {
233 if err := ec.handleVolume(ctx, pod, vol); err != nil {
234 ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
235 return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
236 }
237 }
238
239 return nil
240 }
241
242
243 func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
244 logger := klog.FromContext(ctx)
245 logger.V(5).Info("Ephemeral: checking volume", "volumeName", vol.Name)
246 if vol.Ephemeral == nil {
247 return nil
248 }
249
250 pvcName := ephemeral.VolumeClaimName(pod, &vol)
251 pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
252 if err != nil && !errors.IsNotFound(err) {
253 return err
254 }
255 if pvc != nil {
256 if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
257 return err
258 }
259
260 logger.V(5).Info("Ephemeral: PVC already created", "volumeName", vol.Name, "PVC", klog.KObj(pvc))
261 return nil
262 }
263
264
265 isTrue := true
266 pvc = &v1.PersistentVolumeClaim{
267 ObjectMeta: metav1.ObjectMeta{
268 Name: pvcName,
269 OwnerReferences: []metav1.OwnerReference{
270 {
271 APIVersion: "v1",
272 Kind: "Pod",
273 Name: pod.Name,
274 UID: pod.UID,
275 Controller: &isTrue,
276 BlockOwnerDeletion: &isTrue,
277 },
278 },
279 Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations,
280 Labels: vol.Ephemeral.VolumeClaimTemplate.Labels,
281 },
282 Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
283 }
284 ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
285 _, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
286 if err != nil {
287 ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
288 return fmt.Errorf("create PVC %s: %v", pvcName, err)
289 }
290 return nil
291 }
292
View as plain text