...
1
16
17 package kubeutil
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23
24 "k8s.io/apimachinery/pkg/runtime/schema"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/workqueue"
27 "sigs.k8s.io/controller-runtime/pkg/reconcile"
28 "sigs.k8s.io/controller-runtime/pkg/source"
29 )
30
31 type EventSource interface {
32 AddConsumer(gvk schema.GroupVersionKind) source.Source
33 ReportError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName, err error) error
34 HasReportedError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName) error
35 }
36
37 type resource struct {
38 gvk schema.GroupVersionKind
39 namespacedName types.NamespacedName
40 }
41
42 type eventSource struct {
43 mu sync.RWMutex
44 dest map[schema.GroupVersionKind]workqueue.RateLimitingInterface
45 invalidate sync.Map
46 }
47
48 func NewEventStore() EventSource {
49 return &eventSource{
50 dest: make(map[schema.GroupVersionKind]workqueue.RateLimitingInterface),
51 }
52 }
53
54 func (es *eventSource) HasReportedError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName) error {
55 err, ok := es.invalidate.LoadAndDelete(resource{
56 gvk: gvk,
57 namespacedName: namespacedName,
58 })
59 if !ok {
60 return nil
61 }
62 return err.(error)
63 }
64
65 func (es *eventSource) ReportError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName, err error) error {
66 es.mu.RLock()
67 defer es.mu.RUnlock()
68
69 if queue, ok := es.dest[gvk]; !ok {
70 return fmt.Errorf("consumer for %v does not exist", gvk)
71 } else {
72 es.invalidate.Store(resource{
73 gvk: gvk,
74 namespacedName: namespacedName,
75 }, err)
76
77 queue.Add(reconcile.Request{NamespacedName: namespacedName})
78 return nil
79 }
80 }
81
82 func (es *eventSource) AddConsumer(gvk schema.GroupVersionKind) source.Source {
83 return &eventConsumer{
84 register: func(queue workqueue.RateLimitingInterface) error {
85 es.mu.Lock()
86 defer es.mu.Unlock()
87
88 _, ok := es.dest[gvk]
89 if ok {
90 return fmt.Errorf("consumer for %v already registered", gvk)
91 }
92
93 es.dest[gvk] = queue
94
95 return nil
96 },
97 }
98 }
99
100 type eventConsumer struct {
101 register func(queue workqueue.RateLimitingInterface) error
102 }
103
104 var _ source.Source = &eventConsumer{}
105
106 func (cs *eventConsumer) String() string {
107 return fmt.Sprintf("EventConsumer: %p", cs)
108 }
109
110
111 func (cs *eventConsumer) Start(_ context.Context, queue workqueue.RateLimitingInterface) error {
112 if cs.register == nil {
113 return fmt.Errorf("register function not provided")
114 }
115
116 err := cs.register(queue)
117 cs.register = nil
118
119 return err
120 }
121
View as plain text