1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package backend
16
17 import (
18 "bytes"
19 "sort"
20 )
21
22 const bucketBufferInitialSize = 512
23
24
25 type txBuffer struct {
26 buckets map[BucketID]*bucketBuffer
27 }
28
29 func (txb *txBuffer) reset() {
30 for k, v := range txb.buckets {
31 if v.used == 0 {
32
33 delete(txb.buckets, k)
34 }
35 v.used = 0
36 }
37 }
38
39
40 type txWriteBuffer struct {
41 txBuffer
42
43
44 bucket2seq map[BucketID]bool
45 }
46
47 func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
48 txw.bucket2seq[bucket.ID()] = false
49 txw.putInternal(bucket, k, v)
50 }
51
52 func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
53
54 txw.putInternal(bucket, k, v)
55 }
56
57 func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
58 b, ok := txw.buckets[bucket.ID()]
59 if !ok {
60 b = newBucketBuffer()
61 txw.buckets[bucket.ID()] = b
62 }
63 b.add(k, v)
64 }
65
66 func (txw *txWriteBuffer) reset() {
67 txw.txBuffer.reset()
68 for k := range txw.bucket2seq {
69 v, ok := txw.buckets[k]
70 if !ok {
71 delete(txw.bucket2seq, k)
72 } else if v.used == 0 {
73 txw.bucket2seq[k] = true
74 }
75 }
76 }
77
78 func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
79 for k, wb := range txw.buckets {
80 rb, ok := txr.buckets[k]
81 if !ok {
82 delete(txw.buckets, k)
83 txr.buckets[k] = wb
84 continue
85 }
86 if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
87
88 sort.Sort(wb)
89 }
90 rb.merge(wb)
91 }
92 txw.reset()
93
94 txr.bufVersion++
95 }
96
97
98 type txReadBuffer struct {
99 txBuffer
100
101 bufVersion uint64
102 }
103
104 func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
105 if b := txr.buckets[bucket.ID()]; b != nil {
106 return b.Range(key, endKey, limit)
107 }
108 return nil, nil
109 }
110
111 func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
112 if b := txr.buckets[bucket.ID()]; b != nil {
113 return b.ForEach(visitor)
114 }
115 return nil
116 }
117
118
119 func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
120 txrCopy := txReadBuffer{
121 txBuffer: txBuffer{
122 buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
123 },
124 bufVersion: 0,
125 }
126 for bucketName, bucket := range txr.txBuffer.buckets {
127 txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
128 }
129 return txrCopy
130 }
131
132 type kv struct {
133 key []byte
134 val []byte
135 }
136
137
138 type bucketBuffer struct {
139 buf []kv
140
141 used int
142 }
143
144 func newBucketBuffer() *bucketBuffer {
145 return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
146 }
147
148 func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
149 f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
150 idx := sort.Search(bb.used, f)
151 if idx < 0 {
152 return nil, nil
153 }
154 if len(endKey) == 0 {
155 if bytes.Equal(key, bb.buf[idx].key) {
156 keys = append(keys, bb.buf[idx].key)
157 vals = append(vals, bb.buf[idx].val)
158 }
159 return keys, vals
160 }
161 if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
162 return nil, nil
163 }
164 for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
165 if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
166 break
167 }
168 keys = append(keys, bb.buf[i].key)
169 vals = append(vals, bb.buf[i].val)
170 }
171 return keys, vals
172 }
173
174 func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
175 for i := 0; i < bb.used; i++ {
176 if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
177 return err
178 }
179 }
180 return nil
181 }
182
183 func (bb *bucketBuffer) add(k, v []byte) {
184 bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
185 bb.used++
186 if bb.used == len(bb.buf) {
187 buf := make([]kv, (3*len(bb.buf))/2)
188 copy(buf, bb.buf)
189 bb.buf = buf
190 }
191 }
192
193
194 func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
195 for i := 0; i < bbsrc.used; i++ {
196 bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
197 }
198 if bb.used == bbsrc.used {
199 return
200 }
201 if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
202 return
203 }
204
205 sort.Stable(bb)
206
207
208 widx := 0
209 for ridx := 1; ridx < bb.used; ridx++ {
210 if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
211 widx++
212 }
213 bb.buf[widx] = bb.buf[ridx]
214 }
215 bb.used = widx + 1
216 }
217
218 func (bb *bucketBuffer) Len() int { return bb.used }
219 func (bb *bucketBuffer) Less(i, j int) bool {
220 return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
221 }
222 func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
223
224 func (bb *bucketBuffer) Copy() *bucketBuffer {
225 bbCopy := bucketBuffer{
226 buf: make([]kv, len(bb.buf)),
227 used: bb.used,
228 }
229 copy(bbCopy.buf, bb.buf)
230 return &bbCopy
231 }
232
View as plain text