1 package storage
2
3
4
5
6 import (
7 "bytes"
8 "encoding/xml"
9 "fmt"
10 "io"
11 "net/http"
12 "net/url"
13 "strconv"
14 "strings"
15 "time"
16 )
17
18
19
20
21
22
// BlockListType is the value GetBlockList sends as the "blocklisttype"
// query parameter.
type BlockListType string

// Block list filters accepted by GetBlockList.
const (
	BlockListTypeAll         = BlockListType("all")
	BlockListTypeCommitted   = BlockListType("committed")
	BlockListTypeUncommitted = BlockListType("uncommitted")
)
31
32
// Size limits, written as binary shifts (1 << 20 is 1024 * 1024).
// NOTE(review): presumably these are byte counts matching service-side
// limits; confirm where they are enforced.
const (
	// MaxBlobBlockSize equals 100 * 1024 * 1024.
	MaxBlobBlockSize = 100 << 20
	// MaxBlobPageSize equals 4 * 1024 * 1024.
	MaxBlobPageSize = 4 << 20
)
37
38
39
// BlockStatus is the type of the Status field of a Block.
// NOTE(review): presumably it selects how the block is referenced in the
// request body built for PutBlockList; confirm against
// prepareBlockListRequest.
type BlockStatus string

// Values a Block's Status can take.
const (
	BlockStatusUncommitted = BlockStatus("Uncommitted")
	BlockStatusCommitted   = BlockStatus("Committed")
	BlockStatusLatest      = BlockStatus("Latest")
)
48
49
50
51 type Block struct {
52 ID string
53 Status BlockStatus
54 }
55
56
57
58
// BlockListResponse is the decoded form of a <BlockList> XML document, as
// returned by GetBlockList.
type BlockListResponse struct {
	// XMLName pins the expected root element name to "BlockList".
	XMLName xml.Name `xml:"BlockList"`
	// CommittedBlocks holds every <Block> found under <CommittedBlocks>.
	CommittedBlocks []BlockResponse `xml:"CommittedBlocks>Block"`
	// UncommittedBlocks holds every <Block> found under <UncommittedBlocks>.
	UncommittedBlocks []BlockResponse `xml:"UncommittedBlocks>Block"`
}

