...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package concurrency
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "sync"
22
23 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
24 v3 "go.etcd.io/etcd/client/v3"
25 )
26
27
28 var ErrLocked = errors.New("mutex: Locked by another session")
29 var ErrSessionExpired = errors.New("mutex: session is expired")
30
31
32 type Mutex struct {
33 s *Session
34
35 pfx string
36 myKey string
37 myRev int64
38 hdr *pb.ResponseHeader
39 }
40
41 func NewMutex(s *Session, pfx string) *Mutex {
42 return &Mutex{s, pfx + "/", "", -1, nil}
43 }
44
45
46
47
48 func (m *Mutex) TryLock(ctx context.Context) error {
49 resp, err := m.tryAcquire(ctx)
50 if err != nil {
51 return err
52 }
53
54 ownerKey := resp.Responses[1].GetResponseRange().Kvs
55 if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
56 m.hdr = resp.Header
57 return nil
58 }
59 client := m.s.Client()
60
61 if _, err := client.Delete(ctx, m.myKey); err != nil {
62 return err
63 }
64 m.myKey = "\x00"
65 m.myRev = -1
66 return ErrLocked
67 }
68
69
70
71 func (m *Mutex) Lock(ctx context.Context) error {
72 resp, err := m.tryAcquire(ctx)
73 if err != nil {
74 return err
75 }
76
77 ownerKey := resp.Responses[1].GetResponseRange().Kvs
78 if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
79 m.hdr = resp.Header
80 return nil
81 }
82 client := m.s.Client()
83
84
85 _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
86
87 if werr != nil {
88 m.Unlock(client.Ctx())
89 return werr
90 }
91
92
93 gresp, werr := client.Get(ctx, m.myKey)
94 if werr != nil {
95 m.Unlock(client.Ctx())
96 return werr
97 }
98
99 if len(gresp.Kvs) == 0 {
100 return ErrSessionExpired
101 }
102 m.hdr = gresp.Header
103
104 return nil
105 }
106
107 func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
108 s := m.s
109 client := m.s.Client()
110
111 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
112 cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
113
114 put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
115
116 get := v3.OpGet(m.myKey)
117
118 getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
119 resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
120 if err != nil {
121 return nil, err
122 }
123 m.myRev = resp.Header.Revision
124 if !resp.Succeeded {
125 m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
126 }
127 return resp, nil
128 }
129
130 func (m *Mutex) Unlock(ctx context.Context) error {
131 client := m.s.Client()
132 if _, err := client.Delete(ctx, m.myKey); err != nil {
133 return err
134 }
135 m.myKey = "\x00"
136 m.myRev = -1
137 return nil
138 }
139
140 func (m *Mutex) IsOwner() v3.Cmp {
141 return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
142 }
143
144 func (m *Mutex) Key() string { return m.myKey }
145
146
147 func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
148
149 type lockerMutex struct{ *Mutex }
150
151 func (lm *lockerMutex) Lock() {
152 client := lm.s.Client()
153 if err := lm.Mutex.Lock(client.Ctx()); err != nil {
154 panic(err)
155 }
156 }
157 func (lm *lockerMutex) Unlock() {
158 client := lm.s.Client()
159 if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
160 panic(err)
161 }
162 }
163
164
165 func NewLocker(s *Session, pfx string) sync.Locker {
166 return &lockerMutex{NewMutex(s, pfx)}
167 }
168
View as plain text