...
1
16
17 package watch
18
19 import (
20 "sync"
21
22 "k8s.io/apimachinery/pkg/runtime"
23 "k8s.io/apimachinery/pkg/watch"
24 "k8s.io/client-go/tools/cache"
25 )
26
27 func newEventProcessor(out chan<- watch.Event) *eventProcessor {
28 return &eventProcessor{
29 out: out,
30 cond: sync.NewCond(&sync.Mutex{}),
31 done: make(chan struct{}),
32 }
33 }
34
35
36
37
38
39
40 type eventProcessor struct {
41 out chan<- watch.Event
42
43 cond *sync.Cond
44 buff []watch.Event
45
46 done chan struct{}
47 }
48
49 func (e *eventProcessor) run() {
50 for {
51 batch := e.takeBatch()
52 e.writeBatch(batch)
53 if e.stopped() {
54 return
55 }
56 }
57 }
58
59 func (e *eventProcessor) takeBatch() []watch.Event {
60 e.cond.L.Lock()
61 defer e.cond.L.Unlock()
62
63 for len(e.buff) == 0 && !e.stopped() {
64 e.cond.Wait()
65 }
66
67 batch := e.buff
68 e.buff = nil
69 return batch
70 }
71
72 func (e *eventProcessor) writeBatch(events []watch.Event) {
73 for _, event := range events {
74 select {
75 case e.out <- event:
76 case <-e.done:
77 return
78 }
79 }
80 }
81
82 func (e *eventProcessor) push(event watch.Event) {
83 e.cond.L.Lock()
84 defer e.cond.L.Unlock()
85 defer e.cond.Signal()
86 e.buff = append(e.buff, event)
87 }
88
89 func (e *eventProcessor) stopped() bool {
90 select {
91 case <-e.done:
92 return true
93 default:
94 return false
95 }
96 }
97
98 func (e *eventProcessor) stop() {
99 close(e.done)
100 e.cond.Signal()
101 }
102
103
104
105
106 func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
107 ch := make(chan watch.Event)
108 w := watch.NewProxyWatcher(ch)
109 e := newEventProcessor(ch)
110
111 indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
112 AddFunc: func(obj interface{}) {
113 e.push(watch.Event{
114 Type: watch.Added,
115 Object: obj.(runtime.Object),
116 })
117 },
118 UpdateFunc: func(old, new interface{}) {
119 e.push(watch.Event{
120 Type: watch.Modified,
121 Object: new.(runtime.Object),
122 })
123 },
124 DeleteFunc: func(obj interface{}) {
125 staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
126 if stale {
127
128
129
130 obj = staleObj.Obj
131 }
132
133 e.push(watch.Event{
134 Type: watch.Deleted,
135 Object: obj.(runtime.Object),
136 })
137 },
138 }, cache.Indexers{})
139
140 go e.run()
141
142 doneCh := make(chan struct{})
143 go func() {
144 defer close(doneCh)
145 defer e.stop()
146 informer.Run(w.StopChan())
147 }()
148
149 return indexer, informer, w, doneCh
150 }
151
View as plain text