1
16
17 package zstdchunked
18
19 import (
20 "bufio"
21 "bytes"
22 "encoding/binary"
23 "encoding/json"
24 "fmt"
25 "hash"
26 "io"
27 "sync"
28
29 "github.com/containerd/stargz-snapshotter/estargz"
30 "github.com/klauspost/compress/zstd"
31 digest "github.com/opencontainers/go-digest"
32 )
33
34 const (
35
36 ManifestChecksumAnnotation = "io.containers.zstd-chunked.manifest-checksum"
37
38
39 ManifestPositionAnnotation = "io.containers.zstd-chunked.manifest-position"
40
41
42 FooterSize = 40
43
44 manifestTypeCRFS = 1
45 )
46
47 var (
48 skippableFrameMagic = []byte{0x50, 0x2a, 0x4d, 0x18}
49 zstdFrameMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
50 zstdChunkedFrameMagic = []byte{0x47, 0x6e, 0x55, 0x6c, 0x49, 0x6e, 0x55, 0x78}
51 )
52
53 type Decompressor struct{}
54
55 func (zz *Decompressor) Reader(r io.Reader) (io.ReadCloser, error) {
56 decoder, err := zstd.NewReader(r)
57 if err != nil {
58 return nil, err
59 }
60 return &zstdReadCloser{decoder}, nil
61 }
62
63 func (zz *Decompressor) ParseTOC(r io.Reader) (toc *estargz.JTOC, tocDgst digest.Digest, err error) {
64 zr, err := zstd.NewReader(r)
65 if err != nil {
66 return nil, "", err
67 }
68 defer zr.Close()
69 dgstr := digest.Canonical.Digester()
70 toc = new(estargz.JTOC)
71 if err := json.NewDecoder(io.TeeReader(zr, dgstr.Hash())).Decode(&toc); err != nil {
72 return nil, "", fmt.Errorf("error decoding TOC JSON: %w", err)
73 }
74 return toc, dgstr.Digest(), nil
75 }
76
77 func (zz *Decompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
78 offset := binary.LittleEndian.Uint64(p[0:8])
79 compressedLength := binary.LittleEndian.Uint64(p[8:16])
80 if !bytes.Equal(zstdChunkedFrameMagic, p[32:40]) {
81 return 0, 0, 0, fmt.Errorf("invalid magic number")
82 }
83
84 return int64(offset - 8), int64(offset), int64(compressedLength), nil
85 }
86
87 func (zz *Decompressor) FooterSize() int64 {
88 return FooterSize
89 }
90
91 func (zz *Decompressor) DecompressTOC(r io.Reader) (tocJSON io.ReadCloser, err error) {
92 decoder, err := zstd.NewReader(r)
93 if err != nil {
94 return nil, err
95 }
96 br := bufio.NewReader(decoder)
97 if _, err := br.Peek(1); err != nil {
98 return nil, err
99 }
100 return &reader{br, decoder.Close}, nil
101 }
102
103 type reader struct {
104 io.Reader
105 closeFunc func()
106 }
107
108 func (r *reader) Close() error { r.closeFunc(); return nil }
109
110 type zstdReadCloser struct{ *zstd.Decoder }
111
112 func (z *zstdReadCloser) Close() error {
113 z.Decoder.Close()
114 return nil
115 }
116
117 type Compressor struct {
118 CompressionLevel zstd.EncoderLevel
119 Metadata map[string]string
120
121 pool sync.Pool
122 }
123
124 func (zc *Compressor) Writer(w io.Writer) (estargz.WriteFlushCloser, error) {
125 if wc := zc.pool.Get(); wc != nil {
126 ec := wc.(*zstd.Encoder)
127 ec.Reset(w)
128 return &poolEncoder{ec, zc}, nil
129 }
130 ec, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zc.CompressionLevel), zstd.WithLowerEncoderMem(true))
131 if err != nil {
132 return nil, err
133 }
134 return &poolEncoder{ec, zc}, nil
135 }
136
137 type poolEncoder struct {
138 *zstd.Encoder
139 zc *Compressor
140 }
141
142 func (w *poolEncoder) Close() error {
143 if err := w.Encoder.Close(); err != nil {
144 return err
145 }
146 w.zc.pool.Put(w.Encoder)
147 return nil
148 }
149
150 func (zc *Compressor) WriteTOCAndFooter(w io.Writer, off int64, toc *estargz.JTOC, diffHash hash.Hash) (digest.Digest, error) {
151 tocJSON, err := json.MarshalIndent(toc, "", "\t")
152 if err != nil {
153 return "", err
154 }
155 buf := new(bytes.Buffer)
156 encoder, err := zstd.NewWriter(buf, zstd.WithEncoderLevel(zc.CompressionLevel))
157 if err != nil {
158 return "", err
159 }
160 if _, err := encoder.Write(tocJSON); err != nil {
161 return "", err
162 }
163 if err := encoder.Close(); err != nil {
164 return "", err
165 }
166 compressedTOC := buf.Bytes()
167 _, err = io.Copy(w, bytes.NewReader(appendSkippableFrameMagic(compressedTOC)))
168
169
170 tocOff := uint64(off) + 8
171 if _, err := w.Write(appendSkippableFrameMagic(
172 zstdFooterBytes(tocOff, uint64(len(tocJSON)), uint64(len(compressedTOC)))),
173 ); err != nil {
174 return "", err
175 }
176
177 if zc.Metadata != nil {
178 zc.Metadata[ManifestChecksumAnnotation] = digest.FromBytes(compressedTOC).String()
179 zc.Metadata[ManifestPositionAnnotation] = fmt.Sprintf("%d:%d:%d:%d",
180 tocOff, len(compressedTOC), len(tocJSON), manifestTypeCRFS)
181 }
182
183 return digest.FromBytes(tocJSON), err
184 }
185
186
187 func zstdFooterBytes(tocOff, tocRawSize, tocCompressedSize uint64) []byte {
188 footer := make([]byte, FooterSize)
189 binary.LittleEndian.PutUint64(footer, tocOff)
190 binary.LittleEndian.PutUint64(footer[8:], tocCompressedSize)
191 binary.LittleEndian.PutUint64(footer[16:], tocRawSize)
192 binary.LittleEndian.PutUint64(footer[24:], manifestTypeCRFS)
193 copy(footer[32:40], zstdChunkedFrameMagic)
194 return footer
195 }
196
197 func appendSkippableFrameMagic(b []byte) []byte {
198 size := make([]byte, 4)
199 binary.LittleEndian.PutUint32(size, uint32(len(b)))
200 return append(append(skippableFrameMagic, size...), b...)
201 }
202
View as plain text