...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package recipe
16
17 import (
18 "context"
19
20 "go.etcd.io/etcd/api/v3/mvccpb"
21 v3 "go.etcd.io/etcd/client/v3"
22 "go.etcd.io/etcd/client/v3/concurrency"
23 )
24
25 type RWMutex struct {
26 s *concurrency.Session
27 ctx context.Context
28
29 pfx string
30 myKey *EphemeralKV
31 }
32
33 func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
34 return &RWMutex{s, context.TODO(), prefix + "/", nil}
35 }
36
37 func (rwm *RWMutex) RLock() error {
38 rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
39 if err != nil {
40 return err
41 }
42 rwm.myKey = rk
43
44 for {
45 if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil {
46 return werr
47 }
48 }
49 }
50
51 func (rwm *RWMutex) Lock() error {
52 rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
53 if err != nil {
54 return err
55 }
56 rwm.myKey = rk
57
58 for {
59 if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil {
60 return werr
61 }
62
63 }
64 }
65
66
67
68 func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) {
69 client := rwm.s.Client()
70
71 opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1))
72 lastKey, err := client.Get(rwm.ctx, pfx, opts...)
73 if err != nil {
74 return false, err
75 }
76 if len(lastKey.Kvs) == 0 {
77 return true, nil
78 }
79
80 _, err = WaitEvents(
81 client,
82 string(lastKey.Kvs[0].Key),
83 rwm.myKey.Revision(),
84 []mvccpb.Event_EventType{mvccpb.DELETE})
85 return false, err
86 }
87
88 func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }
89 func (rwm *RWMutex) Unlock() error { return rwm.myKey.Delete() }
90
View as plain text