...
1
2
3
4 package watcher
5
6 import (
7 "context"
8 "fmt"
9
10 "k8s.io/klog/v2"
11 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
12 )
13
14
15
16
17 type eventFunnel struct {
18
19 ctx context.Context
20
21 outCh chan event.Event
22
23
24 doneCh chan struct{}
25
26 counterCh chan int
27 }
28
29 func newEventFunnel(ctx context.Context) *eventFunnel {
30 funnel := &eventFunnel{
31 ctx: ctx,
32 outCh: make(chan event.Event),
33 doneCh: make(chan struct{}),
34 counterCh: make(chan int),
35 }
36
37
38 go func() {
39 defer func() {
40
41 klog.V(5).Info("Closing funnel")
42 close(funnel.outCh)
43 close(funnel.doneCh)
44 }()
45 ctxDoneCh := ctx.Done()
46
47
48 inputs := 0
49 for {
50 select {
51 case delta := <-funnel.counterCh:
52 inputs += delta
53 klog.V(5).Infof("Funnel input channels (%+d): %d", delta, inputs)
54 case <-ctxDoneCh:
55
56
57 ctxDoneCh = nil
58 }
59 if ctxDoneCh == nil && inputs <= 0 {
60
61 break
62 }
63 }
64 }()
65 return funnel
66 }
67
68
69 func (m *eventFunnel) AddInputChannel(inCh <-chan event.Event) error {
70 select {
71 case <-m.ctx.Done():
72 return &EventFunnelClosedError{ContextError: m.ctx.Err()}
73 case m.counterCh <- 1:
74 }
75
76
77 go m.drain(inCh, m.outCh)
78 return nil
79 }
80
81
82
83 func (m *eventFunnel) OutputChannel() <-chan event.Event {
84 return m.outCh
85 }
86
87
88
89
90 func (m *eventFunnel) Done() <-chan struct{} {
91 return m.doneCh
92 }
93
94
95 func (m *eventFunnel) drain(inCh <-chan event.Event, outCh chan<- event.Event) {
96 defer func() {
97 m.counterCh <- -1
98 }()
99 for event := range inCh {
100 outCh <- event
101 }
102 }
103
104 type EventFunnelClosedError struct {
105 ContextError error
106 }
107
108 func (e *EventFunnelClosedError) Error() string {
109 return fmt.Sprintf("event funnel closed: %v", e.ContextError)
110 }
111
112 func (e *EventFunnelClosedError) Is(err error) bool {
113 fcErr, ok := err.(*EventFunnelClosedError)
114 if !ok {
115 return false
116 }
117 return e.ContextError == fcErr.ContextError
118 }
119
120 func (e *EventFunnelClosedError) Unwrap() error {
121 return e.ContextError
122 }
123
View as plain text