...
1 package storage
2
3
4
5
6 import (
7 "encoding/xml"
8 "fmt"
9 "net/http"
10 "net/url"
11 "strconv"
12 "time"
13 )
14
15
16 type Message struct {
17 Queue *Queue
18 Text string `xml:"MessageText"`
19 ID string `xml:"MessageId"`
20 Insertion TimeRFC1123 `xml:"InsertionTime"`
21 Expiration TimeRFC1123 `xml:"ExpirationTime"`
22 PopReceipt string `xml:"PopReceipt"`
23 NextVisible TimeRFC1123 `xml:"TimeNextVisible"`
24 DequeueCount int `xml:"DequeueCount"`
25 }
26
27 func (m *Message) buildPath() string {
28 return fmt.Sprintf("%s/%s", m.Queue.buildPathMessages(), m.ID)
29 }
30
31
32
33 type PutMessageOptions struct {
34 Timeout uint
35 VisibilityTimeout int
36 MessageTTL int
37 RequestID string `header:"x-ms-client-request-id"`
38 }
39
40
41
42
43 func (m *Message) Put(options *PutMessageOptions) error {
44 query := url.Values{}
45 headers := m.Queue.qsc.client.getStandardHeaders()
46
47 req := putMessageRequest{MessageText: m.Text}
48 body, nn, err := xmlMarshal(req)
49 if err != nil {
50 return err
51 }
52 headers["Content-Length"] = strconv.Itoa(nn)
53
54 if options != nil {
55 if options.VisibilityTimeout != 0 {
56 query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
57 }
58 if options.MessageTTL != 0 {
59 query.Set("messagettl", strconv.Itoa(options.MessageTTL))
60 }
61 query = addTimeout(query, options.Timeout)
62 headers = mergeHeaders(headers, headersFromStruct(*options))
63 }
64
65 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.Queue.buildPathMessages(), query)
66 resp, err := m.Queue.qsc.client.exec(http.MethodPost, uri, headers, body, m.Queue.qsc.auth)
67 if err != nil {
68 return err
69 }
70 defer drainRespBody(resp)
71 err = checkRespCode(resp, []int{http.StatusCreated})
72 if err != nil {
73 return err
74 }
75 err = xmlUnmarshal(resp.Body, m)
76 if err != nil {
77 return err
78 }
79 return nil
80 }
81
82
83
84 type UpdateMessageOptions struct {
85 Timeout uint
86 VisibilityTimeout int
87 RequestID string `header:"x-ms-client-request-id"`
88 }
89
90
91
92
93 func (m *Message) Update(options *UpdateMessageOptions) error {
94 query := url.Values{}
95 if m.PopReceipt != "" {
96 query.Set("popreceipt", m.PopReceipt)
97 }
98
99 headers := m.Queue.qsc.client.getStandardHeaders()
100 req := putMessageRequest{MessageText: m.Text}
101 body, nn, err := xmlMarshal(req)
102 if err != nil {
103 return err
104 }
105 headers["Content-Length"] = strconv.Itoa(nn)
106
107 query.Set("visibilitytimeout", "0")
108 if options != nil {
109 if options.VisibilityTimeout != 0 {
110 query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
111 }
112 query = addTimeout(query, options.Timeout)
113 headers = mergeHeaders(headers, headersFromStruct(*options))
114 }
115 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query)
116
117 resp, err := m.Queue.qsc.client.exec(http.MethodPut, uri, headers, body, m.Queue.qsc.auth)
118 if err != nil {
119 return err
120 }
121 defer drainRespBody(resp)
122
123 m.PopReceipt = resp.Header.Get("x-ms-popreceipt")
124 nextTimeStr := resp.Header.Get("x-ms-time-next-visible")
125 if nextTimeStr != "" {
126 nextTime, err := time.Parse(time.RFC1123, nextTimeStr)
127 if err != nil {
128 return err
129 }
130 m.NextVisible = TimeRFC1123(nextTime)
131 }
132
133 return checkRespCode(resp, []int{http.StatusNoContent})
134 }
135
136
137
138
139 func (m *Message) Delete(options *QueueServiceOptions) error {
140 params := url.Values{"popreceipt": {m.PopReceipt}}
141 headers := m.Queue.qsc.client.getStandardHeaders()
142
143 if options != nil {
144 params = addTimeout(params, options.Timeout)
145 headers = mergeHeaders(headers, headersFromStruct(*options))
146 }
147 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), params)
148
149 resp, err := m.Queue.qsc.client.exec(http.MethodDelete, uri, headers, nil, m.Queue.qsc.auth)
150 if err != nil {
151 return err
152 }
153 defer drainRespBody(resp)
154 return checkRespCode(resp, []int{http.StatusNoContent})
155 }
156
157 type putMessageRequest struct {
158 XMLName xml.Name `xml:"QueueMessage"`
159 MessageText string `xml:"MessageText"`
160 }
161
View as plain text