1
16
17 package storageversionmigrator
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "k8s.io/apimachinery/pkg/api/meta"
25 "k8s.io/apimachinery/pkg/runtime/schema"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/client-go/discovery"
28 "k8s.io/client-go/metadata"
29 "k8s.io/client-go/tools/cache"
30 "k8s.io/client-go/util/workqueue"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/controller"
33
34 svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
35 apierrors "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
38 svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1"
39 clientset "k8s.io/client-go/kubernetes"
40 svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1"
41 )
42
const (
	// fakeSVMNamespaceName is the namespace handed to the metadata client when
	// getLatestResourceVersion lists a namespace-scoped resource.
	// NOTE(review): presumably chosen because it can never be a valid namespace
	// name, so the list matches no objects — confirm.
	fakeSVMNamespaceName string = "@fake:svm_ns!"
	// ResourceVersionControllerName is the name used for this controller's work
	// queue, its cache-sync wait, and its start/stop log messages.
	ResourceVersionControllerName string = "resource-version-controller"
)
48
49
50
51
// ResourceVersionController watches StorageVersionMigration objects and, for
// each one that has neither finished nor already been stamped, records a
// resource version in its status (or marks it failed when the target resource
// is not known to the REST mapper).
type ResourceVersionController struct {
	// discoveryClient is queried to find out whether a resource is namespaced.
	discoveryClient *discovery.DiscoveryClient
	// metadataClient lists the target resource to read the list's resource version.
	metadataClient metadata.Interface
	// svmListers reads StorageVersionMigration objects from the informer cache.
	svmListers svmlisters.StorageVersionMigrationLister
	// svmSynced reports whether the StorageVersionMigration informer has synced.
	svmSynced cache.InformerSynced
	// queue holds the keys of StorageVersionMigration objects awaiting sync.
	queue workqueue.RateLimitingInterface
	// kubeClient is used to update StorageVersionMigration status.
	kubeClient clientset.Interface
	// mapper is consulted to check that the target resource exists.
	mapper meta.ResettableRESTMapper
}
61
62 func NewResourceVersionController(
63 ctx context.Context,
64 kubeClient clientset.Interface,
65 discoveryClient *discovery.DiscoveryClient,
66 metadataClient metadata.Interface,
67 svmInformer svminformers.StorageVersionMigrationInformer,
68 mapper meta.ResettableRESTMapper,
69 ) *ResourceVersionController {
70 logger := klog.FromContext(ctx)
71
72 rvController := &ResourceVersionController{
73 kubeClient: kubeClient,
74 discoveryClient: discoveryClient,
75 metadataClient: metadataClient,
76 svmListers: svmInformer.Lister(),
77 svmSynced: svmInformer.Informer().HasSynced,
78 mapper: mapper,
79 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceVersionControllerName),
80 }
81
82 _, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
83 AddFunc: func(obj interface{}) {
84 rvController.addSVM(logger, obj)
85 },
86 UpdateFunc: func(oldObj, newObj interface{}) {
87 rvController.updateSVM(logger, oldObj, newObj)
88 },
89 })
90
91 return rvController
92 }
93
94 func (rv *ResourceVersionController) addSVM(logger klog.Logger, obj interface{}) {
95 svm := obj.(*svmv1alpha1.StorageVersionMigration)
96 logger.V(4).Info("Adding", "svm", klog.KObj(svm))
97 rv.enqueue(svm)
98 }
99
100 func (rv *ResourceVersionController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
101 oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration)
102 newSVM := newObj.(*svmv1alpha1.StorageVersionMigration)
103 logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
104 rv.enqueue(newSVM)
105 }
106
107 func (rv *ResourceVersionController) enqueue(svm *svmv1alpha1.StorageVersionMigration) {
108 key, err := controller.KeyFunc(svm)
109 if err != nil {
110 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err))
111 return
112 }
113
114 rv.queue.Add(key)
115 }
116
117 func (rv *ResourceVersionController) Run(ctx context.Context) {
118 defer utilruntime.HandleCrash()
119 defer rv.queue.ShutDown()
120
121 logger := klog.FromContext(ctx)
122 logger.Info("Starting", "controller", ResourceVersionControllerName)
123 defer logger.Info("Shutting down", "controller", ResourceVersionControllerName)
124
125 if !cache.WaitForNamedCacheSync(ResourceVersionControllerName, ctx.Done(), rv.svmSynced) {
126 return
127 }
128
129 go wait.UntilWithContext(ctx, rv.worker, time.Second)
130
131 <-ctx.Done()
132 }
133
134 func (rv *ResourceVersionController) worker(ctx context.Context) {
135 for rv.processNext(ctx) {
136 }
137 }
138
139 func (rv *ResourceVersionController) processNext(ctx context.Context) bool {
140 eKey, quit := rv.queue.Get()
141 if quit {
142 return false
143 }
144 defer rv.queue.Done(eKey)
145
146 key := eKey.(string)
147 err := rv.sync(ctx, key)
148 if err == nil {
149 rv.queue.Forget(key)
150 return true
151 }
152
153 klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err)
154 rv.queue.AddRateLimited(key)
155
156 return true
157 }
158
159 func (rv *ResourceVersionController) sync(ctx context.Context, key string) error {
160 logger := klog.FromContext(ctx)
161 startTime := time.Now()
162
163
164 _, name, err := cache.SplitMetaNamespaceKey(key)
165 if err != nil {
166 return err
167 }
168
169 svm, err := rv.svmListers.Get(name)
170 if apierrors.IsNotFound(err) {
171
172 return nil
173 }
174 if err != nil {
175 return err
176 }
177
178 toBeProcessedSVM := svm.DeepCopy()
179 gvr := getGVRFromResource(toBeProcessedSVM)
180
181 if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) {
182 logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name)
183 return nil
184 }
185
186 if len(toBeProcessedSVM.Status.ResourceVersion) != 0 {
187 logger.V(4).Info("Resource version is already set", "svm", name)
188 return nil
189 }
190
191 exists, err := rv.resourceExists(gvr)
192 if err != nil {
193 return err
194 }
195 if !exists {
196 _, err = rv.kubeClient.StoragemigrationV1alpha1().
197 StorageVersionMigrations().
198 UpdateStatus(
199 ctx,
200 setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
201 metav1.UpdateOptions{},
202 )
203 if err != nil {
204 return err
205 }
206
207 return nil
208 }
209
210 toBeProcessedSVM.Status.ResourceVersion, err = rv.getLatestResourceVersion(gvr, ctx)
211 if err != nil {
212 return err
213 }
214
215 _, err = rv.kubeClient.StoragemigrationV1alpha1().
216 StorageVersionMigrations().
217 UpdateStatus(ctx, toBeProcessedSVM, metav1.UpdateOptions{})
218 if err != nil {
219 return fmt.Errorf("error updating status for %s: %w", toBeProcessedSVM.Name, err)
220 }
221
222 logger.V(4).Info("Resource version has been successfully added", "svm", key, "elapsed", time.Since(startTime))
223 return nil
224 }
225
226 func (rv *ResourceVersionController) getLatestResourceVersion(gvr schema.GroupVersionResource, ctx context.Context) (string, error) {
227 isResourceNamespaceScoped, err := rv.isResourceNamespaceScoped(gvr)
228 if err != nil {
229 return "", err
230 }
231
232 var randomList *metav1.PartialObjectMetadataList
233 if isResourceNamespaceScoped {
234
235 randomList, err = rv.metadataClient.Resource(gvr).
236 Namespace(fakeSVMNamespaceName).
237 List(ctx, metav1.ListOptions{
238 Limit: 1,
239 })
240 } else {
241 randomList, err = rv.metadataClient.Resource(gvr).
242 List(ctx, metav1.ListOptions{
243 Limit: 1,
244 })
245 }
246 if err != nil {
247
248 return "", fmt.Errorf("error getting latest resourceVersion for %s: %w", gvr.String(), err)
249 }
250
251 return randomList.GetResourceVersion(), err
252 }
253
254 func (rv *ResourceVersionController) resourceExists(gvr schema.GroupVersionResource) (bool, error) {
255 mapperGVRs, err := rv.mapper.ResourcesFor(gvr)
256 if err != nil {
257 return false, err
258 }
259
260 for _, mapperGVR := range mapperGVRs {
261 if mapperGVR.Group == gvr.Group &&
262 mapperGVR.Version == gvr.Version &&
263 mapperGVR.Resource == gvr.Resource {
264 return true, nil
265 }
266 }
267
268 return false, nil
269 }
270
271 func (rv *ResourceVersionController) isResourceNamespaceScoped(gvr schema.GroupVersionResource) (bool, error) {
272 resourceList, err := rv.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
273 if err != nil {
274 return false, err
275 }
276
277 for _, resource := range resourceList.APIResources {
278 if resource.Name == gvr.Resource {
279 return resource.Namespaced, nil
280 }
281 }
282
283 return false, fmt.Errorf("resource %q not found", gvr.String())
284 }
285
View as plain text