...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package wal
16
17 import (
18 "encoding/binary"
19 "fmt"
20 "hash"
21 "io"
22 "sync"
23
24 "go.etcd.io/etcd/client/pkg/v3/fileutil"
25 "go.etcd.io/etcd/pkg/v3/crc"
26 "go.etcd.io/etcd/pkg/v3/pbutil"
27 "go.etcd.io/etcd/raft/v3/raftpb"
28 "go.etcd.io/etcd/server/v3/wal/walpb"
29 )
30
const (
	// minSectorSize is the granularity, in bytes, at which isTornEntry
	// splits record data along file-offset boundaries when looking for
	// an all-zero chunk.
	minSectorSize = 512

	// frameSizeBytes is the size, in bytes, of the length field that
	// precedes every record; readInt64 consumes exactly this many bytes.
	frameSizeBytes = 8
)
35
36 type decoder struct {
37 mu sync.Mutex
38 brs []*fileutil.FileBufReader
39
40
41 lastValidOff int64
42 crc hash.Hash32
43 }
44
45 func newDecoder(r ...fileutil.FileReader) *decoder {
46 readers := make([]*fileutil.FileBufReader, len(r))
47 for i := range r {
48 readers[i] = fileutil.NewFileBufReader(r[i])
49 }
50 return &decoder{
51 brs: readers,
52 crc: crc.New(0, crcTable),
53 }
54 }
55
56 func (d *decoder) decode(rec *walpb.Record) error {
57 rec.Reset()
58 d.mu.Lock()
59 defer d.mu.Unlock()
60 return d.decodeRecord(rec)
61 }
62
63 func (d *decoder) decodeRecord(rec *walpb.Record) error {
64 if len(d.brs) == 0 {
65 return io.EOF
66 }
67
68 fileBufReader := d.brs[0]
69 l, err := readInt64(fileBufReader)
70 if err == io.EOF || (err == nil && l == 0) {
71
72 d.brs = d.brs[1:]
73 if len(d.brs) == 0 {
74 return io.EOF
75 }
76 d.lastValidOff = 0
77 return d.decodeRecord(rec)
78 }
79 if err != nil {
80 return err
81 }
82
83 recBytes, padBytes := decodeFrameSize(l)
84
85 maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
86 if recBytes > maxEntryLimit {
87 return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
88 io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
89 }
90
91 data := make([]byte, recBytes+padBytes)
92 if _, err = io.ReadFull(fileBufReader, data); err != nil {
93
94
95 if err == io.EOF {
96 err = io.ErrUnexpectedEOF
97 }
98 return err
99 }
100 if err := rec.Unmarshal(data[:recBytes]); err != nil {
101 if d.isTornEntry(data) {
102 return io.ErrUnexpectedEOF
103 }
104 return err
105 }
106
107
108 if rec.Type != crcType {
109 d.crc.Write(rec.Data)
110 if err := rec.Validate(d.crc.Sum32()); err != nil {
111 if d.isTornEntry(data) {
112 return io.ErrUnexpectedEOF
113 }
114 return err
115 }
116 }
117
118 d.lastValidOff += frameSizeBytes + recBytes + padBytes
119 return nil
120 }
121
// decodeFrameSize splits a record's 64-bit length field into the record
// size and the number of padding bytes that follow the record.
//
// The low 56 bits are the record size. Padding is present only when the
// most significant bit is set (lenField is negative); in that case it is
// the low 3 bits of the most significant byte. Otherwise padBytes is 0.
func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
	const sizeMask = uint64(1)<<56 - 1

	raw := uint64(lenField)
	recBytes = int64(raw & sizeMask)
	if lenField >= 0 {
		return recBytes, 0
	}

	padBytes = int64((raw >> 56) & 0x7)
	return recBytes, padBytes
}
132
133
134
135 func (d *decoder) isTornEntry(data []byte) bool {
136 if len(d.brs) != 1 {
137 return false
138 }
139
140 fileOff := d.lastValidOff + frameSizeBytes
141 curOff := 0
142 chunks := [][]byte{}
143
144 for curOff < len(data) {
145 chunkLen := int(minSectorSize - (fileOff % minSectorSize))
146 if chunkLen > len(data)-curOff {
147 chunkLen = len(data) - curOff
148 }
149 chunks = append(chunks, data[curOff:curOff+chunkLen])
150 fileOff += int64(chunkLen)
151 curOff += chunkLen
152 }
153
154
155 for _, sect := range chunks {
156 isZero := true
157 for _, v := range sect {
158 if v != 0 {
159 isZero = false
160 break
161 }
162 }
163 if isZero {
164 return true
165 }
166 }
167 return false
168 }
169
170 func (d *decoder) updateCRC(prevCrc uint32) {
171 d.crc = crc.New(prevCrc, crcTable)
172 }
173
174 func (d *decoder) lastCRC() uint32 {
175 return d.crc.Sum32()
176 }
177
178 func (d *decoder) lastOffset() int64 { return d.lastValidOff }
179
180 func mustUnmarshalEntry(d []byte) raftpb.Entry {
181 var e raftpb.Entry
182 pbutil.MustUnmarshal(&e, d)
183 return e
184 }
185
186 func mustUnmarshalState(d []byte) raftpb.HardState {
187 var s raftpb.HardState
188 pbutil.MustUnmarshal(&s, d)
189 return s
190 }
191
// readInt64 reads eight bytes from r and decodes them as a little-endian
// int64.
//
// On failure it returns 0 and the read error: io.EOF if r had no bytes left
// at all, io.ErrUnexpectedEOF if it ran out part-way through the value.
func readInt64(r io.Reader) (int64, error) {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(raw[:])), nil
}
197
View as plain text