...

Source file src/github.com/containerd/stargz-snapshotter/estargz/estargz.go

Documentation: github.com/containerd/stargz-snapshotter/estargz

     1  /*
     2     Copyright The containerd Authors.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  /*
    18     Copyright 2019 The Go Authors. All rights reserved.
    19     Use of this source code is governed by a BSD-style
    20     license that can be found in the LICENSE file.
    21  */
    22  
    23  package estargz
    24  
    25  import (
    26  	"bufio"
    27  	"bytes"
    28  	"compress/gzip"
    29  	"crypto/sha256"
    30  	"errors"
    31  	"fmt"
    32  	"hash"
    33  	"io"
    34  	"os"
    35  	"path"
    36  	"sort"
    37  	"strings"
    38  	"sync"
    39  	"time"
    40  
    41  	"github.com/containerd/stargz-snapshotter/estargz/errorutil"
    42  	digest "github.com/opencontainers/go-digest"
    43  	"github.com/vbatts/tar-split/archive/tar"
    44  )
    45  
    46  // A Reader permits random access reads from a stargz file.
    47  type Reader struct {
    48  	sr        *io.SectionReader
    49  	toc       *JTOC
    50  	tocDigest digest.Digest
    51  
    52  	// m stores all non-chunk entries, keyed by name.
    53  	m map[string]*TOCEntry
    54  
    55  	// chunks stores all TOCEntry values for regular files that
    56  	// are split up. For a file with a single chunk, it's only
    57  	// stored in m.
    58  	chunks map[string][]*TOCEntry
    59  
    60  	decompressor Decompressor
    61  }
    62  
    63  type openOpts struct {
    64  	tocOffset     int64
    65  	decompressors []Decompressor
    66  	telemetry     *Telemetry
    67  }
    68  
    69  // OpenOption is an option used during opening the layer
    70  type OpenOption func(o *openOpts) error
    71  
    72  // WithTOCOffset option specifies the offset of TOC
    73  func WithTOCOffset(tocOffset int64) OpenOption {
    74  	return func(o *openOpts) error {
    75  		o.tocOffset = tocOffset
    76  		return nil
    77  	}
    78  }
    79  
    80  // WithDecompressors option specifies decompressors to use.
    81  // Default is gzip-based decompressor.
    82  func WithDecompressors(decompressors ...Decompressor) OpenOption {
    83  	return func(o *openOpts) error {
    84  		o.decompressors = decompressors
    85  		return nil
    86  	}
    87  }
    88  
    89  // WithTelemetry option specifies the telemetry hooks
    90  func WithTelemetry(telemetry *Telemetry) OpenOption {
    91  	return func(o *openOpts) error {
    92  		o.telemetry = telemetry
    93  		return nil
    94  	}
    95  }
    96  
    97  // MeasureLatencyHook is a func which takes start time and records the diff
    98  type MeasureLatencyHook func(time.Time)
    99  
   100  // Telemetry is a struct which defines telemetry hooks. By implementing these hooks you should be able to record
   101  // the latency metrics of the respective steps of estargz open operation. To be used with estargz.OpenWithTelemetry(...)
   102  type Telemetry struct {
   103  	GetFooterLatency      MeasureLatencyHook // measure time to get stargz footer (in milliseconds)
   104  	GetTocLatency         MeasureLatencyHook // measure time to GET TOC JSON (in milliseconds)
   105  	DeserializeTocLatency MeasureLatencyHook // measure time to deserialize TOC JSON (in milliseconds)
   106  }
   107  
   108  // Open opens a stargz file for reading.
   109  // The behavior is configurable using options.
   110  //
   111  // Note that each entry name is normalized as the path that is relative to root.
   112  func Open(sr *io.SectionReader, opt ...OpenOption) (*Reader, error) {
   113  	var opts openOpts
   114  	for _, o := range opt {
   115  		if err := o(&opts); err != nil {
   116  			return nil, err
   117  		}
   118  	}
   119  
   120  	gzipCompressors := []Decompressor{new(GzipDecompressor), new(LegacyGzipDecompressor)}
   121  	decompressors := append(gzipCompressors, opts.decompressors...)
   122  
   123  	// Determine the size to fetch. Try to fetch as many bytes as possible.
   124  	fetchSize := maxFooterSize(sr.Size(), decompressors...)
   125  	if maybeTocOffset := opts.tocOffset; maybeTocOffset > fetchSize {
   126  		if maybeTocOffset > sr.Size() {
   127  			return nil, fmt.Errorf("blob size %d is smaller than the toc offset", sr.Size())
   128  		}
   129  		fetchSize = sr.Size() - maybeTocOffset
   130  	}
   131  
   132  	start := time.Now() // before getting layer footer
   133  	footer := make([]byte, fetchSize)
   134  	if _, err := sr.ReadAt(footer, sr.Size()-fetchSize); err != nil {
   135  		return nil, fmt.Errorf("error reading footer: %v", err)
   136  	}
   137  	if opts.telemetry != nil && opts.telemetry.GetFooterLatency != nil {
   138  		opts.telemetry.GetFooterLatency(start)
   139  	}
   140  
   141  	var allErr []error
   142  	var found bool
   143  	var r *Reader
   144  	for _, d := range decompressors {
   145  		fSize := d.FooterSize()
   146  		fOffset := positive(int64(len(footer)) - fSize)
   147  		maybeTocBytes := footer[:fOffset]
   148  		_, tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
   149  		if err != nil {
   150  			allErr = append(allErr, err)
   151  			continue
   152  		}
   153  		if tocOffset >= 0 && tocSize <= 0 {
   154  			tocSize = sr.Size() - tocOffset - fSize
   155  		}
   156  		if tocOffset >= 0 && tocSize < int64(len(maybeTocBytes)) {
   157  			maybeTocBytes = maybeTocBytes[:tocSize]
   158  		}
   159  		r, err = parseTOC(d, sr, tocOffset, tocSize, maybeTocBytes, opts)
   160  		if err == nil {
   161  			found = true
   162  			break
   163  		}
   164  		allErr = append(allErr, err)
   165  	}
   166  	if !found {
   167  		return nil, errorutil.Aggregate(allErr)
   168  	}
   169  	if err := r.initFields(); err != nil {
   170  		return nil, fmt.Errorf("failed to initialize fields of entries: %v", err)
   171  	}
   172  	return r, nil
   173  }
   174  
   175  // OpenFooter extracts and parses footer from the given blob.
   176  // only supports gzip-based eStargz.
   177  func OpenFooter(sr *io.SectionReader) (tocOffset int64, footerSize int64, rErr error) {
   178  	if sr.Size() < FooterSize && sr.Size() < legacyFooterSize {
   179  		return 0, 0, fmt.Errorf("blob size %d is smaller than the footer size", sr.Size())
   180  	}
   181  	var footer [FooterSize]byte
   182  	if _, err := sr.ReadAt(footer[:], sr.Size()-FooterSize); err != nil {
   183  		return 0, 0, fmt.Errorf("error reading footer: %v", err)
   184  	}
   185  	var allErr []error
   186  	for _, d := range []Decompressor{new(GzipDecompressor), new(LegacyGzipDecompressor)} {
   187  		fSize := d.FooterSize()
   188  		fOffset := positive(int64(len(footer)) - fSize)
   189  		_, tocOffset, _, err := d.ParseFooter(footer[fOffset:])
   190  		if err == nil {
   191  			return tocOffset, fSize, err
   192  		}
   193  		allErr = append(allErr, err)
   194  	}
   195  	return 0, 0, errorutil.Aggregate(allErr)
   196  }
   197  
   198  // initFields populates the Reader from r.toc after decoding it from
   199  // JSON.
   200  //
   201  // Unexported fields are populated and TOCEntry fields that were
   202  // implicit in the JSON are populated.
   203  func (r *Reader) initFields() error {
   204  	r.m = make(map[string]*TOCEntry, len(r.toc.Entries))
   205  	r.chunks = make(map[string][]*TOCEntry)
   206  	var lastPath string
   207  	uname := map[int]string{}
   208  	gname := map[int]string{}
   209  	var lastRegEnt *TOCEntry
   210  	var chunkTopIndex int
   211  	for i, ent := range r.toc.Entries {
   212  		ent.Name = cleanEntryName(ent.Name)
   213  		switch ent.Type {
   214  		case "reg", "chunk":
   215  			if ent.Offset != r.toc.Entries[chunkTopIndex].Offset {
   216  				chunkTopIndex = i
   217  			}
   218  			ent.chunkTopIndex = chunkTopIndex
   219  		}
   220  		if ent.Type == "reg" {
   221  			lastRegEnt = ent
   222  		}
   223  		if ent.Type == "chunk" {
   224  			ent.Name = lastPath
   225  			r.chunks[ent.Name] = append(r.chunks[ent.Name], ent)
   226  			if ent.ChunkSize == 0 && lastRegEnt != nil {
   227  				ent.ChunkSize = lastRegEnt.Size - ent.ChunkOffset
   228  			}
   229  		} else {
   230  			lastPath = ent.Name
   231  
   232  			if ent.Uname != "" {
   233  				uname[ent.UID] = ent.Uname
   234  			} else {
   235  				ent.Uname = uname[ent.UID]
   236  			}
   237  			if ent.Gname != "" {
   238  				gname[ent.GID] = ent.Gname
   239  			} else {
   240  				ent.Gname = uname[ent.GID]
   241  			}
   242  
   243  			ent.modTime, _ = time.Parse(time.RFC3339, ent.ModTime3339)
   244  
   245  			if ent.Type == "dir" {
   246  				ent.NumLink++ // Parent dir links to this directory
   247  			}
   248  			r.m[ent.Name] = ent
   249  		}
   250  		if ent.Type == "reg" && ent.ChunkSize > 0 && ent.ChunkSize < ent.Size {
   251  			r.chunks[ent.Name] = make([]*TOCEntry, 0, ent.Size/ent.ChunkSize+1)
   252  			r.chunks[ent.Name] = append(r.chunks[ent.Name], ent)
   253  		}
   254  		if ent.ChunkSize == 0 && ent.Size != 0 {
   255  			ent.ChunkSize = ent.Size
   256  		}
   257  	}
   258  
   259  	// Populate children, add implicit directories:
   260  	for _, ent := range r.toc.Entries {
   261  		if ent.Type == "chunk" {
   262  			continue
   263  		}
   264  		// add "foo/":
   265  		//    add "foo" child to "" (creating "" if necessary)
   266  		//
   267  		// add "foo/bar/":
   268  		//    add "bar" child to "foo" (creating "foo" if necessary)
   269  		//
   270  		// add "foo/bar.txt":
   271  		//    add "bar.txt" child to "foo" (creating "foo" if necessary)
   272  		//
   273  		// add "a/b/c/d/e/f.txt":
   274  		//    create "a/b/c/d/e" node
   275  		//    add "f.txt" child to "e"
   276  
   277  		name := ent.Name
   278  		pdirName := parentDir(name)
   279  		if name == pdirName {
   280  			// This entry and its parent are the same.
   281  			// Ignore this for avoiding infinite loop of the reference.
   282  			// The example case where this can occur is when tar contains the root
   283  			// directory itself (e.g. "./", "/").
   284  			continue
   285  		}
   286  		pdir := r.getOrCreateDir(pdirName)
   287  		ent.NumLink++ // at least one name(ent.Name) references this entry.
   288  		if ent.Type == "hardlink" {
   289  			org, err := r.getSource(ent)
   290  			if err != nil {
   291  				return err
   292  			}
   293  			org.NumLink++ // original entry is referenced by this ent.Name.
   294  			ent = org
   295  		}
   296  		pdir.addChild(path.Base(name), ent)
   297  	}
   298  
   299  	lastOffset := r.sr.Size()
   300  	for i := len(r.toc.Entries) - 1; i >= 0; i-- {
   301  		e := r.toc.Entries[i]
   302  		if e.isDataType() {
   303  			e.nextOffset = lastOffset
   304  		}
   305  		if e.Offset != 0 && e.InnerOffset == 0 {
   306  			lastOffset = e.Offset
   307  		}
   308  	}
   309  
   310  	return nil
   311  }
   312  
   313  func (r *Reader) getSource(ent *TOCEntry) (_ *TOCEntry, err error) {
   314  	if ent.Type == "hardlink" {
   315  		org, ok := r.m[cleanEntryName(ent.LinkName)]
   316  		if !ok {
   317  			return nil, fmt.Errorf("%q is a hardlink but the linkname %q isn't found", ent.Name, ent.LinkName)
   318  		}
   319  		ent, err = r.getSource(org)
   320  		if err != nil {
   321  			return nil, err
   322  		}
   323  	}
   324  	return ent, nil
   325  }
   326  
   327  func parentDir(p string) string {
   328  	dir, _ := path.Split(p)
   329  	return strings.TrimSuffix(dir, "/")
   330  }
   331  
   332  func (r *Reader) getOrCreateDir(d string) *TOCEntry {
   333  	e, ok := r.m[d]
   334  	if !ok {
   335  		e = &TOCEntry{
   336  			Name:    d,
   337  			Type:    "dir",
   338  			Mode:    0755,
   339  			NumLink: 2, // The directory itself(.) and the parent link to this directory.
   340  		}
   341  		r.m[d] = e
   342  		if d != "" {
   343  			pdir := r.getOrCreateDir(parentDir(d))
   344  			pdir.addChild(path.Base(d), e)
   345  		}
   346  	}
   347  	return e
   348  }
   349  
   350  func (r *Reader) TOCDigest() digest.Digest {
   351  	return r.tocDigest
   352  }
   353  
   354  // VerifyTOC checks that the TOC JSON in the passed blob matches the
   355  // passed digests and that the TOC JSON contains digests for all chunks
   356  // contained in the blob. If the verification succceeds, this function
   357  // returns TOCEntryVerifier which holds all chunk digests in the stargz blob.
   358  func (r *Reader) VerifyTOC(tocDigest digest.Digest) (TOCEntryVerifier, error) {
   359  	// Verify the digest of TOC JSON
   360  	if r.tocDigest != tocDigest {
   361  		return nil, fmt.Errorf("invalid TOC JSON %q; want %q", r.tocDigest, tocDigest)
   362  	}
   363  	return r.Verifiers()
   364  }
   365  
   366  // Verifiers returns TOCEntryVerifier of this chunk. Use VerifyTOC instead in most cases
   367  // because this doesn't verify TOC.
   368  func (r *Reader) Verifiers() (TOCEntryVerifier, error) {
   369  	chunkDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the chunk digest
   370  	regDigestMap := make(map[int64]digest.Digest)   // map from chunk offset to the reg file digest
   371  	var chunkDigestMapIncomplete bool
   372  	var regDigestMapIncomplete bool
   373  	var containsChunk bool
   374  	for _, e := range r.toc.Entries {
   375  		if e.Type != "reg" && e.Type != "chunk" {
   376  			continue
   377  		}
   378  
   379  		// offset must be unique in stargz blob
   380  		_, dOK := chunkDigestMap[e.Offset]
   381  		_, rOK := regDigestMap[e.Offset]
   382  		if dOK || rOK {
   383  			return nil, fmt.Errorf("offset %d found twice", e.Offset)
   384  		}
   385  
   386  		if e.Type == "reg" {
   387  			if e.Size == 0 {
   388  				continue // ignores empty file
   389  			}
   390  
   391  			// record the digest of regular file payload
   392  			if e.Digest != "" {
   393  				d, err := digest.Parse(e.Digest)
   394  				if err != nil {
   395  					return nil, fmt.Errorf("failed to parse regular file digest %q: %w", e.Digest, err)
   396  				}
   397  				regDigestMap[e.Offset] = d
   398  			} else {
   399  				regDigestMapIncomplete = true
   400  			}
   401  		} else {
   402  			containsChunk = true // this layer contains "chunk" entries.
   403  		}
   404  
   405  		// "reg" also can contain ChunkDigest (e.g. when "reg" is the first entry of
   406  		// chunked file)
   407  		if e.ChunkDigest != "" {
   408  			d, err := digest.Parse(e.ChunkDigest)
   409  			if err != nil {
   410  				return nil, fmt.Errorf("failed to parse chunk digest %q: %w", e.ChunkDigest, err)
   411  			}
   412  			chunkDigestMap[e.Offset] = d
   413  		} else {
   414  			chunkDigestMapIncomplete = true
   415  		}
   416  	}
   417  
   418  	if chunkDigestMapIncomplete {
   419  		// Though some chunk digests are not found, if this layer doesn't contain
   420  		// "chunk"s and all digest of "reg" files are recorded, we can use them instead.
   421  		if !containsChunk && !regDigestMapIncomplete {
   422  			return &verifier{digestMap: regDigestMap}, nil
   423  		}
   424  		return nil, fmt.Errorf("some ChunkDigest not found in TOC JSON")
   425  	}
   426  
   427  	return &verifier{digestMap: chunkDigestMap}, nil
   428  }
   429  
   430  // verifier is an implementation of TOCEntryVerifier which holds verifiers keyed by
   431  // offset of the chunk.
   432  type verifier struct {
   433  	digestMap   map[int64]digest.Digest
   434  	digestMapMu sync.Mutex
   435  }
   436  
   437  // Verifier returns a content verifier specified by TOCEntry.
   438  func (v *verifier) Verifier(ce *TOCEntry) (digest.Verifier, error) {
   439  	v.digestMapMu.Lock()
   440  	defer v.digestMapMu.Unlock()
   441  	d, ok := v.digestMap[ce.Offset]
   442  	if !ok {
   443  		return nil, fmt.Errorf("verifier for offset=%d,size=%d hasn't been registered",
   444  			ce.Offset, ce.ChunkSize)
   445  	}
   446  	return d.Verifier(), nil
   447  }
   448  
   449  // ChunkEntryForOffset returns the TOCEntry containing the byte of the
   450  // named file at the given offset within the file.
   451  // Name must be absolute path or one that is relative to root.
   452  func (r *Reader) ChunkEntryForOffset(name string, offset int64) (e *TOCEntry, ok bool) {
   453  	name = cleanEntryName(name)
   454  	e, ok = r.Lookup(name)
   455  	if !ok || !e.isDataType() {
   456  		return nil, false
   457  	}
   458  	ents := r.chunks[name]
   459  	if len(ents) < 2 {
   460  		if offset >= e.ChunkSize {
   461  			return nil, false
   462  		}
   463  		return e, true
   464  	}
   465  	i := sort.Search(len(ents), func(i int) bool {
   466  		e := ents[i]
   467  		return e.ChunkOffset >= offset || (offset > e.ChunkOffset && offset < e.ChunkOffset+e.ChunkSize)
   468  	})
   469  	if i == len(ents) {
   470  		return nil, false
   471  	}
   472  	return ents[i], true
   473  }
   474  
   475  // Lookup returns the Table of Contents entry for the given path.
   476  //
   477  // To get the root directory, use the empty string.
   478  // Path must be absolute path or one that is relative to root.
   479  func (r *Reader) Lookup(path string) (e *TOCEntry, ok bool) {
   480  	path = cleanEntryName(path)
   481  	if r == nil {
   482  		return
   483  	}
   484  	e, ok = r.m[path]
   485  	if ok && e.Type == "hardlink" {
   486  		var err error
   487  		e, err = r.getSource(e)
   488  		if err != nil {
   489  			return nil, false
   490  		}
   491  	}
   492  	return
   493  }
   494  
   495  // OpenFile returns the reader of the specified file payload.
   496  //
   497  // Name must be absolute path or one that is relative to root.
   498  func (r *Reader) OpenFile(name string) (*io.SectionReader, error) {
   499  	fr, err := r.newFileReader(name)
   500  	if err != nil {
   501  		return nil, err
   502  	}
   503  	return io.NewSectionReader(fr, 0, fr.size), nil
   504  }
   505  
   506  func (r *Reader) newFileReader(name string) (*fileReader, error) {
   507  	name = cleanEntryName(name)
   508  	ent, ok := r.Lookup(name)
   509  	if !ok {
   510  		// TODO: come up with some error plan. This is lazy:
   511  		return nil, &os.PathError{
   512  			Path: name,
   513  			Op:   "OpenFile",
   514  			Err:  os.ErrNotExist,
   515  		}
   516  	}
   517  	if ent.Type != "reg" {
   518  		return nil, &os.PathError{
   519  			Path: name,
   520  			Op:   "OpenFile",
   521  			Err:  errors.New("not a regular file"),
   522  		}
   523  	}
   524  	return &fileReader{
   525  		r:    r,
   526  		size: ent.Size,
   527  		ents: r.getChunks(ent),
   528  	}, nil
   529  }
   530  
   531  func (r *Reader) OpenFileWithPreReader(name string, preRead func(*TOCEntry, io.Reader) error) (*io.SectionReader, error) {
   532  	fr, err := r.newFileReader(name)
   533  	if err != nil {
   534  		return nil, err
   535  	}
   536  	fr.preRead = preRead
   537  	return io.NewSectionReader(fr, 0, fr.size), nil
   538  }
   539  
   540  func (r *Reader) getChunks(ent *TOCEntry) []*TOCEntry {
   541  	if ents, ok := r.chunks[ent.Name]; ok {
   542  		return ents
   543  	}
   544  	return []*TOCEntry{ent}
   545  }
   546  
   547  type fileReader struct {
   548  	r       *Reader
   549  	size    int64
   550  	ents    []*TOCEntry // 1 or more reg/chunk entries
   551  	preRead func(*TOCEntry, io.Reader) error
   552  }
   553  
   554  func (fr *fileReader) ReadAt(p []byte, off int64) (n int, err error) {
   555  	if off >= fr.size {
   556  		return 0, io.EOF
   557  	}
   558  	if off < 0 {
   559  		return 0, errors.New("invalid offset")
   560  	}
   561  	var i int
   562  	if len(fr.ents) > 1 {
   563  		i = sort.Search(len(fr.ents), func(i int) bool {
   564  			return fr.ents[i].ChunkOffset >= off
   565  		})
   566  		if i == len(fr.ents) {
   567  			i = len(fr.ents) - 1
   568  		}
   569  	}
   570  	ent := fr.ents[i]
   571  	if ent.ChunkOffset > off {
   572  		if i == 0 {
   573  			return 0, errors.New("internal error; first chunk offset is non-zero")
   574  		}
   575  		ent = fr.ents[i-1]
   576  	}
   577  
   578  	//  If ent is a chunk of a large file, adjust the ReadAt
   579  	//  offset by the chunk's offset.
   580  	off -= ent.ChunkOffset
   581  
   582  	finalEnt := fr.ents[len(fr.ents)-1]
   583  	compressedOff := ent.Offset
   584  	// compressedBytesRemain is the number of compressed bytes in this
   585  	// file remaining, over 1+ chunks.
   586  	compressedBytesRemain := finalEnt.NextOffset() - compressedOff
   587  
   588  	sr := io.NewSectionReader(fr.r.sr, compressedOff, compressedBytesRemain)
   589  
   590  	const maxRead = 2 << 20
   591  	var bufSize = maxRead
   592  	if compressedBytesRemain < maxRead {
   593  		bufSize = int(compressedBytesRemain)
   594  	}
   595  
   596  	br := bufio.NewReaderSize(sr, bufSize)
   597  	if _, err := br.Peek(bufSize); err != nil {
   598  		return 0, fmt.Errorf("fileReader.ReadAt.peek: %v", err)
   599  	}
   600  
   601  	dr, err := fr.r.decompressor.Reader(br)
   602  	if err != nil {
   603  		return 0, fmt.Errorf("fileReader.ReadAt.decompressor.Reader: %v", err)
   604  	}
   605  	defer dr.Close()
   606  
   607  	if fr.preRead == nil {
   608  		if n, err := io.CopyN(io.Discard, dr, ent.InnerOffset+off); n != ent.InnerOffset+off || err != nil {
   609  			return 0, fmt.Errorf("discard of %d bytes != %v, %v", ent.InnerOffset+off, n, err)
   610  		}
   611  		return io.ReadFull(dr, p)
   612  	}
   613  
   614  	var retN int
   615  	var retErr error
   616  	var found bool
   617  	var nr int64
   618  	for _, e := range fr.r.toc.Entries[ent.chunkTopIndex:] {
   619  		if !e.isDataType() {
   620  			continue
   621  		}
   622  		if e.Offset != fr.r.toc.Entries[ent.chunkTopIndex].Offset {
   623  			break
   624  		}
   625  		if in, err := io.CopyN(io.Discard, dr, e.InnerOffset-nr); err != nil || in != e.InnerOffset-nr {
   626  			return 0, fmt.Errorf("discard of remaining %d bytes != %v, %v", e.InnerOffset-nr, in, err)
   627  		}
   628  		nr = e.InnerOffset
   629  		if e == ent {
   630  			found = true
   631  			if n, err := io.CopyN(io.Discard, dr, off); n != off || err != nil {
   632  				return 0, fmt.Errorf("discard of offset %d bytes != %v, %v", off, n, err)
   633  			}
   634  			retN, retErr = io.ReadFull(dr, p)
   635  			nr += off + int64(retN)
   636  			continue
   637  		}
   638  		cr := &countReader{r: io.LimitReader(dr, e.ChunkSize)}
   639  		if err := fr.preRead(e, cr); err != nil {
   640  			return 0, fmt.Errorf("failed to pre read: %w", err)
   641  		}
   642  		nr += cr.n
   643  	}
   644  	if !found {
   645  		return 0, fmt.Errorf("fileReader.ReadAt: target entry not found")
   646  	}
   647  	return retN, retErr
   648  }
   649  
   650  // A Writer writes stargz files.
   651  //
   652  // Use NewWriter to create a new Writer.
   653  type Writer struct {
   654  	bw       *bufio.Writer
   655  	cw       *countWriter
   656  	toc      *JTOC
   657  	diffHash hash.Hash // SHA-256 of uncompressed tar
   658  
   659  	closed        bool
   660  	gz            io.WriteCloser
   661  	lastUsername  map[int]string
   662  	lastGroupname map[int]string
   663  	compressor    Compressor
   664  
   665  	uncompressedCounter *countWriteFlusher
   666  
   667  	// ChunkSize optionally controls the maximum number of bytes
   668  	// of data of a regular file that can be written in one gzip
   669  	// stream before a new gzip stream is started.
   670  	// Zero means to use a default, currently 4 MiB.
   671  	ChunkSize int
   672  
   673  	// MinChunkSize optionally controls the minimum number of bytes
   674  	// of data must be written in one gzip stream before a new gzip
   675  	// NOTE: This adds a TOC property that stargz snapshotter < v0.13.0 doesn't understand.
   676  	MinChunkSize int
   677  
   678  	needsOpenGzEntries map[string]struct{}
   679  }
   680  
   681  // currentCompressionWriter writes to the current w.gz field, which can
   682  // change throughout writing a tar entry.
   683  //
   684  // Additionally, it updates w's SHA-256 of the uncompressed bytes
   685  // of the tar file.
   686  type currentCompressionWriter struct{ w *Writer }
   687  
   688  func (ccw currentCompressionWriter) Write(p []byte) (int, error) {
   689  	ccw.w.diffHash.Write(p)
   690  	if ccw.w.gz == nil {
   691  		if err := ccw.w.condOpenGz(); err != nil {
   692  			return 0, err
   693  		}
   694  	}
   695  	return ccw.w.gz.Write(p)
   696  }
   697  
   698  func (w *Writer) chunkSize() int {
   699  	if w.ChunkSize <= 0 {
   700  		return 4 << 20
   701  	}
   702  	return w.ChunkSize
   703  }
   704  
   705  // Unpack decompresses the given estargz blob and returns a ReadCloser of the tar blob.
   706  // TOC JSON and footer are removed.
   707  func Unpack(sr *io.SectionReader, c Decompressor) (io.ReadCloser, error) {
   708  	footerSize := c.FooterSize()
   709  	if sr.Size() < footerSize {
   710  		return nil, fmt.Errorf("blob is too small; %d < %d", sr.Size(), footerSize)
   711  	}
   712  	footerOffset := sr.Size() - footerSize
   713  	footer := make([]byte, footerSize)
   714  	if _, err := sr.ReadAt(footer, footerOffset); err != nil {
   715  		return nil, err
   716  	}
   717  	blobPayloadSize, _, _, err := c.ParseFooter(footer)
   718  	if err != nil {
   719  		return nil, fmt.Errorf("failed to parse footer: %w", err)
   720  	}
   721  	if blobPayloadSize < 0 {
   722  		blobPayloadSize = sr.Size()
   723  	}
   724  	return c.Reader(io.LimitReader(sr, blobPayloadSize))
   725  }
   726  
   727  // NewWriter returns a new stargz writer (gzip-based) writing to w.
   728  //
   729  // The writer must be closed to write its trailing table of contents.
   730  func NewWriter(w io.Writer) *Writer {
   731  	return NewWriterLevel(w, gzip.BestCompression)
   732  }
   733  
   734  // NewWriterLevel returns a new stargz writer (gzip-based) writing to w.
   735  // The compression level is configurable.
   736  //
   737  // The writer must be closed to write its trailing table of contents.
   738  func NewWriterLevel(w io.Writer, compressionLevel int) *Writer {
   739  	return NewWriterWithCompressor(w, NewGzipCompressorWithLevel(compressionLevel))
   740  }
   741  
   742  // NewWriterWithCompressor returns a new stargz writer writing to w.
   743  // The compression method is configurable.
   744  //
   745  // The writer must be closed to write its trailing table of contents.
   746  func NewWriterWithCompressor(w io.Writer, c Compressor) *Writer {
   747  	bw := bufio.NewWriter(w)
   748  	cw := &countWriter{w: bw}
   749  	return &Writer{
   750  		bw:                  bw,
   751  		cw:                  cw,
   752  		toc:                 &JTOC{Version: 1},
   753  		diffHash:            sha256.New(),
   754  		compressor:          c,
   755  		uncompressedCounter: &countWriteFlusher{},
   756  	}
   757  }
   758  
   759  // Close writes the stargz's table of contents and flushes all the
   760  // buffers, returning any error.
   761  func (w *Writer) Close() (digest.Digest, error) {
   762  	if w.closed {
   763  		return "", nil
   764  	}
   765  	defer func() { w.closed = true }()
   766  
   767  	if err := w.closeGz(); err != nil {
   768  		return "", err
   769  	}
   770  
   771  	// Write the TOC index and footer.
   772  	tocDigest, err := w.compressor.WriteTOCAndFooter(w.cw, w.cw.n, w.toc, w.diffHash)
   773  	if err != nil {
   774  		return "", err
   775  	}
   776  	if err := w.bw.Flush(); err != nil {
   777  		return "", err
   778  	}
   779  
   780  	return tocDigest, nil
   781  }
   782  
   783  func (w *Writer) closeGz() error {
   784  	if w.closed {
   785  		return errors.New("write on closed Writer")
   786  	}
   787  	if w.gz != nil {
   788  		if err := w.gz.Close(); err != nil {
   789  			return err
   790  		}
   791  		w.gz = nil
   792  	}
   793  	return nil
   794  }
   795  
   796  func (w *Writer) flushGz() error {
   797  	if w.closed {
   798  		return errors.New("flush on closed Writer")
   799  	}
   800  	if w.gz != nil {
   801  		if f, ok := w.gz.(interface {
   802  			Flush() error
   803  		}); ok {
   804  			return f.Flush()
   805  		}
   806  	}
   807  	return nil
   808  }
   809  
   810  // nameIfChanged returns name, unless it was the already the value of (*mp)[id],
   811  // in which case it returns the empty string.
   812  func (w *Writer) nameIfChanged(mp *map[int]string, id int, name string) string {
   813  	if name == "" {
   814  		return ""
   815  	}
   816  	if *mp == nil {
   817  		*mp = make(map[int]string)
   818  	}
   819  	if (*mp)[id] == name {
   820  		return ""
   821  	}
   822  	(*mp)[id] = name
   823  	return name
   824  }
   825  
   826  func (w *Writer) condOpenGz() (err error) {
   827  	if w.gz == nil {
   828  		w.gz, err = w.compressor.Writer(w.cw)
   829  		if w.gz != nil {
   830  			w.gz = w.uncompressedCounter.register(w.gz)
   831  		}
   832  	}
   833  	return
   834  }
   835  
   836  // AppendTar reads the tar or tar.gz file from r and appends
   837  // each of its contents to w.
   838  //
   839  // The input r can optionally be gzip compressed but the output will
   840  // always be compressed by the specified compressor.
   841  func (w *Writer) AppendTar(r io.Reader) error {
   842  	return w.appendTar(r, false)
   843  }
   844  
   845  // AppendTarLossLess reads the tar or tar.gz file from r and appends
   846  // each of its contents to w.
   847  //
   848  // The input r can optionally be gzip compressed but the output will
   849  // always be compressed by the specified compressor.
   850  //
   851  // The difference of this func with AppendTar is that this writes
   852  // the input tar stream into w without any modification (e.g. to header bytes).
   853  //
   854  // Note that if the input tar stream already contains TOC JSON, this returns
   855  // error because w cannot overwrite the TOC JSON to the one generated by w without
   856  // lossy modification. To avoid this error, if the input stream is known to be stargz/estargz,
   857  // you shoud decompress it and remove TOC JSON in advance.
   858  func (w *Writer) AppendTarLossLess(r io.Reader) error {
   859  	return w.appendTar(r, true)
   860  }
   861  
   862  func (w *Writer) appendTar(r io.Reader, lossless bool) error {
   863  	var src io.Reader
   864  	br := bufio.NewReader(r)
   865  	if isGzip(br) {
   866  		zr, _ := gzip.NewReader(br)
   867  		src = zr
   868  	} else {
   869  		src = io.Reader(br)
   870  	}
   871  	dst := currentCompressionWriter{w}
   872  	var tw *tar.Writer
   873  	if !lossless {
   874  		tw = tar.NewWriter(dst) // use tar writer only when this isn't lossless mode.
   875  	}
   876  	tr := tar.NewReader(src)
   877  	if lossless {
   878  		tr.RawAccounting = true
   879  	}
   880  	prevOffset := w.cw.n
   881  	var prevOffsetUncompressed int64
   882  	for {
   883  		h, err := tr.Next()
   884  		if err == io.EOF {
   885  			if lossless {
   886  				if remain := tr.RawBytes(); len(remain) > 0 {
   887  					// Collect the remaining null bytes.
   888  					// https://github.com/vbatts/tar-split/blob/80a436fd6164c557b131f7c59ed69bd81af69761/concept/main.go#L49-L53
   889  					if _, err := dst.Write(remain); err != nil {
   890  						return err
   891  					}
   892  				}
   893  			}
   894  			break
   895  		}
   896  		if err != nil {
   897  			return fmt.Errorf("error reading from source tar: tar.Reader.Next: %v", err)
   898  		}
   899  		if cleanEntryName(h.Name) == TOCTarName {
   900  			// It is possible for a layer to be "stargzified" twice during the
   901  			// distribution lifecycle. So we reserve "TOCTarName" here to avoid
   902  			// duplicated entries in the resulting layer.
   903  			if lossless {
   904  				// We cannot handle this in lossless way.
   905  				return fmt.Errorf("existing TOC JSON is not allowed; decompress layer before append")
   906  			}
   907  			continue
   908  		}
   909  
   910  		xattrs := make(map[string][]byte)
   911  		const xattrPAXRecordsPrefix = "SCHILY.xattr."
   912  		if h.PAXRecords != nil {
   913  			for k, v := range h.PAXRecords {
   914  				if strings.HasPrefix(k, xattrPAXRecordsPrefix) {
   915  					xattrs[k[len(xattrPAXRecordsPrefix):]] = []byte(v)
   916  				}
   917  			}
   918  		}
   919  		ent := &TOCEntry{
   920  			Name:        h.Name,
   921  			Mode:        h.Mode,
   922  			UID:         h.Uid,
   923  			GID:         h.Gid,
   924  			Uname:       w.nameIfChanged(&w.lastUsername, h.Uid, h.Uname),
   925  			Gname:       w.nameIfChanged(&w.lastGroupname, h.Gid, h.Gname),
   926  			ModTime3339: formatModtime(h.ModTime),
   927  			Xattrs:      xattrs,
   928  		}
   929  		if err := w.condOpenGz(); err != nil {
   930  			return err
   931  		}
   932  		if tw != nil {
   933  			if err := tw.WriteHeader(h); err != nil {
   934  				return err
   935  			}
   936  		} else {
   937  			if _, err := dst.Write(tr.RawBytes()); err != nil {
   938  				return err
   939  			}
   940  		}
   941  		switch h.Typeflag {
   942  		case tar.TypeLink:
   943  			ent.Type = "hardlink"
   944  			ent.LinkName = h.Linkname
   945  		case tar.TypeSymlink:
   946  			ent.Type = "symlink"
   947  			ent.LinkName = h.Linkname
   948  		case tar.TypeDir:
   949  			ent.Type = "dir"
   950  		case tar.TypeReg:
   951  			ent.Type = "reg"
   952  			ent.Size = h.Size
   953  		case tar.TypeChar:
   954  			ent.Type = "char"
   955  			ent.DevMajor = int(h.Devmajor)
   956  			ent.DevMinor = int(h.Devminor)
   957  		case tar.TypeBlock:
   958  			ent.Type = "block"
   959  			ent.DevMajor = int(h.Devmajor)
   960  			ent.DevMinor = int(h.Devminor)
   961  		case tar.TypeFifo:
   962  			ent.Type = "fifo"
   963  		default:
   964  			return fmt.Errorf("unsupported input tar entry %q", h.Typeflag)
   965  		}
   966  
   967  		// We need to keep a reference to the TOC entry for regular files, so that we
   968  		// can fill the digest later.
   969  		var regFileEntry *TOCEntry
   970  		var payloadDigest digest.Digester
   971  		if h.Typeflag == tar.TypeReg {
   972  			regFileEntry = ent
   973  			payloadDigest = digest.Canonical.Digester()
   974  		}
   975  
   976  		if h.Typeflag == tar.TypeReg && ent.Size > 0 {
   977  			var written int64
   978  			totalSize := ent.Size // save it before we destroy ent
   979  			tee := io.TeeReader(tr, payloadDigest.Hash())
   980  			for written < totalSize {
   981  				chunkSize := int64(w.chunkSize())
   982  				remain := totalSize - written
   983  				if remain < chunkSize {
   984  					chunkSize = remain
   985  				} else {
   986  					ent.ChunkSize = chunkSize
   987  				}
   988  
   989  				// We flush the underlying compression writer here to correctly calculate "w.cw.n".
   990  				if err := w.flushGz(); err != nil {
   991  					return err
   992  				}
   993  				if w.needsOpenGz(ent) || w.cw.n-prevOffset >= int64(w.MinChunkSize) {
   994  					if err := w.closeGz(); err != nil {
   995  						return err
   996  					}
   997  					ent.Offset = w.cw.n
   998  					prevOffset = ent.Offset
   999  					prevOffsetUncompressed = w.uncompressedCounter.n
  1000  				} else {
  1001  					ent.Offset = prevOffset
  1002  					ent.InnerOffset = w.uncompressedCounter.n - prevOffsetUncompressed
  1003  				}
  1004  
  1005  				ent.ChunkOffset = written
  1006  				chunkDigest := digest.Canonical.Digester()
  1007  
  1008  				if err := w.condOpenGz(); err != nil {
  1009  					return err
  1010  				}
  1011  
  1012  				teeChunk := io.TeeReader(tee, chunkDigest.Hash())
  1013  				var out io.Writer
  1014  				if tw != nil {
  1015  					out = tw
  1016  				} else {
  1017  					out = dst
  1018  				}
  1019  				if _, err := io.CopyN(out, teeChunk, chunkSize); err != nil {
  1020  					return fmt.Errorf("error copying %q: %v", h.Name, err)
  1021  				}
  1022  				ent.ChunkDigest = chunkDigest.Digest().String()
  1023  				w.toc.Entries = append(w.toc.Entries, ent)
  1024  				written += chunkSize
  1025  				ent = &TOCEntry{
  1026  					Name: h.Name,
  1027  					Type: "chunk",
  1028  				}
  1029  			}
  1030  		} else {
  1031  			w.toc.Entries = append(w.toc.Entries, ent)
  1032  		}
  1033  		if payloadDigest != nil {
  1034  			regFileEntry.Digest = payloadDigest.Digest().String()
  1035  		}
  1036  		if tw != nil {
  1037  			if err := tw.Flush(); err != nil {
  1038  				return err
  1039  			}
  1040  		}
  1041  	}
  1042  	remainDest := io.Discard
  1043  	if lossless {
  1044  		remainDest = dst // Preserve the remaining bytes in lossless mode
  1045  	}
  1046  	_, err := io.Copy(remainDest, src)
  1047  	return err
  1048  }
  1049  
  1050  func (w *Writer) needsOpenGz(ent *TOCEntry) bool {
  1051  	if ent.Type != "reg" {
  1052  		return false
  1053  	}
  1054  	if w.needsOpenGzEntries == nil {
  1055  		return false
  1056  	}
  1057  	_, ok := w.needsOpenGzEntries[ent.Name]
  1058  	return ok
  1059  }
  1060  
  1061  // DiffID returns the SHA-256 of the uncompressed tar bytes.
  1062  // It is only valid to call DiffID after Close.
  1063  func (w *Writer) DiffID() string {
  1064  	return fmt.Sprintf("sha256:%x", w.diffHash.Sum(nil))
  1065  }
  1066  
  1067  func maxFooterSize(blobSize int64, decompressors ...Decompressor) (res int64) {
  1068  	for _, d := range decompressors {
  1069  		if s := d.FooterSize(); res < s && s <= blobSize {
  1070  			res = s
  1071  		}
  1072  	}
  1073  	return
  1074  }
  1075  
  1076  func parseTOC(d Decompressor, sr *io.SectionReader, tocOff, tocSize int64, tocBytes []byte, opts openOpts) (*Reader, error) {
  1077  	if tocOff < 0 {
  1078  		// This means that TOC isn't contained in the blob.
  1079  		// We pass nil reader to ParseTOC and expect that ParseTOC acquire TOC from
  1080  		// the external location.
  1081  		start := time.Now()
  1082  		toc, tocDgst, err := d.ParseTOC(nil)
  1083  		if err != nil {
  1084  			return nil, err
  1085  		}
  1086  		if opts.telemetry != nil && opts.telemetry.GetTocLatency != nil {
  1087  			opts.telemetry.GetTocLatency(start)
  1088  		}
  1089  		if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
  1090  			opts.telemetry.DeserializeTocLatency(start)
  1091  		}
  1092  		return &Reader{
  1093  			sr:           sr,
  1094  			toc:          toc,
  1095  			tocDigest:    tocDgst,
  1096  			decompressor: d,
  1097  		}, nil
  1098  	}
  1099  	if len(tocBytes) > 0 {
  1100  		start := time.Now()
  1101  		toc, tocDgst, err := d.ParseTOC(bytes.NewReader(tocBytes))
  1102  		if err == nil {
  1103  			if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
  1104  				opts.telemetry.DeserializeTocLatency(start)
  1105  			}
  1106  			return &Reader{
  1107  				sr:           sr,
  1108  				toc:          toc,
  1109  				tocDigest:    tocDgst,
  1110  				decompressor: d,
  1111  			}, nil
  1112  		}
  1113  	}
  1114  
  1115  	start := time.Now()
  1116  	tocBytes = make([]byte, tocSize)
  1117  	if _, err := sr.ReadAt(tocBytes, tocOff); err != nil {
  1118  		return nil, fmt.Errorf("error reading %d byte TOC targz: %v", len(tocBytes), err)
  1119  	}
  1120  	if opts.telemetry != nil && opts.telemetry.GetTocLatency != nil {
  1121  		opts.telemetry.GetTocLatency(start)
  1122  	}
  1123  	start = time.Now()
  1124  	toc, tocDgst, err := d.ParseTOC(bytes.NewReader(tocBytes))
  1125  	if err != nil {
  1126  		return nil, err
  1127  	}
  1128  	if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
  1129  		opts.telemetry.DeserializeTocLatency(start)
  1130  	}
  1131  	return &Reader{
  1132  		sr:           sr,
  1133  		toc:          toc,
  1134  		tocDigest:    tocDgst,
  1135  		decompressor: d,
  1136  	}, nil
  1137  }
  1138  
  1139  func formatModtime(t time.Time) string {
  1140  	if t.IsZero() || t.Unix() == 0 {
  1141  		return ""
  1142  	}
  1143  	return t.UTC().Round(time.Second).Format(time.RFC3339)
  1144  }
  1145  
  1146  func cleanEntryName(name string) string {
  1147  	// Use path.Clean to consistently deal with path separators across platforms.
  1148  	return strings.TrimPrefix(path.Clean("/"+name), "/")
  1149  }
  1150  
  1151  // countWriter counts how many bytes have been written to its wrapped
  1152  // io.Writer.
  1153  type countWriter struct {
  1154  	w io.Writer
  1155  	n int64
  1156  }
  1157  
  1158  func (cw *countWriter) Write(p []byte) (n int, err error) {
  1159  	n, err = cw.w.Write(p)
  1160  	cw.n += int64(n)
  1161  	return
  1162  }
  1163  
  1164  type countWriteFlusher struct {
  1165  	io.WriteCloser
  1166  	n int64
  1167  }
  1168  
  1169  func (wc *countWriteFlusher) register(w io.WriteCloser) io.WriteCloser {
  1170  	wc.WriteCloser = w
  1171  	return wc
  1172  }
  1173  
  1174  func (wc *countWriteFlusher) Write(p []byte) (n int, err error) {
  1175  	n, err = wc.WriteCloser.Write(p)
  1176  	wc.n += int64(n)
  1177  	return
  1178  }
  1179  
  1180  func (wc *countWriteFlusher) Flush() error {
  1181  	if f, ok := wc.WriteCloser.(interface {
  1182  		Flush() error
  1183  	}); ok {
  1184  		return f.Flush()
  1185  	}
  1186  	return nil
  1187  }
  1188  
  1189  func (wc *countWriteFlusher) Close() error {
  1190  	err := wc.WriteCloser.Close()
  1191  	wc.WriteCloser = nil
  1192  	return err
  1193  }
  1194  
  1195  // isGzip reports whether br is positioned right before an upcoming gzip stream.
  1196  // It does not consume any bytes from br.
  1197  func isGzip(br *bufio.Reader) bool {
  1198  	const (
  1199  		gzipID1     = 0x1f
  1200  		gzipID2     = 0x8b
  1201  		gzipDeflate = 8
  1202  	)
  1203  	peek, _ := br.Peek(3)
  1204  	return len(peek) >= 3 && peek[0] == gzipID1 && peek[1] == gzipID2 && peek[2] == gzipDeflate
  1205  }
  1206  
  1207  func positive(n int64) int64 {
  1208  	if n < 0 {
  1209  		return 0
  1210  	}
  1211  	return n
  1212  }
  1213  
  1214  type countReader struct {
  1215  	r io.Reader
  1216  	n int64
  1217  }
  1218  
  1219  func (cr *countReader) Read(p []byte) (n int, err error) {
  1220  	n, err = cr.r.Read(p)
  1221  	cr.n += int64(n)
  1222  	return
  1223  }
  1224  

View as plain text