...

Source file src/oras.land/oras-go/pkg/content/passthrough.go

Documentation: oras.land/oras-go/pkg/content

     1  /*
     2  Copyright The ORAS Authors.
     3  Licensed under the Apache License, Version 2.0 (the "License");
     4  you may not use this file except in compliance with the License.
     5  You may obtain a copy of the License at
     6  
     7  http://www.apache.org/licenses/LICENSE-2.0
     8  
     9  Unless required by applicable law or agreed to in writing, software
    10  distributed under the License is distributed on an "AS IS" BASIS,
    11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  See the License for the specific language governing permissions and
    13  limitations under the License.
    14  */
    15  
    16  package content
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"io"
    22  	"time"
    23  
    24  	"github.com/containerd/containerd/content"
    25  	"github.com/opencontainers/go-digest"
    26  )
    27  
    28  // PassthroughWriter takes an input stream and passes it through to an underlying writer,
    29  // while providing the ability to manipulate the stream before it gets passed through.
    30  type PassthroughWriter struct {
    31  	writer           content.Writer
    32  	pipew            *io.PipeWriter
    33  	digester         digest.Digester
    34  	size             int64
    35  	underlyingWriter *underlyingWriter
    36  	reader           *io.PipeReader
    37  	hash             *digest.Digest
    38  	done             chan error
    39  }
    40  
    41  // NewPassthroughWriter creates a pass-through writer that allows for processing
    42  // the content via an arbitrary function. The function should do whatever processing it
    43  // wants, reading from the Reader and writing to the Writer. When done, it must
    44  // signal completion by sending an error (or nil) on the done channel.
    45  func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
    46  	// process opts for default
    47  	wOpts := DefaultWriterOpts()
    48  	for _, opt := range opts {
    49  		if err := opt(&wOpts); err != nil {
    50  			return nil
    51  		}
    52  	}
    53  
    54  	r, w := io.Pipe()
    55  	pw := &PassthroughWriter{
    56  		writer:   writer,
    57  		pipew:    w,
    58  		digester: digest.Canonical.Digester(),
    59  		underlyingWriter: &underlyingWriter{
    60  			writer:   writer,
    61  			digester: digest.Canonical.Digester(),
    62  			hash:     wOpts.OutputHash,
    63  		},
    64  		reader: r,
    65  		hash:   wOpts.InputHash,
    66  		done:   make(chan error, 1),
    67  	}
    68  	go f(r, pw.underlyingWriter, pw.done)
    69  	return pw
    70  }
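        // Illustrative usage sketch, not part of the original file: wrap an existing
        // containerd content.Writer so that gzip input is decompressed before it reaches
        // the underlying store. The ingester parameter, the ref name, and the caller-supplied
        // size and digest are assumptions for the example, and it would need "compress/gzip"
        // imported.
        //
        //	func writeDecompressed(ctx context.Context, ingester content.Ingester, in io.Reader, size int64, dgst digest.Digest) error {
        //		w, err := ingester.Writer(ctx, content.WithRef("example"))
        //		if err != nil {
        //			return err
        //		}
        //		pw := NewPassthroughWriter(w, func(r io.Reader, out io.Writer, done chan<- error) {
        //			gr, err := gzip.NewReader(r)
        //			if err == nil {
        //				_, err = io.Copy(out, gr)
        //				gr.Close()
        //			}
        //			io.Copy(io.Discard, r) // drain remaining input so the producer is not blocked
        //			done <- err
        //		})
        //		if _, err := io.Copy(pw, in); err != nil {
        //			return err
        //		}
        //		// Commit closes the pipe, waits for the processing goroutine, and commits
        //		// the processed output with its own size and digest.
        //		return pw.Commit(ctx, size, dgst)
        //	}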
    71  
    72  func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
    73  	n, err = pw.pipew.Write(p)
    74  	if pw.hash == nil {
    75  		pw.digester.Hash().Write(p[:n])
    76  	}
    77  	pw.size += int64(n)
    78  	return
    79  }
    80  
    81  func (pw *PassthroughWriter) Close() error {
    82  	if pw.pipew != nil {
    83  		pw.pipew.Close()
    84  	}
    85  	pw.writer.Close()
    86  	return nil
    87  }
    88  
    89  // Digest may return an empty digest or panic until the content is committed.
    90  func (pw *PassthroughWriter) Digest() digest.Digest {
    91  	if pw.hash != nil {
    92  		return *pw.hash
    93  	}
    94  	return pw.digester.Digest()
    95  }
    96  
    97  // Commit commits the blob (but no roll-back is guaranteed on an error).
    98  // size and expected can be zero-value when unknown.
    99  // Commit always closes the writer, even on error.
   100  // ErrAlreadyExists aborts the writer.
   101  func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
   102  	if pw.pipew != nil {
   103  		pw.pipew.Close()
   104  	}
   105  	err := <-pw.done
   106  	if pw.reader != nil {
   107  		pw.reader.Close()
   108  	}
   109  	if err != nil && err != io.EOF {
   110  		return err
   111  	}
   112  
   113  // Some underlying writers will validate an expected digest, so we need to be able to pass
   114  // that digest along. That is why we calculate the digest of what is written to the underlying writer throughout the write process.
   115  	return pw.writer.Commit(ctx, pw.underlyingWriter.size, pw.underlyingWriter.Digest(), opts...)
   116  }
   117  
   118  // Status returns the current state of the write
   119  func (pw *PassthroughWriter) Status() (content.Status, error) {
   120  	return pw.writer.Status()
   121  }
   122  
   123  // Truncate updates the size of the target blob
   124  func (pw *PassthroughWriter) Truncate(size int64) error {
   125  	return pw.writer.Truncate(size)
   126  }
   127  
   128  // underlyingWriter is an io.Writer implementation that passes writes through to the
   129  // underlying content.Writer
   130  type underlyingWriter struct {
   131  	writer   content.Writer
   132  	digester digest.Digester
   133  	size     int64
   134  	hash     *digest.Digest
   135  }
   136  
   137  // Write writes to the underlying writer
   138  func (u *underlyingWriter) Write(p []byte) (int, error) {
   139  	n, err := u.writer.Write(p)
   140  	if err != nil {
   141  		return 0, err
   142  	}
   143  
   144  	if u.hash == nil {
   145  		u.digester.Hash().Write(p)
   146  	}
   147  	u.size += int64(len(p))
   148  	return n, nil
   149  }
   150  
   151  // Size returns the total size written
   152  func (u *underlyingWriter) Size() int64 {
   153  	return u.size
   154  }
   155  
   156  // Digest may return an empty digest or panic until the content is committed.
   157  func (u *underlyingWriter) Digest() digest.Digest {
   158  	if u.hash != nil {
   159  		return *u.hash
   160  	}
   161  	return u.digester.Digest()
   162  }
   163  
   164  // PassthroughMultiWriter is a single writer that passes through to multiple writers, allowing the
   165  // passthrough function to select which writer to use.
   166  type PassthroughMultiWriter struct {
   167  	writers   []*PassthroughWriter
   168  	pipew     *io.PipeWriter
   169  	digester  digest.Digester
   170  	size      int64
   171  	reader    *io.PipeReader
   172  	hash      *digest.Digest
   173  	done      chan error
   174  	startedAt time.Time
   175  	updatedAt time.Time
   176  }
   177  
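        // NewPassthroughMultiWriter creates a pass-through writer whose processing function f
        // can fan the input out to any number of underlying writers. f obtains each output
        // writer on demand, by name, via getwriter, and must signal completion by sending an
        // error or nil to done.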
   178  func NewPassthroughMultiWriter(writers func(name string) (content.Writer, error), f func(r io.Reader, getwriter func(name string) io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
   179  	// process opts for default
   180  	wOpts := DefaultWriterOpts()
   181  	for _, opt := range opts {
   182  		if err := opt(&wOpts); err != nil {
   183  			return nil
   184  		}
   185  	}
   186  
   187  	r, w := io.Pipe()
   188  
   189  	pmw := &PassthroughMultiWriter{
   190  		startedAt: time.Now(),
   191  		updatedAt: time.Now(),
   192  		done:      make(chan error, 1),
   193  		digester:  digest.Canonical.Digester(),
   194  		hash:      wOpts.InputHash,
   195  		pipew:     w,
   196  		reader:    r,
   197  	}
   198  
   199  	// get our output writers
   200  	getwriter := func(name string) io.Writer {
   201  		writer, err := writers(name)
   202  		if err != nil || writer == nil {
   203  			return nil
   204  		}
   205  		pw := &PassthroughWriter{
   206  			writer:   writer,
   207  			digester: digest.Canonical.Digester(),
   208  			underlyingWriter: &underlyingWriter{
   209  				writer:   writer,
   210  				digester: digest.Canonical.Digester(),
   211  				hash:     wOpts.OutputHash,
   212  			},
   213  			done:   make(chan error, 1),
   214  		}
   215  		pmw.writers = append(pmw.writers, pw)
   216  		return pw.underlyingWriter
   217  	}
   218  	go f(r, getwriter, pmw.done)
   219  	return pmw
   220  }
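        // Illustrative usage sketch, not part of the original file: split a tar stream into
        // one blob per archive entry, keyed by the entry name. The ingester and ctx variables
        // in newWriter are assumptions for the example, and it would need "archive/tar"
        // imported.
        //
        //	newWriter := func(name string) (content.Writer, error) {
        //		return ingester.Writer(ctx, content.WithRef(name))
        //	}
        //	pmw := NewPassthroughMultiWriter(newWriter, func(r io.Reader, getwriter func(name string) io.Writer, done chan<- error) {
        //		tr := tar.NewReader(r)
        //		for {
        //			hdr, err := tr.Next()
        //			if err == io.EOF {
        //				done <- nil
        //				return
        //			}
        //			if err != nil {
        //				done <- err
        //				return
        //			}
        //			w := getwriter(hdr.Name)
        //			if w == nil {
        //				// no writer available for this entry; skip it
        //				continue
        //			}
        //			if _, err := io.Copy(w, tr); err != nil {
        //				done <- err
        //				return
        //			}
        //		}
        //	})
        //	// Stream the archive into pmw, then pmw.Commit commits every sub-writer with the
        //	// size and digest of what was written to it.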
   221  
   222  func (pmw *PassthroughMultiWriter) Write(p []byte) (n int, err error) {
   223  	n, err = pmw.pipew.Write(p)
   224  	if pmw.hash == nil {
   225  		pmw.digester.Hash().Write(p[:n])
   226  	}
   227  	pmw.size += int64(n)
   228  	pmw.updatedAt = time.Now()
   229  	return
   230  }
   231  
   232  func (pmw *PassthroughMultiWriter) Close() error {
   233  	pmw.pipew.Close()
   234  	for _, w := range pmw.writers {
   235  		w.Close()
   236  	}
   237  	return nil
   238  }
   239  
   240  // Digest may return an empty digest or panic until the content is committed.
   241  func (pmw *PassthroughMultiWriter) Digest() digest.Digest {
   242  	if pmw.hash != nil {
   243  		return *pmw.hash
   244  	}
   245  	return pmw.digester.Digest()
   246  }
   247  
   248  // Commit commits the blob (but no roll-back is guaranteed on an error).
   249  // size and expected can be zero-value when unknown.
   250  // Commit always closes the writer, even on error.
   251  // ErrAlreadyExists aborts the writer.
   252  func (pmw *PassthroughMultiWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
   253  	pmw.pipew.Close()
   254  	err := <-pmw.done
   255  	if pmw.reader != nil {
   256  		pmw.reader.Close()
   257  	}
   258  	if err != nil && err != io.EOF {
   259  		return err
   260  	}
   261  
   262  // Some underlying writers will validate an expected digest, so we need to be able to pass
   263  // that digest along. That is why we calculate the digest of what is written to the underlying writer throughout the write process.
   264  	for _, w := range pmw.writers {
   265  // maybe this should be Commit(ctx, w.underlyingWriter.size, w.underlyingWriter.Digest(), opts...)
   266  		w.done <- err
   267  		if err := w.Commit(ctx, size, expected, opts...); err != nil {
   268  			return err
   269  		}
   270  	}
   271  	return nil
   272  }
   273  
   274  // Status returns the current state of the write
   275  func (pmw *PassthroughMultiWriter) Status() (content.Status, error) {
   276  	return content.Status{
   277  		StartedAt: pmw.startedAt,
   278  		UpdatedAt: pmw.updatedAt,
   279  		Total:     pmw.size,
   280  	}, nil
   281  }
   282  
   283  // Truncate updates the size of the target blob, but is not supported on a multiwriter
   284  func (pmw *PassthroughMultiWriter) Truncate(size int64) error {
   285  	return errors.New("truncate unavailable on multiwriter")
   286  }
   287  
