...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package recipe
16
17 import (
18 "context"
19
20 "go.etcd.io/etcd/api/v3/mvccpb"
21 "go.etcd.io/etcd/client/v3"
22 "go.etcd.io/etcd/client/v3/concurrency"
23 )
24
25
26
27 type DoubleBarrier struct {
28 s *concurrency.Session
29 ctx context.Context
30
31 key string
32 count int
33 myKey *EphemeralKV
34 }
35
36 func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
37 return &DoubleBarrier{
38 s: s,
39 ctx: context.TODO(),
40 key: key,
41 count: count,
42 }
43 }
44
45
46 func (b *DoubleBarrier) Enter() error {
47 client := b.s.Client()
48
49
50
51 if resp1, err := b.enteredClients(client); err != nil {
52 return err
53 } else if len(resp1.Kvs) >= b.count {
54 return ErrTooManyClients
55 }
56
57 ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
58 if err != nil {
59 return err
60 }
61 b.myKey = ek
62
63
64 resp2, err := b.enteredClients(client)
65 if err != nil {
66 return err
67 }
68 if len(resp2.Kvs) >= b.count {
69 lastWaiter := resp2.Kvs[b.count-1]
70 if ek.rev > lastWaiter.CreateRevision {
71
72
73
74 if err = b.myKey.Delete(); err != nil {
75
76
77 }
78 return ErrTooManyClients
79 }
80
81 if ek.rev == lastWaiter.CreateRevision {
82
83
84
85 _, err = client.Put(b.ctx, b.key+"/ready", "")
86 return err
87 }
88 }
89
90 _, err = WaitEvents(
91 client,
92 b.key+"/ready",
93 ek.Revision(),
94 []mvccpb.Event_EventType{mvccpb.PUT})
95 return err
96 }
97
98
99
100 func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
101 resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
102 clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
103 if err != nil {
104 return nil, err
105 }
106
107 return resp, nil
108 }
109
110
111 func (b *DoubleBarrier) Leave() error {
112 client := b.s.Client()
113 resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
114 if err != nil {
115 return err
116 }
117 if len(resp.Kvs) == 0 {
118 return nil
119 }
120
121 lowest, highest := resp.Kvs[0], resp.Kvs[0]
122 for _, k := range resp.Kvs {
123 if k.ModRevision < lowest.ModRevision {
124 lowest = k
125 }
126 if k.ModRevision > highest.ModRevision {
127 highest = k
128 }
129 }
130 isLowest := string(lowest.Key) == b.myKey.Key()
131
132 if len(resp.Kvs) == 1 && isLowest {
133
134 if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
135 return err
136 }
137 return b.myKey.Delete()
138 }
139
140
141
142
143
144 if isLowest {
145 _, err = WaitEvents(
146 client,
147 string(highest.Key),
148 highest.ModRevision,
149 []mvccpb.Event_EventType{mvccpb.DELETE})
150 if err != nil {
151 return err
152 }
153 return b.Leave()
154 }
155
156
157 if err = b.myKey.Delete(); err != nil {
158 return err
159 }
160
161 key := string(lowest.Key)
162 _, err = WaitEvents(
163 client,
164 key,
165 lowest.ModRevision,
166 []mvccpb.Event_EventType{mvccpb.DELETE})
167 if err != nil {
168 return err
169 }
170 return b.Leave()
171 }
172
View as plain text