1
16
17 package recyclerclient
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23
24 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/fields"
28 "k8s.io/apimachinery/pkg/watch"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/klog/v2"
31 )
32
33
// RecycleEventRecorder is a callback that receives an event type and a
// message. In this file it is invoked by realRecyclerClient.Event, which
// waitForPod calls for every newly added Event object seen on the watch
// channel.
type RecycleEventRecorder func(eventtype, message string)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error {
50 return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder))
51 }
52
53
54
55 func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error {
56 klog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name)
57
58
59
60
61 pod.Name = "recycler-for-" + pvName
62 pod.GenerateName = ""
63
64 stopChannel := make(chan struct{})
65 defer close(stopChannel)
66 podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
67 if err != nil {
68 klog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err)
69 return err
70 }
71
72
73 _, err = recyclerClient.CreatePod(pod)
74 if err != nil {
75 if errors.IsAlreadyExists(err) {
76 deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
77 if deleteErr != nil {
78 return fmt.Errorf("failed to delete old recycler pod %s/%s: %s", pod.Namespace, pod.Name, deleteErr)
79 }
80
81
82 return fmt.Errorf("old recycler pod found, will retry later")
83 }
84 return fmt.Errorf("unexpected error creating recycler pod: %+v", err)
85 }
86 err = waitForPod(pod, recyclerClient, podCh)
87
88
89 klog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name)
90 deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
91 if deleteErr != nil {
92 klog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err)
93 }
94
95
96
97 if err != nil {
98 return fmt.Errorf("failed to recycle volume: %s", err)
99 }
100
101
102
103 if deleteErr != nil {
104 return fmt.Errorf("failed to delete recycler pod: %s", deleteErr)
105 }
106
107 return nil
108 }
109
110
111
112 func waitForPod(pod *v1.Pod, recyclerClient recyclerClient, podCh <-chan watch.Event) error {
113 for {
114 event, ok := <-podCh
115 if !ok {
116 return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name)
117 }
118 switch event.Object.(type) {
119 case *v1.Pod:
120
121 pod := event.Object.(*v1.Pod)
122 klog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
123 switch event.Type {
124 case watch.Added, watch.Modified:
125 if pod.Status.Phase == v1.PodSucceeded {
126
127 return nil
128 }
129 if pod.Status.Phase == v1.PodFailed {
130 if pod.Status.Message != "" {
131 return fmt.Errorf(pod.Status.Message)
132 }
133 return fmt.Errorf("pod failed, pod.Status.Message unknown")
134 }
135
136 case watch.Deleted:
137 return fmt.Errorf("recycler pod was deleted")
138
139 case watch.Error:
140 return fmt.Errorf("recycler pod watcher failed")
141 }
142
143 case *v1.Event:
144
145 podEvent := event.Object.(*v1.Event)
146 klog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message)
147 if event.Type == watch.Added {
148 recyclerClient.Event(podEvent.Type, podEvent.Message)
149 }
150 }
151 }
152 }
153
154
155
// recyclerClient is the set of pod and event operations the recycler needs.
// newRecyclerClient returns the realRecyclerClient implementation; the
// interface lets
// internalRecycleVolumeByWatchingPodUntilCompletion run against any other
// implementation as well.
type recyclerClient interface {
	// CreatePod creates the given pod and returns the created object.
	CreatePod(pod *v1.Pod) (*v1.Pod, error)
	// GetPod fetches the pod with the given name in the given namespace.
	GetPod(name, namespace string) (*v1.Pod, error)
	// DeletePod deletes the pod with the given name in the given namespace.
	DeletePod(name, namespace string) error

	// WatchPod returns a channel of watch events concerning the named pod.
	// waitForPod expects it to carry both *v1.Pod and *v1.Event objects.
	// Closing stopChannel asks the implementation to stop watching.
	WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error)

	// Event reports an event with the given type and message.
	Event(eventtype, message string)
}
167
168 func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient {
169 return &realRecyclerClient{
170 client,
171 recorder,
172 }
173 }
174
// realRecyclerClient implements recyclerClient on top of a Kubernetes
// clientset.
type realRecyclerClient struct {
	// client is used for all pod and event API calls.
	client clientset.Interface
	// recorder is invoked by Event with the event type and message.
	recorder RecycleEventRecorder
}
179
180 func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
181 return c.client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
182 }
183
184 func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) {
185 return c.client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
186 }
187
188 func (c *realRecyclerClient) DeletePod(name, namespace string) error {
189 return c.client.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
190 }
191
192 func (c *realRecyclerClient) Event(eventtype, message string) {
193 c.recorder(eventtype, message)
194 }
195
196
197
198 func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
199 podSelector, err := fields.ParseSelector("metadata.name=" + name)
200 if err != nil {
201 return nil, err
202 }
203 options := metav1.ListOptions{
204 FieldSelector: podSelector.String(),
205 Watch: true,
206 }
207
208 podWatch, err := c.client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
209 if err != nil {
210 return nil, err
211 }
212
213 eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name)
214 eventWatch, err := c.client.CoreV1().Events(namespace).Watch(context.TODO(), metav1.ListOptions{
215 FieldSelector: eventSelector.String(),
216 Watch: true,
217 })
218 if err != nil {
219 podWatch.Stop()
220 return nil, err
221 }
222
223 eventCh := make(chan watch.Event, 30)
224 var wg sync.WaitGroup
225 wg.Add(2)
226
227 go func() {
228 defer close(eventCh)
229 wg.Wait()
230 }()
231
232 go func() {
233 defer eventWatch.Stop()
234 defer wg.Done()
235 for {
236 select {
237 case <-stopChannel:
238 return
239 case eventEvent, ok := <-eventWatch.ResultChan():
240 if !ok {
241 return
242 }
243 eventCh <- eventEvent
244 }
245 }
246 }()
247
248 go func() {
249 defer podWatch.Stop()
250 defer wg.Done()
251 for {
252 select {
253 case <-stopChannel:
254 return
255
256 case podEvent, ok := <-podWatch.ResultChan():
257 if !ok {
258 return
259 }
260 eventCh <- podEvent
261 }
262 }
263 }()
264
265 return eventCh, nil
266 }
267
View as plain text