...
1
16
17 package establish
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27 "k8s.io/apimachinery/pkg/util/wait"
28 "k8s.io/client-go/tools/cache"
29 "k8s.io/client-go/util/workqueue"
30 "k8s.io/klog/v2"
31
32 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
33 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
34 client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
35 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
36 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
37 )
38
39
40 type EstablishingController struct {
41 crdClient client.CustomResourceDefinitionsGetter
42 crdLister listers.CustomResourceDefinitionLister
43 crdSynced cache.InformerSynced
44
45
46 syncFn func(key string) error
47
48 queue workqueue.RateLimitingInterface
49 }
50
51
52 func NewEstablishingController(crdInformer informers.CustomResourceDefinitionInformer,
53 crdClient client.CustomResourceDefinitionsGetter) *EstablishingController {
54 ec := &EstablishingController{
55 crdClient: crdClient,
56 crdLister: crdInformer.Lister(),
57 crdSynced: crdInformer.Informer().HasSynced,
58 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crdEstablishing"),
59 }
60
61 ec.syncFn = ec.sync
62
63 return ec
64 }
65
66
67 func (ec *EstablishingController) QueueCRD(key string, timeout time.Duration) {
68 ec.queue.AddAfter(key, timeout)
69 }
70
71
72 func (ec *EstablishingController) Run(stopCh <-chan struct{}) {
73 defer utilruntime.HandleCrash()
74 defer ec.queue.ShutDown()
75
76 klog.Info("Starting EstablishingController")
77 defer klog.Info("Shutting down EstablishingController")
78
79 if !cache.WaitForCacheSync(stopCh, ec.crdSynced) {
80 return
81 }
82
83
84 go wait.Until(ec.runWorker, time.Second, stopCh)
85
86 <-stopCh
87 }
88
89 func (ec *EstablishingController) runWorker() {
90 for ec.processNextWorkItem() {
91 }
92 }
93
94
95
96 func (ec *EstablishingController) processNextWorkItem() bool {
97 key, quit := ec.queue.Get()
98 if quit {
99 return false
100 }
101 defer ec.queue.Done(key)
102
103 err := ec.syncFn(key.(string))
104 if err == nil {
105 ec.queue.Forget(key)
106 return true
107 }
108
109 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
110 ec.queue.AddRateLimited(key)
111
112 return true
113 }
114
115
116 func (ec *EstablishingController) sync(key string) error {
117 cachedCRD, err := ec.crdLister.Get(key)
118 if apierrors.IsNotFound(err) {
119 return nil
120 }
121 if err != nil {
122 return err
123 }
124
125 if !apiextensionshelpers.IsCRDConditionTrue(cachedCRD, apiextensionsv1.NamesAccepted) ||
126 apiextensionshelpers.IsCRDConditionTrue(cachedCRD, apiextensionsv1.Established) {
127 return nil
128 }
129
130 crd := cachedCRD.DeepCopy()
131 establishedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
132 Type: apiextensionsv1.Established,
133 Status: apiextensionsv1.ConditionTrue,
134 Reason: "InitialNamesAccepted",
135 Message: "the initial names have been accepted",
136 }
137 apiextensionshelpers.SetCRDCondition(crd, establishedCondition)
138
139
140 _, err = ec.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
141 if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
142
143 return nil
144 }
145 if err != nil {
146 return err
147 }
148
149 return nil
150 }
151
View as plain text