...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "time"
19
20 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
21 "go.etcd.io/etcd/api/v3/mvccpb"
22 "go.etcd.io/etcd/client/v3"
23 "go.etcd.io/etcd/server/v3/mvcc"
24 )
25
// watchRange is the pair of range bounds attached to a watcher.
//
// NOTE(review): an empty end presumably denotes a single-key watch and an end
// of "\x00" an open-ended range, following etcd's usual range conventions;
// confirm against the callers that build watchRange values.
type watchRange struct {
	key string
	end string
}

// valid reports whether the bounds form an acceptable range. A range is
// accepted when end is empty, when end sorts strictly after key, or when end
// is exactly the single byte 0x00.
func (wr *watchRange) valid() bool {
	switch {
	case wr.end == "":
		return true
	case wr.end > wr.key:
		return true
	default:
		// Only the one-byte sentinel "\x00" is allowed to sort at or
		// before key.
		return wr.end == "\x00"
	}
}
33
34 type watcher struct {
35
36
37 wr watchRange
38 filters []mvcc.FilterFunc
39 progress bool
40 prevKV bool
41
42
43 id int64
44
45 nextrev int64
46
47 lastHeader pb.ResponseHeader
48
49
50 wps *watchProxyStream
51 }
52
53
54
55 func (w *watcher) send(wr clientv3.WatchResponse) {
56 if wr.IsProgressNotify() && !w.progress {
57 return
58 }
59 if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {
60 return
61 }
62 if w.nextrev == 0 {
63
64 w.nextrev = wr.Header.Revision + 1
65 }
66
67 events := make([]*mvccpb.Event, 0, len(wr.Events))
68
69 var lastRev int64
70 for i := range wr.Events {
71 ev := (*mvccpb.Event)(wr.Events[i])
72 if ev.Kv.ModRevision < w.nextrev {
73 continue
74 } else {
75
76
77
78 lastRev = ev.Kv.ModRevision
79 }
80
81 filtered := false
82 for _, filter := range w.filters {
83 if filter(*ev) {
84 filtered = true
85 break
86 }
87 }
88 if filtered {
89 continue
90 }
91
92 if !w.prevKV {
93 evCopy := *ev
94 evCopy.PrevKv = nil
95 ev = &evCopy
96 }
97 events = append(events, ev)
98 }
99
100 if lastRev >= w.nextrev {
101 w.nextrev = lastRev + 1
102 }
103
104
105 if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 && wr.CompactRevision == 0 {
106 return
107 }
108
109 w.lastHeader = wr.Header
110 w.post(&pb.WatchResponse{
111 Header: &wr.Header,
112 Created: wr.Created,
113 CompactRevision: wr.CompactRevision,
114 Canceled: wr.Canceled,
115 WatchId: w.id,
116 Events: events,
117 })
118 }
119
120
121 func (w *watcher) post(wr *pb.WatchResponse) bool {
122 select {
123 case w.wps.watchCh <- wr:
124 case <-time.After(50 * time.Millisecond):
125 w.wps.cancel()
126 w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
127 return false
128 }
129 return true
130 }
131
View as plain text