...

Source file src/github.com/containerd/stargz-snapshotter/estargz/zstdchunked/zstdchunked.go

Documentation: github.com/containerd/stargz-snapshotter/estargz/zstdchunked

     1  /*
     2     Copyright The containerd Authors.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  package zstdchunked
    18  
    19  import (
    20  	"bufio"
    21  	"bytes"
    22  	"encoding/binary"
    23  	"encoding/json"
    24  	"fmt"
    25  	"hash"
    26  	"io"
    27  	"sync"
    28  
    29  	"github.com/containerd/stargz-snapshotter/estargz"
    30  	"github.com/klauspost/compress/zstd"
    31  	digest "github.com/opencontainers/go-digest"
    32  )
    33  
const (
	// ManifestChecksumAnnotation is an annotation that contains the digest of
	// the compressed TOC.
	ManifestChecksumAnnotation = "io.containers.zstd-chunked.manifest-checksum"

	// ManifestPositionAnnotation is an annotation that describes where the TOC
	// is stored. WriteTOCAndFooter formats its value as four colon-separated
	// decimal numbers: TOC offset, compressed TOC size, uncompressed TOC size
	// and manifest type.
	ManifestPositionAnnotation = "io.containers.zstd-chunked.manifest-position"

	// FooterSize is the size of the footer in bytes.
	FooterSize = 40

	// manifestTypeCRFS is the manifest type value stored in the footer and in
	// the ManifestPositionAnnotation value.
	manifestTypeCRFS = 1
)
    46  
var (
	// skippableFrameMagic is the 4-byte magic that appendSkippableFrameMagic
	// writes in front of the frame size and the payload.
	skippableFrameMagic   = []byte{0x50, 0x2a, 0x4d, 0x18}
	// zstdFrameMagic is not referenced by the code visible in this file.
	// NOTE(review): presumably the magic number of a regular zstd frame — confirm.
	zstdFrameMagic        = []byte{0x28, 0xb5, 0x2f, 0xfd}
	// zstdChunkedFrameMagic is the 8-byte magic stored in bytes 32..39 of the
	// footer; ParseFooter rejects footers that do not end with it.
	zstdChunkedFrameMagic = []byte{0x47, 0x6e, 0x55, 0x6c, 0x49, 0x6e, 0x55, 0x78}
)
    52  
// Decompressor provides the read side of the zstd-chunked format: it wraps
// zstd streams for reading and parses the footer and the TOC. It carries no
// state.
type Decompressor struct{}
    54  
    55  func (zz *Decompressor) Reader(r io.Reader) (io.ReadCloser, error) {
    56  	decoder, err := zstd.NewReader(r)
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  	return &zstdReadCloser{decoder}, nil
    61  }
    62  
    63  func (zz *Decompressor) ParseTOC(r io.Reader) (toc *estargz.JTOC, tocDgst digest.Digest, err error) {
    64  	zr, err := zstd.NewReader(r)
    65  	if err != nil {
    66  		return nil, "", err
    67  	}
    68  	defer zr.Close()
    69  	dgstr := digest.Canonical.Digester()
    70  	toc = new(estargz.JTOC)
    71  	if err := json.NewDecoder(io.TeeReader(zr, dgstr.Hash())).Decode(&toc); err != nil {
    72  		return nil, "", fmt.Errorf("error decoding TOC JSON: %w", err)
    73  	}
    74  	return toc, dgstr.Digest(), nil
    75  }
    76  
    77  func (zz *Decompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
    78  	offset := binary.LittleEndian.Uint64(p[0:8])
    79  	compressedLength := binary.LittleEndian.Uint64(p[8:16])
    80  	if !bytes.Equal(zstdChunkedFrameMagic, p[32:40]) {
    81  		return 0, 0, 0, fmt.Errorf("invalid magic number")
    82  	}
    83  	// 8 is the size of the zstd skippable frame header + the frame size (see WriteTOCAndFooter)
    84  	return int64(offset - 8), int64(offset), int64(compressedLength), nil
    85  }
    86  
    87  func (zz *Decompressor) FooterSize() int64 {
    88  	return FooterSize
    89  }
    90  
    91  func (zz *Decompressor) DecompressTOC(r io.Reader) (tocJSON io.ReadCloser, err error) {
    92  	decoder, err := zstd.NewReader(r)
    93  	if err != nil {
    94  		return nil, err
    95  	}
    96  	br := bufio.NewReader(decoder)
    97  	if _, err := br.Peek(1); err != nil {
    98  		return nil, err
    99  	}
   100  	return &reader{br, decoder.Close}, nil
   101  }
   102  
   103  type reader struct {
   104  	io.Reader
   105  	closeFunc func()
   106  }
   107  
   108  func (r *reader) Close() error { r.closeFunc(); return nil }
   109  
   110  type zstdReadCloser struct{ *zstd.Decoder }
   111  
   112  func (z *zstdReadCloser) Close() error {
   113  	z.Decoder.Close()
   114  	return nil
   115  }
   116  
// Compressor provides the write side of the zstd-chunked format: it creates
// zstd writers and writes the TOC and footer.
type Compressor struct {
	// CompressionLevel is the level passed to the zstd encoders created by
	// Writer and WriteTOCAndFooter.
	CompressionLevel zstd.EncoderLevel
	// Metadata, when non-nil, receives the ManifestChecksumAnnotation and
	// ManifestPositionAnnotation entries set by WriteTOCAndFooter.
	Metadata         map[string]string

	// pool holds encoders handed back by poolEncoder.Close so that Writer
	// can reuse them instead of creating new ones.
	pool sync.Pool
}
   123  
   124  func (zc *Compressor) Writer(w io.Writer) (estargz.WriteFlushCloser, error) {
   125  	if wc := zc.pool.Get(); wc != nil {
   126  		ec := wc.(*zstd.Encoder)
   127  		ec.Reset(w)
   128  		return &poolEncoder{ec, zc}, nil
   129  	}
   130  	ec, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zc.CompressionLevel), zstd.WithLowerEncoderMem(true))
   131  	if err != nil {
   132  		return nil, err
   133  	}
   134  	return &poolEncoder{ec, zc}, nil
   135  }
   136  
   137  type poolEncoder struct {
   138  	*zstd.Encoder
   139  	zc *Compressor
   140  }
   141  
   142  func (w *poolEncoder) Close() error {
   143  	if err := w.Encoder.Close(); err != nil {
   144  		return err
   145  	}
   146  	w.zc.pool.Put(w.Encoder)
   147  	return nil
   148  }
   149  
   150  func (zc *Compressor) WriteTOCAndFooter(w io.Writer, off int64, toc *estargz.JTOC, diffHash hash.Hash) (digest.Digest, error) {
   151  	tocJSON, err := json.MarshalIndent(toc, "", "\t")
   152  	if err != nil {
   153  		return "", err
   154  	}
   155  	buf := new(bytes.Buffer)
   156  	encoder, err := zstd.NewWriter(buf, zstd.WithEncoderLevel(zc.CompressionLevel))
   157  	if err != nil {
   158  		return "", err
   159  	}
   160  	if _, err := encoder.Write(tocJSON); err != nil {
   161  		return "", err
   162  	}
   163  	if err := encoder.Close(); err != nil {
   164  		return "", err
   165  	}
   166  	compressedTOC := buf.Bytes()
   167  	_, err = io.Copy(w, bytes.NewReader(appendSkippableFrameMagic(compressedTOC)))
   168  
   169  	// 8 is the size of the zstd skippable frame header + the frame size
   170  	tocOff := uint64(off) + 8
   171  	if _, err := w.Write(appendSkippableFrameMagic(
   172  		zstdFooterBytes(tocOff, uint64(len(tocJSON)), uint64(len(compressedTOC)))),
   173  	); err != nil {
   174  		return "", err
   175  	}
   176  
   177  	if zc.Metadata != nil {
   178  		zc.Metadata[ManifestChecksumAnnotation] = digest.FromBytes(compressedTOC).String()
   179  		zc.Metadata[ManifestPositionAnnotation] = fmt.Sprintf("%d:%d:%d:%d",
   180  			tocOff, len(compressedTOC), len(tocJSON), manifestTypeCRFS)
   181  	}
   182  
   183  	return digest.FromBytes(tocJSON), err
   184  }
   185  
   186  // zstdFooterBytes returns the 40 bytes footer.
   187  func zstdFooterBytes(tocOff, tocRawSize, tocCompressedSize uint64) []byte {
   188  	footer := make([]byte, FooterSize)
   189  	binary.LittleEndian.PutUint64(footer, tocOff)
   190  	binary.LittleEndian.PutUint64(footer[8:], tocCompressedSize)
   191  	binary.LittleEndian.PutUint64(footer[16:], tocRawSize)
   192  	binary.LittleEndian.PutUint64(footer[24:], manifestTypeCRFS)
   193  	copy(footer[32:40], zstdChunkedFrameMagic)
   194  	return footer
   195  }
   196  
   197  func appendSkippableFrameMagic(b []byte) []byte {
   198  	size := make([]byte, 4)
   199  	binary.LittleEndian.PutUint32(size, uint32(len(b)))
   200  	return append(append(skippableFrameMagic, size...), b...)
   201  }
   202  

View as plain text