1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package recipe
16
17 import (
18 "context"
19 "fmt"
20 "strings"
21 "time"
22
23 v3 "go.etcd.io/etcd/client/v3"
24 "go.etcd.io/etcd/client/v3/concurrency"
25 )
26
27
28 type RemoteKV struct {
29 kv v3.KV
30 key string
31 rev int64
32 val string
33 }
34
35 func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
36 return newKV(kv, key, "", leaseID)
37 }
38
39 func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
40 rev, err := putNewKV(kv, key, val, leaseID)
41 if err != nil {
42 return nil, err
43 }
44 return &RemoteKV{kv, key, rev, val}, nil
45 }
46
47 func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
48 for {
49 newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
50 rev, err := putNewKV(kv, newKey, val, v3.NoLease)
51 if err == nil {
52 return &RemoteKV{kv, newKey, rev, val}, nil
53 }
54 if err != ErrKeyExists {
55 return nil, err
56 }
57 }
58 }
59
60
61
62 func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
63 cmp := v3.Compare(v3.Version(key), "=", 0)
64 req := v3.OpPut(key, val, v3.WithLease(leaseID))
65 txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
66 if err != nil {
67 return 0, err
68 }
69 if !txnresp.Succeeded {
70 return 0, ErrKeyExists
71 }
72 return txnresp.Header.Revision, nil
73 }
74
75
76
77 func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
78 resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
79 if err != nil {
80 return nil, err
81 }
82
83
84 newSeqNum := 0
85 if len(resp.Kvs) != 0 {
86 fields := strings.Split(string(resp.Kvs[0].Key), "/")
87 _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
88 if serr != nil {
89 return nil, serr
90 }
91 newSeqNum++
92 }
93 newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
94
95
96
97
98
99
100 baseKey := "__" + prefix
101
102
103 cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
104 reqPrefix := v3.OpPut(baseKey, "")
105 reqnewKey := v3.OpPut(newKey, val)
106
107 txn := kv.Txn(context.TODO())
108 txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
109 if err != nil {
110 return nil, err
111 }
112 if !txnresp.Succeeded {
113 return newSequentialKV(kv, prefix, val)
114 }
115 return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
116 }
117
118 func (rk *RemoteKV) Key() string { return rk.key }
119 func (rk *RemoteKV) Revision() int64 { return rk.rev }
120 func (rk *RemoteKV) Value() string { return rk.val }
121
122 func (rk *RemoteKV) Delete() error {
123 if rk.kv == nil {
124 return nil
125 }
126 _, err := rk.kv.Delete(context.TODO(), rk.key)
127 rk.kv = nil
128 return err
129 }
130
131 func (rk *RemoteKV) Put(val string) error {
132 _, err := rk.kv.Put(context.TODO(), rk.key, val)
133 return err
134 }
135
136
137 type EphemeralKV struct{ RemoteKV }
138
139
140 func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
141 k, err := newKV(s.Client(), key, val, s.Lease())
142 if err != nil {
143 return nil, err
144 }
145 return &EphemeralKV{*k}, nil
146 }
147
148
149 func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
150 return newUniqueEphemeralKV(s, prefix, "")
151 }
152
153
154 func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
155 for {
156 newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
157 ek, err = newEphemeralKV(s, newKey, val)
158 if err == nil || err != ErrKeyExists {
159 break
160 }
161 }
162 return ek, err
163 }
164
View as plain text