// BlockResponse is one <Block> element of a BlockListResponse.
type BlockResponse struct {
	// Name is the content of the <Name> child element.
	Name string `xml:"Name"`
	// Size is the content of the <Size> child element.
	// NOTE(review): presumably a byte count; confirm against the service docs.
	Size int64 `xml:"Size"`
}
71
72
73
74
75
76
77 func (b *Blob) CreateBlockBlob(options *PutBlobOptions) error {
78 return b.CreateBlockBlobFromReader(nil, options)
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 func (b *Blob) CreateBlockBlobFromReader(blob io.Reader, options *PutBlobOptions) error {
98 params := url.Values{}
99 headers := b.Container.bsc.client.getStandardHeaders()
100 headers["x-ms-blob-type"] = string(BlobTypeBlock)
101
102 headers["Content-Length"] = "0"
103 var n int64
104 var err error
105 if blob != nil {
106 type lener interface {
107 Len() int
108 }
109
110 if l, ok := blob.(lener); ok {
111 n = int64(l.Len())
112 } else {
113 var buf bytes.Buffer
114 n, err = io.Copy(&buf, blob)
115 if err != nil {
116 return err
117 }
118 blob = &buf
119 }
120
121 headers["Content-Length"] = strconv.FormatInt(n, 10)
122 }
123 b.Properties.ContentLength = n
124
125 headers = mergeHeaders(headers, headersFromStruct(b.Properties))
126 headers = b.Container.bsc.client.addMetadataToHeaders(headers, b.Metadata)
127
128 if options != nil {
129 params = addTimeout(params, options.Timeout)
130 headers = mergeHeaders(headers, headersFromStruct(*options))
131 }
132 uri := b.Container.bsc.client.getEndpoint(blobServiceName, b.buildPath(), params)
133
134 resp, err := b.Container.bsc.client.exec(http.MethodPut, uri, headers, blob, b.Container.bsc.auth)
135 if err != nil {
136 return err
137 }
138 return b.respondCreation(resp, BlobTypeBlock)
139 }
140
141
// PutBlockOptions carries the optional settings for PutBlock and
// PutBlockWithLength. Fields with a `header` tag are turned into request
// headers through headersFromStruct.
type PutBlockOptions struct {
	// Timeout has no header tag; it is handed to addTimeout to extend the
	// request's query parameters.
	Timeout uint
	// LeaseID is sent as the x-ms-lease-id header.
	LeaseID string `header:"x-ms-lease-id"`
	// ContentMD5 is sent as the Content-MD5 header.
	ContentMD5 string `header:"Content-MD5"`
	// RequestID is sent as the x-ms-client-request-id header.
	RequestID string `header:"x-ms-client-request-id"`
}
148
149
150
151
152
153
154
155
156 func (b *Blob) PutBlock(blockID string, chunk []byte, options *PutBlockOptions) error {
157 return b.PutBlockWithLength(blockID, uint64(len(chunk)), bytes.NewReader(chunk), options)
158 }
159
160
161
162
163
164
165
166
167
168 func (b *Blob) PutBlockWithLength(blockID string, size uint64, blob io.Reader, options *PutBlockOptions) error {
169 query := url.Values{
170 "comp": {"block"},
171 "blockid": {blockID},
172 }
173 headers := b.Container.bsc.client.getStandardHeaders()
174 headers["Content-Length"] = fmt.Sprintf("%v", size)
175
176 if options != nil {
177 query = addTimeout(query, options.Timeout)
178 headers = mergeHeaders(headers, headersFromStruct(*options))
179 }
180 uri := b.Container.bsc.client.getEndpoint(blobServiceName, b.buildPath(), query)
181
182 resp, err := b.Container.bsc.client.exec(http.MethodPut, uri, headers, blob, b.Container.bsc.auth)
183 if err != nil {
184 return err
185 }
186 return b.respondCreation(resp, BlobTypeBlock)
187 }
188
189
190 type PutBlockFromURLOptions struct {
191 PutBlockOptions
192
193 SourceContentMD5 string `header:"x-ms-source-content-md5"`
194 SourceContentCRC64 string `header:"x-ms-source-content-crc64"`
195 }
196
197
198
199
200
201
202
203
204
205 func (b *Blob) PutBlockFromURL(blockID string, blobURL string, offset int64, size uint64, options *PutBlockFromURLOptions) error {
206 query := url.Values{
207 "comp": {"block"},
208 "blockid": {blockID},
209 }
210 headers := b.Container.bsc.client.getStandardHeaders()
211
212
213 headers["Content-Length"] = "0"
214 headers["x-ms-copy-source"] = blobURL
215 headers["x-ms-source-range"] = fmt.Sprintf("bytes=%d-%d", offset, uint64(offset)+size-1)
216
217 if options != nil {
218 query = addTimeout(query, options.Timeout)
219 headers = mergeHeaders(headers, headersFromStruct(*options))
220 }
221 uri := b.Container.bsc.client.getEndpoint(blobServiceName, b.buildPath(), query)
222
223 resp, err := b.Container.bsc.client.exec(http.MethodPut, uri, headers, nil, b.Container.bsc.auth)
224 if err != nil {
225 return err
226 }
227 return b.respondCreation(resp, BlobTypeBlock)
228 }
229
230
// PutBlockListOptions carries the optional settings for PutBlockList.
// Fields with a `header` tag are turned into request headers through
// headersFromStruct.
type PutBlockListOptions struct {
	// Timeout has no header tag; it is handed to addTimeout to extend the
	// request's query parameters.
	Timeout uint
	// LeaseID is sent as the x-ms-lease-id header.
	LeaseID string `header:"x-ms-lease-id"`
	// IfModifiedSince is sent as the If-Modified-Since header.
	IfModifiedSince *time.Time `header:"If-Modified-Since"`
	// IfUnmodifiedSince is sent as the If-Unmodified-Since header.
	IfUnmodifiedSince *time.Time `header:"If-Unmodified-Since"`
	// IfMatch is sent as the If-Match header.
	IfMatch string `header:"If-Match"`
	// IfNoneMatch is sent as the If-None-Match header.
	IfNoneMatch string `header:"If-None-Match"`
	// RequestID is sent as the x-ms-client-request-id header.
	RequestID string `header:"x-ms-client-request-id"`
}
240
241
242
243
244 func (b *Blob) PutBlockList(blocks []Block, options *PutBlockListOptions) error {
245 params := url.Values{"comp": {"blocklist"}}
246 blockListXML := prepareBlockListRequest(blocks)
247 headers := b.Container.bsc.client.getStandardHeaders()
248 headers["Content-Length"] = fmt.Sprintf("%v", len(blockListXML))
249 headers = mergeHeaders(headers, headersFromStruct(b.Properties))
250 headers = b.Container.bsc.client.addMetadataToHeaders(headers, b.Metadata)
251
252 if options != nil {
253 params = addTimeout(params, options.Timeout)
254 headers = mergeHeaders(headers, headersFromStruct(*options))
255 }
256 uri := b.Container.bsc.client.getEndpoint(blobServiceName, b.buildPath(), params)
257
258 resp, err := b.Container.bsc.client.exec(http.MethodPut, uri, headers, strings.NewReader(blockListXML), b.Container.bsc.auth)
259 if err != nil {
260 return err
261 }
262 defer drainRespBody(resp)
263 return checkRespCode(resp, []int{http.StatusCreated})
264 }
265
266
// GetBlockListOptions carries the optional settings for GetBlockList.
// Fields with a `header` tag are turned into request headers through
// headersFromStruct.
type GetBlockListOptions struct {
	// Timeout has no header tag; it is handed to addTimeout to extend the
	// request's query parameters.
	Timeout uint
	// Snapshot has no header tag; it is handed to addSnapshot to extend the
	// request's query parameters.
	Snapshot *time.Time
	// LeaseID is sent as the x-ms-lease-id header.
	LeaseID string `header:"x-ms-lease-id"`
	// RequestID is sent as the x-ms-client-request-id header.
	RequestID string `header:"x-ms-client-request-id"`
}
273
274
275
276
277 func (b *Blob) GetBlockList(blockType BlockListType, options *GetBlockListOptions) (BlockListResponse, error) {
278 params := url.Values{
279 "comp": {"blocklist"},
280 "blocklisttype": {string(blockType)},
281 }
282 headers := b.Container.bsc.client.getStandardHeaders()
283
284 if options != nil {
285 params = addTimeout(params, options.Timeout)
286 params = addSnapshot(params, options.Snapshot)
287 headers = mergeHeaders(headers, headersFromStruct(*options))
288 }
289 uri := b.Container.bsc.client.getEndpoint(blobServiceName, b.buildPath(), params)
290
291 var out BlockListResponse
292 resp, err := b.Container.bsc.client.exec(http.MethodGet, uri, headers, nil, b.Container.bsc.auth)
293 if err != nil {
294 return out, err
295 }
296 defer resp.Body.Close()
297
298 err = xmlUnmarshal(resp.Body, &out)
299 return out, err
300 }
301
View as plain text