1
15
16 package content
17
18 import (
19 "context"
20 "errors"
21 "io"
22 "time"
23
24 "github.com/containerd/containerd/content"
25 "github.com/opencontainers/go-digest"
26 )
27
28
29
30 type PassthroughWriter struct {
31 writer content.Writer
32 pipew *io.PipeWriter
33 digester digest.Digester
34 size int64
35 underlyingWriter *underlyingWriter
36 reader *io.PipeReader
37 hash *digest.Digest
38 done chan error
39 }
40
41
42
43
44
45 func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
46
47 wOpts := DefaultWriterOpts()
48 for _, opt := range opts {
49 if err := opt(&wOpts); err != nil {
50 return nil
51 }
52 }
53
54 r, w := io.Pipe()
55 pw := &PassthroughWriter{
56 writer: writer,
57 pipew: w,
58 digester: digest.Canonical.Digester(),
59 underlyingWriter: &underlyingWriter{
60 writer: writer,
61 digester: digest.Canonical.Digester(),
62 hash: wOpts.OutputHash,
63 },
64 reader: r,
65 hash: wOpts.InputHash,
66 done: make(chan error, 1),
67 }
68 go f(r, pw.underlyingWriter, pw.done)
69 return pw
70 }
71
72 func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
73 n, err = pw.pipew.Write(p)
74 if pw.hash == nil {
75 pw.digester.Hash().Write(p[:n])
76 }
77 pw.size += int64(n)
78 return
79 }
80
81 func (pw *PassthroughWriter) Close() error {
82 if pw.pipew != nil {
83 pw.pipew.Close()
84 }
85 pw.writer.Close()
86 return nil
87 }
88
89
90 func (pw *PassthroughWriter) Digest() digest.Digest {
91 if pw.hash != nil {
92 return *pw.hash
93 }
94 return pw.digester.Digest()
95 }
96
97
98
99
100
101 func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
102 if pw.pipew != nil {
103 pw.pipew.Close()
104 }
105 err := <-pw.done
106 if pw.reader != nil {
107 pw.reader.Close()
108 }
109 if err != nil && err != io.EOF {
110 return err
111 }
112
113
114
115 return pw.writer.Commit(ctx, pw.underlyingWriter.size, pw.underlyingWriter.Digest(), opts...)
116 }
117
118
119 func (pw *PassthroughWriter) Status() (content.Status, error) {
120 return pw.writer.Status()
121 }
122
123
124 func (pw *PassthroughWriter) Truncate(size int64) error {
125 return pw.writer.Truncate(size)
126 }
127
128
129
130 type underlyingWriter struct {
131 writer content.Writer
132 digester digest.Digester
133 size int64
134 hash *digest.Digest
135 }
136
137
138 func (u *underlyingWriter) Write(p []byte) (int, error) {
139 n, err := u.writer.Write(p)
140 if err != nil {
141 return 0, err
142 }
143
144 if u.hash == nil {
145 u.digester.Hash().Write(p)
146 }
147 u.size += int64(len(p))
148 return n, nil
149 }
150
151
152 func (u *underlyingWriter) Size() int64 {
153 return u.size
154 }
155
156
157 func (u *underlyingWriter) Digest() digest.Digest {
158 if u.hash != nil {
159 return *u.hash
160 }
161 return u.digester.Digest()
162 }
163
164
165
166 type PassthroughMultiWriter struct {
167 writers []*PassthroughWriter
168 pipew *io.PipeWriter
169 digester digest.Digester
170 size int64
171 reader *io.PipeReader
172 hash *digest.Digest
173 done chan error
174 startedAt time.Time
175 updatedAt time.Time
176 }
177
178 func NewPassthroughMultiWriter(writers func(name string) (content.Writer, error), f func(r io.Reader, getwriter func(name string) io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
179
180 wOpts := DefaultWriterOpts()
181 for _, opt := range opts {
182 if err := opt(&wOpts); err != nil {
183 return nil
184 }
185 }
186
187 r, w := io.Pipe()
188
189 pmw := &PassthroughMultiWriter{
190 startedAt: time.Now(),
191 updatedAt: time.Now(),
192 done: make(chan error, 1),
193 digester: digest.Canonical.Digester(),
194 hash: wOpts.InputHash,
195 pipew: w,
196 reader: r,
197 }
198
199
200 getwriter := func(name string) io.Writer {
201 writer, err := writers(name)
202 if err != nil || writer == nil {
203 return nil
204 }
205 pw := &PassthroughWriter{
206 writer: writer,
207 digester: digest.Canonical.Digester(),
208 underlyingWriter: &underlyingWriter{
209 writer: writer,
210 digester: digest.Canonical.Digester(),
211 hash: wOpts.OutputHash,
212 },
213 done: make(chan error, 1),
214 }
215 pmw.writers = append(pmw.writers, pw)
216 return pw.underlyingWriter
217 }
218 go f(r, getwriter, pmw.done)
219 return pmw
220 }
221
222 func (pmw *PassthroughMultiWriter) Write(p []byte) (n int, err error) {
223 n, err = pmw.pipew.Write(p)
224 if pmw.hash == nil {
225 pmw.digester.Hash().Write(p[:n])
226 }
227 pmw.size += int64(n)
228 pmw.updatedAt = time.Now()
229 return
230 }
231
232 func (pmw *PassthroughMultiWriter) Close() error {
233 pmw.pipew.Close()
234 for _, w := range pmw.writers {
235 w.Close()
236 }
237 return nil
238 }
239
240
241 func (pmw *PassthroughMultiWriter) Digest() digest.Digest {
242 if pmw.hash != nil {
243 return *pmw.hash
244 }
245 return pmw.digester.Digest()
246 }
247
248
249
250
251
252 func (pmw *PassthroughMultiWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
253 pmw.pipew.Close()
254 err := <-pmw.done
255 if pmw.reader != nil {
256 pmw.reader.Close()
257 }
258 if err != nil && err != io.EOF {
259 return err
260 }
261
262
263
264 for _, w := range pmw.writers {
265
266 w.done <- err
267 if err := w.Commit(ctx, size, expected, opts...); err != nil {
268 return err
269 }
270 }
271 return nil
272 }
273
274
275 func (pmw *PassthroughMultiWriter) Status() (content.Status, error) {
276 return content.Status{
277 StartedAt: pmw.startedAt,
278 UpdatedAt: pmw.updatedAt,
279 Total: pmw.size,
280 }, nil
281 }
282
283
284 func (pmw *PassthroughMultiWriter) Truncate(size int64) error {
285 return errors.New("truncate unavailable on multiwriter")
286 }
287
View as plain text