...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "go.etcd.io/etcd/api/v3/mvccpb"
19 "go.etcd.io/etcd/pkg/v3/traceutil"
20 )
21
22 func (tw *watchableStoreTxnWrite) End() {
23 changes := tw.Changes()
24 if len(changes) == 0 {
25 tw.TxnWrite.End()
26 return
27 }
28
29 rev := tw.Rev() + 1
30 evs := make([]mvccpb.Event, len(changes))
31 for i, change := range changes {
32 evs[i].Kv = &changes[i]
33 if change.CreateRevision == 0 {
34 evs[i].Type = mvccpb.DELETE
35 evs[i].Kv.ModRevision = rev
36 } else {
37 evs[i].Type = mvccpb.PUT
38 }
39 }
40
41
42
43 tw.s.mu.Lock()
44 tw.s.notify(rev, evs)
45 tw.TxnWrite.End()
46 tw.s.mu.Unlock()
47 }
48
49 type watchableStoreTxnWrite struct {
50 TxnWrite
51 s *watchableStore
52 }
53
54 func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
55 return &watchableStoreTxnWrite{s.store.Write(trace), s}
56 }
57
View as plain text