...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ocimem
16
17 import (
18 "bytes"
19 "crypto/rand"
20 "fmt"
21 "sync"
22
23 "github.com/opencontainers/go-digest"
24
25 "cuelabs.dev/go/oci/ociregistry"
26 )
27
28
29
30
31 func NewBytesReader(data []byte, desc ociregistry.Descriptor) ociregistry.BlobReader {
32 r := &bytesReader{
33 desc: desc,
34 }
35 r.r.Reset(data)
36 return r
37 }
38
// bytesReader is the ociregistry.BlobReader implementation returned by
// NewBytesReader. It serves blob content from an in-memory byte slice.
type bytesReader struct {
	// r reads the blob data; it is pointed at the data with Reset.
	r bytes.Reader
	// desc is returned unchanged by Descriptor.
	desc ociregistry.Descriptor
}
43
// Close is a no-op: there is nothing to release for an in-memory
// reader. It always returns nil.
func (r *bytesReader) Close() error {
	return nil
}
47
48
// Descriptor returns the descriptor stored in the reader.
func (r *bytesReader) Descriptor() ociregistry.Descriptor {
	return r.desc
}
52
// Read implements io.Reader by delegating to the underlying
// bytes.Reader.
func (r *bytesReader) Read(data []byte) (int, error) {
	return r.r.Read(data)
}
56
57
// Buffer accumulates written blob data in memory and hands itself to a
// commit callback once the data has been checked against a digest.
type Buffer struct {
	// commit is invoked by Commit after the digest check has passed.
	commit func(b *Buffer) error
	// mu is held by the methods that access the fields below.
	mu sync.Mutex
	// buf holds all data written so far.
	buf []byte
	// checkStartOffset, when not -1, is the length that buf must have
	// at the next Write; Write sets it to -1 once the check has passed.
	checkStartOffset int64
	// uuid is the identifier returned by ID.
	uuid string
	// committed is set once the digest check in checkCommit succeeds.
	committed bool
	// desc is the descriptor recorded by checkCommit.
	desc ociregistry.Descriptor
	// commitErr records a Cancel call, a failed digest check or a
	// failed commit callback; once set, checkCommit returns it.
	commitErr error
}
68
69
70
71
72
73 func NewBuffer(commit func(b *Buffer) error, uuid string) *Buffer {
74 if uuid == "" {
75 uuid = newUUID()
76 }
77 return &Buffer{
78 commit: commit,
79 uuid: uuid,
80 }
81 }
82
83 func (b *Buffer) Cancel() error {
84 b.mu.Lock()
85 defer b.mu.Unlock()
86 b.commitErr = fmt.Errorf("upload canceled")
87 return nil
88 }
89
// Close is a no-op and always returns nil. It neither commits nor
// discards the buffered data.
func (b *Buffer) Close() error {
	return nil
}
93
94 func (b *Buffer) Size() int64 {
95 b.mu.Lock()
96 defer b.mu.Unlock()
97 return int64(len(b.buf))
98 }
99
100 func (b *Buffer) ChunkSize() int {
101 return 8 * 1024
102 }
103
104
105
106 func (b *Buffer) GetBlob() (ociregistry.Descriptor, []byte, error) {
107 b.mu.Lock()
108 defer b.mu.Unlock()
109 if !b.committed {
110 return ociregistry.Descriptor{}, nil, fmt.Errorf("blob not committed")
111 }
112 if b.commitErr != nil {
113 return ociregistry.Descriptor{}, nil, b.commitErr
114 }
115 return b.desc, b.buf, nil
116 }
117
118
119 func (b *Buffer) Write(data []byte) (int, error) {
120 b.mu.Lock()
121 defer b.mu.Unlock()
122 if offset := b.checkStartOffset; offset != -1 {
123
124 if int64(len(b.buf)) != offset {
125 return 0, fmt.Errorf("invalid offset %d in resumed upload (actual offset %d): %w", offset, len(b.buf), ociregistry.ErrRangeInvalid)
126 }
127
128 b.checkStartOffset = -1
129 }
130 b.buf = append(b.buf, data...)
131 return len(data), nil
132 }
133
// newUUID returns a fresh random identifier: 32 bytes read from
// crypto/rand, rendered as 64 lowercase hexadecimal characters.
//
// It panics if the random source fails.
func newUUID() string {
	var raw [32]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", raw[:])
}
141
142
143
// ID returns the buffer's identifier: the uuid passed to NewBuffer, or
// the random one generated there when that argument was empty.
func (b *Buffer) ID() string {
	return b.uuid
}
147
148
149
150 func (b *Buffer) Commit(dig ociregistry.Digest) (_ ociregistry.Descriptor, err error) {
151 if err := b.checkCommit(dig); err != nil {
152 return ociregistry.Descriptor{}, err
153 }
154
155
156 if err := b.commit(b); err != nil {
157 b.mu.Lock()
158 defer b.mu.Unlock()
159
160 b.commitErr = err
161 return ociregistry.Descriptor{}, err
162 }
163 return ociregistry.Descriptor{
164 MediaType: "application/octet-stream",
165 Size: int64(len(b.buf)),
166 Digest: dig,
167 }, nil
168 }
169
170 func (b *Buffer) checkCommit(dig ociregistry.Digest) (err error) {
171 b.mu.Lock()
172 defer b.mu.Unlock()
173 if b.commitErr != nil {
174 return b.commitErr
175 }
176 defer func() {
177 if err != nil {
178 b.commitErr = err
179 }
180 }()
181 if digest.FromBytes(b.buf) != dig {
182 return fmt.Errorf("digest mismatch (sha256(%q) != %s): %w", b.buf, dig, ociregistry.ErrDigestInvalid)
183 }
184 b.desc = ociregistry.Descriptor{
185 MediaType: "application/octet-stream",
186 Digest: dig,
187 Size: int64(len(b.buf)),
188 }
189 b.committed = true
190 return nil
191 }
192
View as plain text