
Source file src/github.com/google/go-containerregistry/pkg/v1/stream/layer.go

Documentation: github.com/google/go-containerregistry/pkg/v1/stream

// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package stream implements a single-pass streaming v1.Layer.
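//
// A minimal usage sketch (illustrative only; the string contents stand in
// for a real layer tarball):
//
//	layer := stream.NewLayer(io.NopCloser(strings.NewReader("contents")))
//	rc, err := layer.Compressed()
//	// ... stream rc somewhere (e.g. a registry upload), then close it ...
//	// Digest, DiffID, and Size are only available once rc has been fully
//	// consumed and closed; before that they return ErrNotComputed.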
package stream

import (
	"bufio"
	"compress/gzip"
	"crypto"
	"encoding/hex"
	"errors"
	"hash"
	"io"
	"os"
	"sync"

	v1 "github.com/google/go-containerregistry/pkg/v1"
	"github.com/google/go-containerregistry/pkg/v1/types"
)

var (
	// ErrNotComputed is returned when the requested value has not yet been
	// computed because the stream has not been consumed.
	ErrNotComputed = errors.New("value not computed until stream is consumed")

	// ErrConsumed is returned by Compressed when the underlying stream has
	// already been consumed and closed.
	ErrConsumed = errors.New("stream was already consumed")
)
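
// Callers typically distinguish these sentinels with errors.Is; a sketch
// (layer is a hypothetical *Layer value):
//
//	if _, err := layer.Digest(); errors.Is(err, ErrNotComputed) {
//		// Consume and close the Compressed stream first.
//	}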

// Layer is a streaming implementation of v1.Layer.
type Layer struct {
	blob        io.ReadCloser
	consumed    bool
	compression int

	mu             sync.Mutex
	digest, diffID *v1.Hash
	size           int64
	mediaType      types.MediaType
}

var _ v1.Layer = (*Layer)(nil)

// LayerOption applies an option to a Layer.
type LayerOption func(*Layer)

// WithCompressionLevel sets the gzip compression level. See gzip.NewWriterLevel for valid values.
func WithCompressionLevel(level int) LayerOption {
	return func(l *Layer) {
		l.compression = level
	}
}

// WithMediaType is a functional option for overriding the layer's media type.
func WithMediaType(mt types.MediaType) LayerOption {
	return func(l *Layer) {
		l.mediaType = mt
	}
}

// NewLayer creates a Layer from an io.ReadCloser.
func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
	layer := &Layer{
		blob:        rc,
		compression: gzip.BestSpeed,
		// We use DockerLayer for now as uncompressed layers
		// are unimplemented
		mediaType: types.DockerLayer,
	}

	for _, opt := range opts {
		opt(layer)
	}

	return layer
}
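
// A construction sketch (tarball here is a hypothetical io.ReadCloser
// yielding the layer's uncompressed tar contents):
//
//	layer := NewLayer(tarball,
//		WithCompressionLevel(gzip.BestCompression),
//		WithMediaType(types.OCILayer),
//	)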

// Digest implements v1.Layer.
func (l *Layer) Digest() (v1.Hash, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.digest == nil {
		return v1.Hash{}, ErrNotComputed
	}
	return *l.digest, nil
}

// DiffID implements v1.Layer.
func (l *Layer) DiffID() (v1.Hash, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.diffID == nil {
		return v1.Hash{}, ErrNotComputed
	}
	return *l.diffID, nil
}

// Size implements v1.Layer.
func (l *Layer) Size() (int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
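	// A zero size doubles as the "not yet computed" sentinel; this cannot
	// collide with a real value, since a gzip stream always contains a
	// non-empty header and trailer, so a finalized size is never zero.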
	if l.size == 0 {
		return 0, ErrNotComputed
	}
	return l.size, nil
}

// MediaType implements v1.Layer.
func (l *Layer) MediaType() (types.MediaType, error) {
	return l.mediaType, nil
}

// Uncompressed implements v1.Layer.
func (l *Layer) Uncompressed() (io.ReadCloser, error) {
	return nil, errors.New("NYI: stream.Layer.Uncompressed is not implemented")
}
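
// Until Uncompressed is implemented, one workaround (a sketch, not part of
// this package's API) is to gunzip the compressed stream yourself:
//
//	rc, _ := layer.Compressed()
//	zr, _ := gzip.NewReader(rc)
//	// read the uncompressed contents from zr, then close zr and rc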

// Compressed implements v1.Layer.
func (l *Layer) Compressed() (io.ReadCloser, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.consumed {
		return nil, ErrConsumed
	}
	return newCompressedReader(l)
}
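
// A consumption sketch (dst is a hypothetical destination writer). The
// digest, diffID, and size become available only after the returned stream
// has been fully read and closed:
//
//	rc, err := layer.Compressed()
//	// check err ...
//	if _, err := io.Copy(dst, rc); err != nil { /* ... */ }
//	if err := rc.Close(); err != nil { /* ... */ }
//	digest, err := layer.Digest() // now computed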

// finalize sets the layer to consumed and computes all hash and size values.
func (l *Layer) finalize(uncompressed, compressed hash.Hash, size int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(uncompressed.Sum(nil)))
	if err != nil {
		return err
	}
	l.diffID = &diffID

	digest, err := v1.NewHash("sha256:" + hex.EncodeToString(compressed.Sum(nil)))
	if err != nil {
		return err
	}
	l.digest = &digest

	l.size = size
	l.consumed = true
	return nil
}

type compressedReader struct {
	pr     io.Reader
	closer func() error
}

func newCompressedReader(l *Layer) (*compressedReader, error) {
	// Collect digests of compressed and uncompressed stream and size of
	// compressed stream.
	h := crypto.SHA256.New()
	zh := crypto.SHA256.New()
	count := &countWriter{}

	// The gzip.Writer writes to the output stream via a pipe, to a hasher
	// that captures the compressed digest, and to a countWriter that
	// captures the compressed size.
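
	// Data flow, roughly (each arrow is a write):
	//
	//	l.blob -> h                              (uncompressed hash -> DiffID)
	//	l.blob -> zw (gzip) -> bw (buffer) -> pw (pipe -> Compressed reader)
	//	                                   -> zh (compressed hash -> Digest)
	//	                                   -> count (compressed size -> Size)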
	pr, pw := io.Pipe()

	// Write compressed bytes to be read by the pipe.Reader, hashed by zh, and counted by count.
	mw := io.MultiWriter(pw, zh, count)

	// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
	// 128K (2<<16 bytes) ought to be small enough for anybody.
	bw := bufio.NewWriterSize(mw, 2<<16)
	zw, err := gzip.NewWriterLevel(bw, l.compression)
	if err != nil {
		return nil, err
	}

	doneDigesting := make(chan struct{})

	cr := &compressedReader{
		pr: pr,
		closer: func() error {
			// Immediately close pw without error. There are three ways to get
			// here.
			//
			// 1. There was a copy error from the underlying reader, in which
			//    case the error will not be overwritten.
			// 2. Copying from the underlying reader completed successfully.
			// 3. Close has been called before the underlying reader has been
			//    fully consumed. In this case pw must be closed in order to
			//    keep the flush of bw from blocking indefinitely.
			//
			// NOTE: pw.Close never returns an error. The signature is only to
			// implement io.Closer.
			_ = pw.Close()

			// Close the inner ReadCloser.
			//
			// NOTE: net/http will call close on success, so if we've already
			// closed the inner rc, it's not an error.
			if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
				return err
			}

			// Finalize layer with its digest and size values.
			<-doneDigesting
			return l.finalize(h, zh, count.n)
		},
	}
	go func() {
		// Copy the blob into the gzip writer (whose output is hashed and
		// counted to produce the compressed digest and size) and into the
		// hasher of the raw contents.
		_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)

		// Close the gzip writer once copying is done. If this is done in the
		// Close method of compressedReader instead, then it can cause a panic
		// when the compressedReader is closed before the blob is fully
		// consumed and io.Copy in this goroutine is still blocking.
		closeErr := zw.Close()

		// Check errors from writing and closing streams.
		if copyErr != nil {
			close(doneDigesting)
			pw.CloseWithError(copyErr)
			return
		}
		if closeErr != nil {
			close(doneDigesting)
			pw.CloseWithError(closeErr)
			return
		}

		// Flush the buffer once all writes to the gzip writer are complete.
		if err := bw.Flush(); err != nil {
			close(doneDigesting)
			pw.CloseWithError(err)
			return
		}

		// Notify closer that digests are done being written.
		close(doneDigesting)

		// Close the compressed reader to calculate digest/diffID/size. This
		// will cause pr to return EOF which will cause readers of the
		// Compressed stream to finish reading.
		pw.CloseWithError(cr.Close())
	}()

	return cr, nil
}

func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) }

func (cr *compressedReader) Close() error { return cr.closer() }

// countWriter counts bytes written to it.
type countWriter struct{ n int64 }

func (c *countWriter) Write(p []byte) (int, error) {
	c.n += int64(len(p))
	return len(p), nil
}
