...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package wal
16
17 import (
18 "encoding/binary"
19 "hash"
20 "io"
21 "os"
22 "sync"
23
24 "go.etcd.io/etcd/pkg/v3/crc"
25 "go.etcd.io/etcd/pkg/v3/ioutil"
26 "go.etcd.io/etcd/server/v3/wal/walpb"
27 )
28
29
30
31
32 const walPageBytes = 8 * minSectorSize
33
34 type encoder struct {
35 mu sync.Mutex
36 bw *ioutil.PageWriter
37
38 crc hash.Hash32
39 buf []byte
40 uint64buf []byte
41 }
42
43 func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
44 return &encoder{
45 bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
46 crc: crc.New(prevCrc, crcTable),
47
48 buf: make([]byte, 1024*1024),
49 uint64buf: make([]byte, 8),
50 }
51 }
52
53
54 func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
55 offset, err := f.Seek(0, io.SeekCurrent)
56 if err != nil {
57 return nil, err
58 }
59 return newEncoder(f, prevCrc, int(offset)), nil
60 }
61
62 func (e *encoder) encode(rec *walpb.Record) error {
63 e.mu.Lock()
64 defer e.mu.Unlock()
65
66 e.crc.Write(rec.Data)
67 rec.Crc = e.crc.Sum32()
68 var (
69 data []byte
70 err error
71 n int
72 )
73
74 if rec.Size() > len(e.buf) {
75 data, err = rec.Marshal()
76 if err != nil {
77 return err
78 }
79 } else {
80 n, err = rec.MarshalTo(e.buf)
81 if err != nil {
82 return err
83 }
84 data = e.buf[:n]
85 }
86
87 lenField, padBytes := encodeFrameSize(len(data))
88 if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
89 return err
90 }
91
92 if padBytes != 0 {
93 data = append(data, make([]byte, padBytes)...)
94 }
95 n, err = e.bw.Write(data)
96 walWriteBytes.Add(float64(n))
97 return err
98 }
99
// encodeFrameSize computes the length field and padding for a frame whose
// payload is dataBytes long.
//
// padBytes is the number of bytes needed to round dataBytes up to a multiple
// of 8. lenField carries dataBytes in its low bits; when padding is required,
// its most significant byte is additionally set to 0x80|padBytes.
func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
	remainder := dataBytes % 8
	if remainder == 0 {
		// Already aligned: plain length, no padding marker.
		return uint64(dataBytes), 0
	}

	padBytes = (8 - remainder) % 8
	// Record the pad count, flagged with the high bit, in the top byte.
	marker := uint64(0x80|padBytes) << 56
	return uint64(dataBytes) | marker, padBytes
}
109
110 func (e *encoder) flush() error {
111 e.mu.Lock()
112 n, err := e.bw.FlushN()
113 e.mu.Unlock()
114 walWriteBytes.Add(float64(n))
115 return err
116 }
117
118 func writeUint64(w io.Writer, n uint64, buf []byte) error {
119
120 binary.LittleEndian.PutUint64(buf, n)
121 nv, err := w.Write(buf)
122 walWriteBytes.Add(float64(nv))
123 return err
124 }
125
View as plain text