1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package stream
17
18 import (
19 "bufio"
20 "compress/gzip"
21 "crypto"
22 "encoding/hex"
23 "errors"
24 "hash"
25 "io"
26 "os"
27 "sync"
28
29 v1 "github.com/google/go-containerregistry/pkg/v1"
30 "github.com/google/go-containerregistry/pkg/v1/types"
31 )
32
var (
	// ErrConsumed is returned by Layer.Compressed once the layer has been
	// marked as consumed.
	ErrConsumed = errors.New("stream was already consumed")

	// ErrNotComputed is returned by Layer.Digest, Layer.DiffID and
	// Layer.Size while the requested value has not been recorded yet.
	ErrNotComputed = errors.New("value not computed until stream is consumed")
)
42
43
44 type Layer struct {
45 blob io.ReadCloser
46 consumed bool
47 compression int
48
49 mu sync.Mutex
50 digest, diffID *v1.Hash
51 size int64
52 mediaType types.MediaType
53 }
54
55 var _ v1.Layer = (*Layer)(nil)
56
57
// LayerOption customizes a Layer. NewLayer calls each option it is given on
// the Layer under construction.
type LayerOption func(*Layer)
59
60
61 func WithCompressionLevel(level int) LayerOption {
62 return func(l *Layer) {
63 l.compression = level
64 }
65 }
66
67
68 func WithMediaType(mt types.MediaType) LayerOption {
69 return func(l *Layer) {
70 l.mediaType = mt
71 }
72 }
73
74
75 func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
76 layer := &Layer{
77 blob: rc,
78 compression: gzip.BestSpeed,
79
80
81 mediaType: types.DockerLayer,
82 }
83
84 for _, opt := range opts {
85 opt(layer)
86 }
87
88 return layer
89 }
90
91
92 func (l *Layer) Digest() (v1.Hash, error) {
93 l.mu.Lock()
94 defer l.mu.Unlock()
95 if l.digest == nil {
96 return v1.Hash{}, ErrNotComputed
97 }
98 return *l.digest, nil
99 }
100
101
102 func (l *Layer) DiffID() (v1.Hash, error) {
103 l.mu.Lock()
104 defer l.mu.Unlock()
105 if l.diffID == nil {
106 return v1.Hash{}, ErrNotComputed
107 }
108 return *l.diffID, nil
109 }
110
111
112 func (l *Layer) Size() (int64, error) {
113 l.mu.Lock()
114 defer l.mu.Unlock()
115 if l.size == 0 {
116 return 0, ErrNotComputed
117 }
118 return l.size, nil
119 }
120
121
122 func (l *Layer) MediaType() (types.MediaType, error) {
123 return l.mediaType, nil
124 }
125
126
127 func (l *Layer) Uncompressed() (io.ReadCloser, error) {
128 return nil, errors.New("NYI: stream.Layer.Uncompressed is not implemented")
129 }
130
131
132 func (l *Layer) Compressed() (io.ReadCloser, error) {
133 l.mu.Lock()
134 defer l.mu.Unlock()
135 if l.consumed {
136 return nil, ErrConsumed
137 }
138 return newCompressedReader(l)
139 }
140
141
142 func (l *Layer) finalize(uncompressed, compressed hash.Hash, size int64) error {
143 l.mu.Lock()
144 defer l.mu.Unlock()
145
146 diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(uncompressed.Sum(nil)))
147 if err != nil {
148 return err
149 }
150 l.diffID = &diffID
151
152 digest, err := v1.NewHash("sha256:" + hex.EncodeToString(compressed.Sum(nil)))
153 if err != nil {
154 return err
155 }
156 l.digest = &digest
157
158 l.size = size
159 l.consumed = true
160 return nil
161 }
162
// compressedReader is the reader built by newCompressedReader and handed
// out by Layer.Compressed.
type compressedReader struct {
	// pr is the stream Read delegates to; newCompressedReader sets it to
	// the read end of an io.Pipe.
	pr io.Reader
	// closer is the function Close delegates to.
	closer func() error
}
167
// newCompressedReader starts a goroutine that copies l.blob through a gzip
// writer and returns a reader for the compressed bytes.
//
// While copying, it hashes the uncompressed bytes, hashes the compressed
// bytes and counts the compressed bytes; those three values are handed to
// l.finalize when the returned reader is closed. It fails only if
// gzip.NewWriterLevel rejects l.compression, in which case no goroutine is
// started.
func newCompressedReader(l *Layer) (*compressedReader, error) {
	// h hashes the uncompressed blob, zh hashes the compressed output and
	// count tallies the compressed byte count.
	//
	// NOTE(review): crypto.SHA256.New panics unless crypto/sha256 is linked
	// into the binary. This file does not import it, so presumably a
	// dependency does — confirm.
	h := crypto.SHA256.New()
	zh := crypto.SHA256.New()
	count := &countWriter{}

	// The compressed bytes are delivered to the caller through a pipe: the
	// goroutine below writes to pw, the returned reader reads from pr.
	pr, pw := io.Pipe()

	// Every compressed byte goes to the pipe, the compressed-stream hasher
	// and the byte counter.
	mw := io.MultiWriter(pw, zh, count)

	// Buffer the gzip output in front of the pipe. 2<<16 is 131072 bytes
	// (128 KiB).
	bw := bufio.NewWriterSize(mw, 2<<16)
	zw, err := gzip.NewWriterLevel(bw, l.compression)
	if err != nil {
		return nil, err
	}

	// doneDigesting is closed by the goroutine once it will no longer write
	// to the hashers or the counter, on success and on every error path.
	doneDigesting := make(chan struct{})

	cr := &compressedReader{
		pr: pr,
		closer: func() error {
			// Close the write end first so that, if Close is called before
			// the blob has been fully read, the goroutine's writes to the
			// pipe fail instead of blocking with no reader, which lets the
			// wait on doneDigesting below complete. The returned error is
			// deliberately discarded.
			_ = pw.Close()

			// Close the underlying blob. A blob that reports os.ErrClosed
			// is treated as already closed rather than as a failure.
			if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
				return err
			}

			// Wait until the goroutine has stopped feeding the hashers,
			// then record digest, diff ID and size on the layer.
			<-doneDigesting
			return l.finalize(h, zh, count.n)
		},
	}
	go func() {
		// Feed the blob to both the uncompressed hasher and the gzip
		// writer. The gzip writer's output flows through bw into mw.
		_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)

		// The gzip writer is always closed here, in the same goroutine
		// that writes to it, whether or not the copy succeeded.
		closeErr := zw.Close()

		// On any failure: signal that digesting has stopped, then surface
		// the error to readers of the pipe. A copy error takes precedence
		// over a gzip close error.
		if copyErr != nil {
			close(doneDigesting)
			pw.CloseWithError(copyErr)
			return
		}
		if closeErr != nil {
			close(doneDigesting)
			pw.CloseWithError(closeErr)
			return
		}

		// Push whatever is still buffered through to the pipe, hasher and
		// counter.
		if err := bw.Flush(); err != nil {
			close(doneDigesting)
			pw.CloseWithError(err)
			return
		}

		// All compressed bytes have been written; the closer may finalize.
		close(doneDigesting)

		// Run the closer from here so the layer is finalized once the whole
		// blob has been streamed, even before the caller calls Close.
		//
		// NOTE(review): the closer calls pw.Close() before doing anything
		// else, and io documents that CloseWithError never overwrites a
		// previous error, so the error passed here is probably never seen
		// by readers (they get EOF) — confirm.
		pw.CloseWithError(cr.Close())
	}()

	return cr, nil
}
264
265 func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) }
266
267 func (cr *compressedReader) Close() error { return cr.closer() }
268
269
// countWriter is an io.Writer that discards its input and keeps a running
// total of the bytes it has been asked to write.
type countWriter struct {
	n int64 // total bytes passed to Write so far
}

// Write adds len(p) to the running total. It always reports that all of p
// was written and never returns an error.
func (c *countWriter) Write(p []byte) (int, error) {
	written := len(p)
	c.n += int64(written)
	return written, nil
}
276
View as plain text