1
16
17 package kubeutil
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23
24 "github.com/go-logr/logr"
25 apimeta "k8s.io/apimachinery/pkg/api/meta"
26 "k8s.io/apimachinery/pkg/fields"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/client-go/util/workqueue"
29 "sigs.k8s.io/controller-runtime/pkg/cache"
30 "sigs.k8s.io/controller-runtime/pkg/client"
31 "sigs.k8s.io/controller-runtime/pkg/event"
32 "sigs.k8s.io/controller-runtime/pkg/handler"
33 "sigs.k8s.io/controller-runtime/pkg/reconcile"
34 )
35
36 type linkedResourceHandler struct {
37 cache cache.Cache
38 objType client.Object
39 addToQueue func(q workqueue.RateLimitingInterface, req reconcile.Request)
40
41 refField string
42 scheme *runtime.Scheme
43 logger logr.Logger
44 }
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 func NewLinkedResourceHandler(
67 cacheCtx context.Context,
68 logger logr.Logger,
69 scheme *runtime.Scheme,
70 cache cache.Cache,
71 objType client.Object,
72 toId func(obj client.Object) []string,
73 addToQueue func(q workqueue.RateLimitingInterface, req reconcile.Request),
74 ) (handler.EventHandler, error) {
75
76 refField := fmt.Sprintf(".x-index.%s", randStringRunes(10))
77
78 if err := SetGroupVersionKind(scheme, objType); err != nil {
79 return nil, err
80 }
81
82
83
84
85 if err := cache.IndexField(cacheCtx, objType, refField, toId); err != nil {
86 return nil, err
87 }
88
89 return &linkedResourceHandler{
90 logger: logger,
91 scheme: scheme,
92 cache: cache,
93 objType: objType,
94 addToQueue: addToQueue,
95
96 refField: refField,
97 }, nil
98 }
99
100
101
102
103
104
105
106
107 func (r *linkedResourceHandler) findObjectsForKind(ctx context.Context, object client.Object) []reconcile.Request {
108 logger := r.logger.WithName("FindObjectsForKind").WithValues(
109 "object", client.ObjectKeyFromObject(object),
110 "objectType", fmt.Sprintf("%T", object),
111 )
112
113 objList, err := NewListObject(r.scheme, r.objType.GetObjectKind().GroupVersionKind())
114 if err != nil {
115 logger.Error(err, "While creating a List object")
116 return nil
117 }
118 listOps := &client.ListOptions{
119 FieldSelector: fields.OneTermEqualSelector(r.refField, fmt.Sprintf("%s/%s", object.GetNamespace(), object.GetName())),
120 }
121
122 if err := r.cache.List(ctx, objList, listOps); err != nil {
123 logger.Error(err, "While listing liked resources")
124 return nil
125 }
126
127 requests := make([]reconcile.Request, 0, apimeta.LenList(objList))
128 if err := apimeta.EachListItem(objList, func(object runtime.Object) error {
129 clientObj, ok := object.(client.Object)
130 if !ok {
131 return fmt.Errorf("object %T cannot be converted to client.Object", object)
132 }
133 requests = append(requests, reconcile.Request{
134 NamespacedName: client.ObjectKeyFromObject(clientObj),
135 })
136 return nil
137 }); err != nil {
138 logger.Error(err, "While itterating list")
139 return nil
140 }
141
142 return requests
143 }
144
145 var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
146
147 func randStringRunes(n int) string {
148 b := make([]rune, n)
149 for i := range b {
150 b[i] = letterRunes[rand.Intn(len(letterRunes))]
151 }
152 return string(b)
153 }
154
155
156
157
158 var _ handler.EventHandler = &linkedResourceHandler{}
159
160
161 func (e *linkedResourceHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
162 reqs := map[reconcile.Request]struct{}{}
163 e.mapAndEnqueue(ctx, q, evt.Object, reqs)
164 }
165
166
167 func (e *linkedResourceHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
168 reqs := map[reconcile.Request]struct{}{}
169 e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
170 e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
171 }
172
173
174 func (e *linkedResourceHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
175 reqs := map[reconcile.Request]struct{}{}
176 e.mapAndEnqueue(ctx, q, evt.Object, reqs)
177 }
178
179
180 func (e *linkedResourceHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
181 reqs := map[reconcile.Request]struct{}{}
182 e.mapAndEnqueue(ctx, q, evt.Object, reqs)
183 }
184
185 func (e *linkedResourceHandler) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]struct{}) {
186 for _, req := range e.findObjectsForKind(ctx, object) {
187 _, ok := reqs[req]
188 if !ok {
189 if e.addToQueue != nil {
190 e.addToQueue(q, req)
191 } else {
192 q.Add(req)
193 }
194
195 reqs[req] = struct{}{}
196 }
197 }
198 }
199
View as plain text