1 package storage
2
3
4
5
6 import (
7 "encoding/xml"
8 "fmt"
9 "io"
10 "net/http"
11 "net/url"
12 "strconv"
13 "time"
14 )
15
// approximateMessagesCountHeader is the response header that GetMetadata reads
// to populate Queue.AproxMessageCount.
const approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
20
21
// QueueAccessPolicy represents one access policy of a queue: an identifier,
// a start and an expiry time, and four permission flags.
type QueueAccessPolicy struct {
	// ID identifies the policy.
	ID string
	// StartTime and ExpiryTime are the policy's start and expiry instants.
	StartTime, ExpiryTime time.Time
	// CanRead is encoded as "r" in the permission string.
	CanRead bool
	// CanAdd is encoded as "a" in the permission string.
	CanAdd bool
	// CanUpdate is encoded as "u" in the permission string.
	CanUpdate bool
	// CanProcess is encoded as "p" in the permission string.
	CanProcess bool
}
31
32
33 type QueuePermissions struct {
34 AccessPolicies []QueueAccessPolicy
35 }
36
37
// SetQueuePermissionOptions carries the optional settings accepted by
// Queue.SetPermissions.
type SetQueuePermissionOptions struct {
	// Timeout is handed to addTimeout to extend the request's query
	// parameters (presumably seconds; confirm against addTimeout).
	Timeout uint
	// RequestID is tagged with the x-ms-client-request-id header name; the
	// struct is passed to headersFromStruct when the request is built.
	RequestID string `header:"x-ms-client-request-id"`
}
42
43
44 type Queue struct {
45 qsc *QueueServiceClient
46 Name string
47 Metadata map[string]string
48 AproxMessageCount uint64
49 }
50
51 func (q *Queue) buildPath() string {
52 return fmt.Sprintf("/%s", q.Name)
53 }
54
55 func (q *Queue) buildPathMessages() string {
56 return fmt.Sprintf("%s/messages", q.buildPath())
57 }
58
59
// QueueServiceOptions carries the optional settings shared by Create, Delete,
// SetMetadata, GetMetadata and ClearMessages.
type QueueServiceOptions struct {
	// Timeout is handed to addTimeout to extend the request's query
	// parameters (presumably seconds; confirm against addTimeout).
	Timeout uint
	// RequestID is tagged with the x-ms-client-request-id header name; the
	// struct is passed to headersFromStruct when the request is built.
	RequestID string `header:"x-ms-client-request-id"`
}
64
65
66
67
68 func (q *Queue) Create(options *QueueServiceOptions) error {
69 params := url.Values{}
70 headers := q.qsc.client.getStandardHeaders()
71 headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
72
73 if options != nil {
74 params = addTimeout(params, options.Timeout)
75 headers = mergeHeaders(headers, headersFromStruct(*options))
76 }
77 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
78
79 resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
80 if err != nil {
81 return err
82 }
83 defer drainRespBody(resp)
84 return checkRespCode(resp, []int{http.StatusCreated})
85 }
86
87
88
89
90 func (q *Queue) Delete(options *QueueServiceOptions) error {
91 params := url.Values{}
92 headers := q.qsc.client.getStandardHeaders()
93
94 if options != nil {
95 params = addTimeout(params, options.Timeout)
96 headers = mergeHeaders(headers, headersFromStruct(*options))
97 }
98 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
99 resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
100 if err != nil {
101 return err
102 }
103 defer drainRespBody(resp)
104 return checkRespCode(resp, []int{http.StatusNoContent})
105 }
106
107
108 func (q *Queue) Exists() (bool, error) {
109 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
110 resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth)
111 if resp != nil {
112 defer drainRespBody(resp)
113 if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
114 return resp.StatusCode == http.StatusOK, nil
115 }
116 err = getErrorFromResponse(resp)
117 }
118 return false, err
119 }
120
121
122
123
124
125 func (q *Queue) SetMetadata(options *QueueServiceOptions) error {
126 params := url.Values{"comp": {"metadata"}}
127 headers := q.qsc.client.getStandardHeaders()
128 headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
129
130 if options != nil {
131 params = addTimeout(params, options.Timeout)
132 headers = mergeHeaders(headers, headersFromStruct(*options))
133 }
134 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
135
136 resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
137 if err != nil {
138 return err
139 }
140 defer drainRespBody(resp)
141 return checkRespCode(resp, []int{http.StatusNoContent})
142 }
143
144
145
146
147
148
149
150
151
152
153 func (q *Queue) GetMetadata(options *QueueServiceOptions) error {
154 params := url.Values{"comp": {"metadata"}}
155 headers := q.qsc.client.getStandardHeaders()
156
157 if options != nil {
158 params = addTimeout(params, options.Timeout)
159 headers = mergeHeaders(headers, headersFromStruct(*options))
160 }
161 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
162
163 resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
164 if err != nil {
165 return err
166 }
167 defer drainRespBody(resp)
168
169 if err := checkRespCode(resp, []int{http.StatusOK}); err != nil {
170 return err
171 }
172
173 aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader))
174 if aproxMessagesStr != "" {
175 aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64)
176 if err != nil {
177 return err
178 }
179 q.AproxMessageCount = aproxMessages
180 }
181
182 q.Metadata = getMetadataFromHeaders(resp.Header)
183 return nil
184 }
185
186
187 func (q *Queue) GetMessageReference(text string) *Message {
188 return &Message{
189 Queue: q,
190 Text: text,
191 }
192 }
193
194
195
196
// GetMessagesOptions carries the optional settings accepted by
// Queue.GetMessages.
type GetMessagesOptions struct {
	// Timeout is handed to addTimeout to extend the request's query
	// parameters (presumably seconds; confirm against addTimeout).
	Timeout uint
	// NumOfMessages is sent as the numofmessages query parameter when it is
	// non-zero.
	NumOfMessages int
	// VisibilityTimeout is sent as the visibilitytimeout query parameter when
	// it is non-zero.
	VisibilityTimeout int
	// RequestID is tagged with the x-ms-client-request-id header name; the
	// struct is passed to headersFromStruct when the request is built.
	RequestID string `header:"x-ms-client-request-id"`
}
203
204 type messages struct {
205 XMLName xml.Name `xml:"QueueMessagesList"`
206 Messages []Message `xml:"QueueMessage"`
207 }
208
209
210
211
212
213 func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) {
214 query := url.Values{}
215 headers := q.qsc.client.getStandardHeaders()
216
217 if options != nil {
218 if options.NumOfMessages != 0 {
219 query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
220 }
221 if options.VisibilityTimeout != 0 {
222 query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
223 }
224 query = addTimeout(query, options.Timeout)
225 headers = mergeHeaders(headers, headersFromStruct(*options))
226 }
227 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
228
229 resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
230 if err != nil {
231 return []Message{}, err
232 }
233 defer resp.Body.Close()
234
235 var out messages
236 err = xmlUnmarshal(resp.Body, &out)
237 if err != nil {
238 return []Message{}, err
239 }
240 for i := range out.Messages {
241 out.Messages[i].Queue = q
242 }
243 return out.Messages, err
244 }
245
246
247
248
// PeekMessagesOptions carries the optional settings accepted by
// Queue.PeekMessages.
type PeekMessagesOptions struct {
	// Timeout is handed to addTimeout to extend the request's query
	// parameters (presumably seconds; confirm against addTimeout).
	Timeout uint
	// NumOfMessages is sent as the numofmessages query parameter when it is
	// non-zero.
	NumOfMessages int
	// RequestID is tagged with the x-ms-client-request-id header name; the
	// struct is passed to headersFromStruct when the request is built.
	RequestID string `header:"x-ms-client-request-id"`
}
254
255
256
257
258
259 func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) {
260 query := url.Values{"peekonly": {"true"}}
261 headers := q.qsc.client.getStandardHeaders()
262
263 if options != nil {
264 if options.NumOfMessages != 0 {
265 query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
266 }
267 query = addTimeout(query, options.Timeout)
268 headers = mergeHeaders(headers, headersFromStruct(*options))
269 }
270 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
271
272 resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
273 if err != nil {
274 return []Message{}, err
275 }
276 defer resp.Body.Close()
277
278 var out messages
279 err = xmlUnmarshal(resp.Body, &out)
280 if err != nil {
281 return []Message{}, err
282 }
283 for i := range out.Messages {
284 out.Messages[i].Queue = q
285 }
286 return out.Messages, err
287 }
288
289
290
291
292 func (q *Queue) ClearMessages(options *QueueServiceOptions) error {
293 params := url.Values{}
294 headers := q.qsc.client.getStandardHeaders()
295
296 if options != nil {
297 params = addTimeout(params, options.Timeout)
298 headers = mergeHeaders(headers, headersFromStruct(*options))
299 }
300 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params)
301
302 resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
303 if err != nil {
304 return err
305 }
306 defer drainRespBody(resp)
307 return checkRespCode(resp, []int{http.StatusNoContent})
308 }
309
310
311
312 func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error {
313 body, length, err := generateQueueACLpayload(permissions.AccessPolicies)
314 if err != nil {
315 return err
316 }
317
318 params := url.Values{
319 "comp": {"acl"},
320 }
321 headers := q.qsc.client.getStandardHeaders()
322 headers["Content-Length"] = strconv.Itoa(length)
323
324 if options != nil {
325 params = addTimeout(params, options.Timeout)
326 headers = mergeHeaders(headers, headersFromStruct(*options))
327 }
328 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
329 resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth)
330 if err != nil {
331 return err
332 }
333 defer drainRespBody(resp)
334 return checkRespCode(resp, []int{http.StatusNoContent})
335 }
336
337 func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) {
338 sil := SignedIdentifiers{
339 SignedIdentifiers: []SignedIdentifier{},
340 }
341 for _, qapd := range policies {
342 permission := qapd.generateQueuePermissions()
343 signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission)
344 sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier)
345 }
346 return xmlMarshal(sil)
347 }
348
349 func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) {
350
351
352 permissions = ""
353
354 if qapd.CanRead {
355 permissions += "r"
356 }
357
358 if qapd.CanAdd {
359 permissions += "a"
360 }
361
362 if qapd.CanUpdate {
363 permissions += "u"
364 }
365
366 if qapd.CanProcess {
367 permissions += "p"
368 }
369
370 return permissions
371 }
372
373
// GetQueuePermissionOptions carries the optional settings accepted by
// Queue.GetPermissions.
type GetQueuePermissionOptions struct {
	// Timeout is handed to addTimeout to extend the request's query
	// parameters (presumably seconds; confirm against addTimeout).
	Timeout uint
	// RequestID is tagged with the x-ms-client-request-id header name; the
	// struct is passed to headersFromStruct when the request is built.
	RequestID string `header:"x-ms-client-request-id"`
}
378
379
380
381 func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) {
382 params := url.Values{
383 "comp": {"acl"},
384 }
385 headers := q.qsc.client.getStandardHeaders()
386
387 if options != nil {
388 params = addTimeout(params, options.Timeout)
389 headers = mergeHeaders(headers, headersFromStruct(*options))
390 }
391 uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
392 resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
393 if err != nil {
394 return nil, err
395 }
396 defer resp.Body.Close()
397
398 var ap AccessPolicy
399 err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList)
400 if err != nil {
401 return nil, err
402 }
403 return buildQueueAccessPolicy(ap, &resp.Header), nil
404 }
405
406 func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions {
407 permissions := QueuePermissions{
408 AccessPolicies: []QueueAccessPolicy{},
409 }
410
411 for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers {
412 qapd := QueueAccessPolicy{
413 ID: policy.ID,
414 StartTime: policy.AccessPolicy.StartTime,
415 ExpiryTime: policy.AccessPolicy.ExpiryTime,
416 }
417 qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r")
418 qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a")
419 qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u")
420 qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p")
421
422 permissions.AccessPolicies = append(permissions.AccessPolicies, qapd)
423 }
424 return &permissions
425 }
426
View as plain text