...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ordering
16
17 import (
18 "context"
19 "sync"
20
21 "go.etcd.io/etcd/client/v3"
22 )
23
24
25
26
27 type kvOrdering struct {
28 clientv3.KV
29 orderViolationFunc OrderViolationFunc
30 prevRev int64
31 revMu sync.RWMutex
32 }
33
34 func NewKV(kv clientv3.KV, orderViolationFunc OrderViolationFunc) *kvOrdering {
35 return &kvOrdering{kv, orderViolationFunc, 0, sync.RWMutex{}}
36 }
37
38 func (kv *kvOrdering) getPrevRev() int64 {
39 kv.revMu.RLock()
40 defer kv.revMu.RUnlock()
41 return kv.prevRev
42 }
43
44 func (kv *kvOrdering) setPrevRev(currRev int64) {
45 kv.revMu.Lock()
46 defer kv.revMu.Unlock()
47 if currRev > kv.prevRev {
48 kv.prevRev = currRev
49 }
50 }
51
52 func (kv *kvOrdering) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
53
54
55
56
57 prevRev := kv.getPrevRev()
58 op := clientv3.OpGet(key, opts...)
59 for {
60 r, err := kv.KV.Do(ctx, op)
61 if err != nil {
62 return nil, err
63 }
64 resp := r.Get()
65 if resp.Header.Revision == prevRev {
66 return resp, nil
67 } else if resp.Header.Revision > prevRev {
68 kv.setPrevRev(resp.Header.Revision)
69 return resp, nil
70 }
71 err = kv.orderViolationFunc(op, r, prevRev)
72 if err != nil {
73 return nil, err
74 }
75 }
76 }
77
78 func (kv *kvOrdering) Txn(ctx context.Context) clientv3.Txn {
79 return &txnOrdering{
80 kv.KV.Txn(ctx),
81 kv,
82 ctx,
83 sync.Mutex{},
84 []clientv3.Cmp{},
85 []clientv3.Op{},
86 []clientv3.Op{},
87 }
88 }
89
90
91
92
93 type txnOrdering struct {
94 clientv3.Txn
95 *kvOrdering
96 ctx context.Context
97 mu sync.Mutex
98 cmps []clientv3.Cmp
99 thenOps []clientv3.Op
100 elseOps []clientv3.Op
101 }
102
103 func (txn *txnOrdering) If(cs ...clientv3.Cmp) clientv3.Txn {
104 txn.mu.Lock()
105 defer txn.mu.Unlock()
106 txn.cmps = cs
107 txn.Txn.If(cs...)
108 return txn
109 }
110
111 func (txn *txnOrdering) Then(ops ...clientv3.Op) clientv3.Txn {
112 txn.mu.Lock()
113 defer txn.mu.Unlock()
114 txn.thenOps = ops
115 txn.Txn.Then(ops...)
116 return txn
117 }
118
119 func (txn *txnOrdering) Else(ops ...clientv3.Op) clientv3.Txn {
120 txn.mu.Lock()
121 defer txn.mu.Unlock()
122 txn.elseOps = ops
123 txn.Txn.Else(ops...)
124 return txn
125 }
126
127 func (txn *txnOrdering) Commit() (*clientv3.TxnResponse, error) {
128
129
130
131
132 prevRev := txn.getPrevRev()
133 opTxn := clientv3.OpTxn(txn.cmps, txn.thenOps, txn.elseOps)
134 for {
135 opResp, err := txn.KV.Do(txn.ctx, opTxn)
136 if err != nil {
137 return nil, err
138 }
139 txnResp := opResp.Txn()
140 if txnResp.Header.Revision >= prevRev {
141 txn.setPrevRev(txnResp.Header.Revision)
142 return txnResp, nil
143 }
144 err = txn.orderViolationFunc(opTxn, opResp, prevRev)
145 if err != nil {
146 return nil, err
147 }
148 }
149 }
150
View as plain text