1
16
17 package recorder
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "sync"
24
25 "github.com/go-logr/logr"
26 corev1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/runtime"
28 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
29 "k8s.io/client-go/rest"
30 "k8s.io/client-go/tools/record"
31 )
32
33
34
35
36 type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
37
38
39
40 type Provider struct {
41 lock sync.RWMutex
42 stopped bool
43
44
45 scheme *runtime.Scheme
46
47 logger logr.Logger
48 evtClient corev1client.EventInterface
49 makeBroadcaster EventBroadcasterProducer
50
51 broadcasterOnce sync.Once
52 broadcaster record.EventBroadcaster
53 stopBroadcaster bool
54 }
55
56
57
58
59
60
61
62
63
64
65
66 func (p *Provider) Stop(shutdownCtx context.Context) {
67 doneCh := make(chan struct{})
68
69 go func() {
70
71
72
73
74 broadcaster := p.getBroadcaster()
75 if p.stopBroadcaster {
76 p.lock.Lock()
77 broadcaster.Shutdown()
78 p.stopped = true
79 p.lock.Unlock()
80 }
81 close(doneCh)
82 }()
83
84 select {
85 case <-shutdownCtx.Done():
86 case <-doneCh:
87 }
88 }
89
90
91
92 func (p *Provider) getBroadcaster() record.EventBroadcaster {
93
94
95
96
97
98
99 p.broadcasterOnce.Do(func() {
100 broadcaster, stop := p.makeBroadcaster()
101 broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
102 broadcaster.StartEventWatcher(
103 func(e *corev1.Event) {
104 p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
105 })
106 p.broadcaster = broadcaster
107 p.stopBroadcaster = stop
108 })
109
110 return p.broadcaster
111 }
112
113
114 func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
115 if httpClient == nil {
116 panic("httpClient must not be nil")
117 }
118
119 corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)
120 if err != nil {
121 return nil, fmt.Errorf("failed to init client: %w", err)
122 }
123
124 p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}
125 return p, nil
126 }
127
128
129
130 func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
131 return &lazyRecorder{
132 prov: p,
133 name: name,
134 }
135 }
136
137
138
139 type lazyRecorder struct {
140 prov *Provider
141 name string
142
143 recOnce sync.Once
144 rec record.EventRecorder
145 }
146
147
148 func (l *lazyRecorder) ensureRecording() {
149 l.recOnce.Do(func() {
150 broadcaster := l.prov.getBroadcaster()
151 l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
152 })
153 }
154
155 func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
156 l.ensureRecording()
157
158 l.prov.lock.RLock()
159 if !l.prov.stopped {
160 l.rec.Event(object, eventtype, reason, message)
161 }
162 l.prov.lock.RUnlock()
163 }
164 func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
165 l.ensureRecording()
166
167 l.prov.lock.RLock()
168 if !l.prov.stopped {
169 l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
170 }
171 l.prov.lock.RUnlock()
172 }
173 func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
174 l.ensureRecording()
175
176 l.prov.lock.RLock()
177 if !l.prov.stopped {
178 l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
179 }
180 l.prov.lock.RUnlock()
181 }
182
View as plain text