...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import pb "go.etcd.io/etcd/raft/v3/raftpb"
18
19
20
21
22
23
24 type ReadState struct {
25 Index uint64
26 RequestCtx []byte
27 }
28
29 type readIndexStatus struct {
30 req pb.Message
31 index uint64
32
33
34
35
36 acks map[uint64]bool
37 }
38
39 type readOnly struct {
40 option ReadOnlyOption
41 pendingReadIndex map[string]*readIndexStatus
42 readIndexQueue []string
43 }
44
45 func newReadOnly(option ReadOnlyOption) *readOnly {
46 return &readOnly{
47 option: option,
48 pendingReadIndex: make(map[string]*readIndexStatus),
49 }
50 }
51
52
53
54
55
56 func (ro *readOnly) addRequest(index uint64, m pb.Message) {
57 s := string(m.Entries[0].Data)
58 if _, ok := ro.pendingReadIndex[s]; ok {
59 return
60 }
61 ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
62 ro.readIndexQueue = append(ro.readIndexQueue, s)
63 }
64
65
66
67
68 func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
69 rs, ok := ro.pendingReadIndex[string(context)]
70 if !ok {
71 return nil
72 }
73
74 rs.acks[id] = true
75 return rs.acks
76 }
77
78
79
80
81 func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
82 var (
83 i int
84 found bool
85 )
86
87 ctx := string(m.Context)
88 rss := []*readIndexStatus{}
89
90 for _, okctx := range ro.readIndexQueue {
91 i++
92 rs, ok := ro.pendingReadIndex[okctx]
93 if !ok {
94 panic("cannot find corresponding read state from pending map")
95 }
96 rss = append(rss, rs)
97 if okctx == ctx {
98 found = true
99 break
100 }
101 }
102
103 if found {
104 ro.readIndexQueue = ro.readIndexQueue[i:]
105 for _, rs := range rss {
106 delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
107 }
108 return rss
109 }
110
111 return nil
112 }
113
114
115
116 func (ro *readOnly) lastPendingRequestCtx() string {
117 if len(ro.readIndexQueue) == 0 {
118 return ""
119 }
120 return ro.readIndexQueue[len(ro.readIndexQueue)-1]
121 }
122
View as plain text