...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "sync"
19 )
20
21 type watchBroadcasts struct {
22 wp *watchProxy
23
24
25 mu sync.Mutex
26 bcasts map[*watchBroadcast]struct{}
27 watchers map[*watcher]*watchBroadcast
28
29 updatec chan *watchBroadcast
30 donec chan struct{}
31 }
32
33
// maxCoalesceReceivers is the receiver count at which coalesce stops trying
// to merge a broadcast: coalesce returns immediately when wb.size() is greater
// than or equal to this value.
const maxCoalesceReceivers = 5
35
36 func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
37 wbs := &watchBroadcasts{
38 wp: wp,
39 bcasts: make(map[*watchBroadcast]struct{}),
40 watchers: make(map[*watcher]*watchBroadcast),
41 updatec: make(chan *watchBroadcast, 1),
42 donec: make(chan struct{}),
43 }
44 go func() {
45 defer close(wbs.donec)
46 for wb := range wbs.updatec {
47 wbs.coalesce(wb)
48 }
49 }()
50 return wbs
51 }
52
53 func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
54 if wb.size() >= maxCoalesceReceivers {
55 return
56 }
57 wbs.mu.Lock()
58 for wbswb := range wbs.bcasts {
59 if wbswb == wb {
60 continue
61 }
62 wb.mu.Lock()
63 wbswb.mu.Lock()
64
65
66
67 if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
68 for w := range wb.receivers {
69 wbswb.receivers[w] = struct{}{}
70 wbs.watchers[w] = wbswb
71 }
72 wb.receivers = nil
73 }
74 wbswb.mu.Unlock()
75 wb.mu.Unlock()
76 if wb.empty() {
77 delete(wbs.bcasts, wb)
78 wb.stop()
79 break
80 }
81 }
82 wbs.mu.Unlock()
83 }
84
85 func (wbs *watchBroadcasts) add(w *watcher) {
86 wbs.mu.Lock()
87 defer wbs.mu.Unlock()
88
89 for wb := range wbs.bcasts {
90 if wb.add(w) {
91 wbs.watchers[w] = wb
92 return
93 }
94 }
95
96 wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update)
97 wbs.watchers[w] = wb
98 wbs.bcasts[wb] = struct{}{}
99 }
100
101
102 func (wbs *watchBroadcasts) delete(w *watcher) int {
103 wbs.mu.Lock()
104 defer wbs.mu.Unlock()
105
106 wb, ok := wbs.watchers[w]
107 if !ok {
108 panic("deleting missing watcher from broadcasts")
109 }
110 delete(wbs.watchers, w)
111 wb.delete(w)
112 if wb.empty() {
113 delete(wbs.bcasts, wb)
114 wb.stop()
115 }
116 return len(wbs.bcasts)
117 }
118
119 func (wbs *watchBroadcasts) stop() {
120 wbs.mu.Lock()
121 for wb := range wbs.bcasts {
122 wb.stop()
123 }
124 wbs.bcasts = nil
125 close(wbs.updatec)
126 wbs.mu.Unlock()
127 <-wbs.donec
128 }
129
130 func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
131 select {
132 case wbs.updatec <- wb:
133 default:
134 }
135 }
136
View as plain text