1 package storage
2
3
4
5
6 import (
7 "bytes"
8 "encoding/base64"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "io/ioutil"
13 "net/http"
14 "net/url"
15 "strconv"
16 "strings"
17 "time"
18
19 "github.com/gofrs/uuid"
20 )
21
22
23
24 const (
25 partitionKeyNode = "PartitionKey"
26 rowKeyNode = "RowKey"
27 etagErrorTemplate = "Etag didn't match: %v"
28 )
29
30 var (
31 errEmptyPayload = errors.New("Empty payload is not a valid metadata level for this operation")
32 errNilPreviousResult = errors.New("The previous results page is nil")
33 errNilNextLink = errors.New("There are no more pages in this query results")
34 )
35
36
37 type Entity struct {
38 Table *Table
39 PartitionKey string
40 RowKey string
41 TimeStamp time.Time
42 OdataMetadata string
43 OdataType string
44 OdataID string
45 OdataEtag string
46 OdataEditLink string
47 Properties map[string]interface{}
48 }
49
50
51
52 func (t *Table) GetEntityReference(partitionKey, rowKey string) *Entity {
53 return &Entity{
54 PartitionKey: partitionKey,
55 RowKey: rowKey,
56 Table: t,
57 }
58 }
59
60
61 type EntityOptions struct {
62 Timeout uint
63 RequestID string `header:"x-ms-client-request-id"`
64 }
65
66
67 type GetEntityOptions struct {
68 Select []string
69 RequestID string `header:"x-ms-client-request-id"`
70 }
71
72
73
74
75
76
77 func (e *Entity) Get(timeout uint, ml MetadataLevel, options *GetEntityOptions) error {
78 if ml == EmptyPayload {
79 return errEmptyPayload
80 }
81
82
83 rk := e.RowKey
84 pk := e.PartitionKey
85
86 query := url.Values{
87 "timeout": {strconv.FormatUint(uint64(timeout), 10)},
88 }
89 headers := e.Table.tsc.client.getStandardHeaders()
90 headers[headerAccept] = string(ml)
91
92 if options != nil {
93 if len(options.Select) > 0 {
94 query.Add("$select", strings.Join(options.Select, ","))
95 }
96 headers = mergeHeaders(headers, headersFromStruct(*options))
97 }
98
99 uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
100 resp, err := e.Table.tsc.client.exec(http.MethodGet, uri, headers, nil, e.Table.tsc.auth)
101 if err != nil {
102 return err
103 }
104 defer drainRespBody(resp)
105
106 if err = checkRespCode(resp, []int{http.StatusOK}); err != nil {
107 return err
108 }
109
110 respBody, err := ioutil.ReadAll(resp.Body)
111 if err != nil {
112 return err
113 }
114 err = json.Unmarshal(respBody, e)
115 if err != nil {
116 return err
117 }
118 e.PartitionKey = pk
119 e.RowKey = rk
120
121 return nil
122 }
123
124
125
126
127
128
129
130 func (e *Entity) Insert(ml MetadataLevel, options *EntityOptions) error {
131 query, headers := options.getParameters()
132 headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
133
134 body, err := json.Marshal(e)
135 if err != nil {
136 return err
137 }
138 headers = addBodyRelatedHeaders(headers, len(body))
139 headers = addReturnContentHeaders(headers, ml)
140
141 uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.Table.buildPath(), query)
142 resp, err := e.Table.tsc.client.exec(http.MethodPost, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
143 if err != nil {
144 return err
145 }
146 defer drainRespBody(resp)
147
148 if ml != EmptyPayload {
149 if err = checkRespCode(resp, []int{http.StatusCreated}); err != nil {
150 return err
151 }
152 data, err := ioutil.ReadAll(resp.Body)
153 if err != nil {
154 return err
155 }
156 if err = e.UnmarshalJSON(data); err != nil {
157 return err
158 }
159 } else {
160 if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
161 return err
162 }
163 }
164
165 return nil
166 }
167
168
169
170
171
172 func (e *Entity) Update(force bool, options *EntityOptions) error {
173 return e.updateMerge(force, http.MethodPut, options)
174 }
175
176
177
178
179
180
181 func (e *Entity) Merge(force bool, options *EntityOptions) error {
182 return e.updateMerge(force, "MERGE", options)
183 }
184
185
186
187
188
189 func (e *Entity) Delete(force bool, options *EntityOptions) error {
190 query, headers := options.getParameters()
191 headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
192
193 headers = addIfMatchHeader(headers, force, e.OdataEtag)
194 headers = addReturnContentHeaders(headers, EmptyPayload)
195
196 uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
197 resp, err := e.Table.tsc.client.exec(http.MethodDelete, uri, headers, nil, e.Table.tsc.auth)
198 if err != nil {
199 if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
200 return fmt.Errorf(etagErrorTemplate, err)
201 }
202 return err
203 }
204 defer drainRespBody(resp)
205
206 if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
207 return err
208 }
209
210 return e.updateTimestamp(resp.Header)
211 }
212
213
214
215 func (e *Entity) InsertOrReplace(options *EntityOptions) error {
216 return e.insertOr(http.MethodPut, options)
217 }
218
219
220
221 func (e *Entity) InsertOrMerge(options *EntityOptions) error {
222 return e.insertOr("MERGE", options)
223 }
224
225 func (e *Entity) buildPath() string {
226 return fmt.Sprintf("%s(PartitionKey='%s',RowKey='%s')", e.Table.buildPath(), e.PartitionKey, e.RowKey)
227 }
228
229
230 func (e *Entity) MarshalJSON() ([]byte, error) {
231 completeMap := map[string]interface{}{}
232 completeMap[partitionKeyNode] = e.PartitionKey
233 completeMap[rowKeyNode] = e.RowKey
234 for k, v := range e.Properties {
235 typeKey := strings.Join([]string{k, OdataTypeSuffix}, "")
236 switch t := v.(type) {
237 case []byte:
238 completeMap[typeKey] = OdataBinary
239 completeMap[k] = t
240 case time.Time:
241 completeMap[typeKey] = OdataDateTime
242 completeMap[k] = t.Format(time.RFC3339Nano)
243 case uuid.UUID:
244 completeMap[typeKey] = OdataGUID
245 completeMap[k] = t.String()
246 case int64:
247 completeMap[typeKey] = OdataInt64
248 completeMap[k] = fmt.Sprintf("%v", v)
249 case float32, float64:
250 completeMap[typeKey] = OdataDouble
251 completeMap[k] = fmt.Sprintf("%v", v)
252 default:
253 completeMap[k] = v
254 }
255 if strings.HasSuffix(k, OdataTypeSuffix) {
256 if !(completeMap[k] == OdataBinary ||
257 completeMap[k] == OdataDateTime ||
258 completeMap[k] == OdataGUID ||
259 completeMap[k] == OdataInt64 ||
260 completeMap[k] == OdataDouble) {
261 return nil, fmt.Errorf("Odata.type annotation %v value is not valid", k)
262 }
263 valueKey := strings.TrimSuffix(k, OdataTypeSuffix)
264 if _, ok := completeMap[valueKey]; !ok {
265 return nil, fmt.Errorf("Odata.type annotation %v defined without value defined", k)
266 }
267 }
268 }
269 return json.Marshal(completeMap)
270 }
271
272
273 func (e *Entity) UnmarshalJSON(data []byte) error {
274 errorTemplate := "Deserializing error: %v"
275
276 props := map[string]interface{}{}
277 err := json.Unmarshal(data, &props)
278 if err != nil {
279 return err
280 }
281
282
283 e.OdataMetadata = stringFromMap(props, "odata.metadata")
284 e.OdataType = stringFromMap(props, "odata.type")
285 e.OdataID = stringFromMap(props, "odata.id")
286 e.OdataEtag = stringFromMap(props, "odata.etag")
287 e.OdataEditLink = stringFromMap(props, "odata.editLink")
288 e.PartitionKey = stringFromMap(props, partitionKeyNode)
289 e.RowKey = stringFromMap(props, rowKeyNode)
290
291
292 timeStamp, ok := props["Timestamp"]
293 if ok {
294 str, ok := timeStamp.(string)
295 if !ok {
296 return fmt.Errorf(errorTemplate, "Timestamp casting error")
297 }
298 t, err := time.Parse(time.RFC3339Nano, str)
299 if err != nil {
300 return fmt.Errorf(errorTemplate, err)
301 }
302 e.TimeStamp = t
303 }
304 delete(props, "Timestamp")
305 delete(props, "Timestamp@odata.type")
306
307
308 for k, v := range props {
309 if strings.HasSuffix(k, OdataTypeSuffix) {
310 valueKey := strings.TrimSuffix(k, OdataTypeSuffix)
311 str, ok := props[valueKey].(string)
312 if !ok {
313 return fmt.Errorf(errorTemplate, fmt.Sprintf("%v casting error", v))
314 }
315 switch v {
316 case OdataBinary:
317 props[valueKey], err = base64.StdEncoding.DecodeString(str)
318 if err != nil {
319 return fmt.Errorf(errorTemplate, err)
320 }
321 case OdataDateTime:
322 t, err := time.Parse("2006-01-02T15:04:05Z", str)
323 if err != nil {
324 return fmt.Errorf(errorTemplate, err)
325 }
326 props[valueKey] = t
327 case OdataGUID:
328 props[valueKey] = uuid.FromStringOrNil(str)
329 case OdataInt64:
330 i, err := strconv.ParseInt(str, 10, 64)
331 if err != nil {
332 return fmt.Errorf(errorTemplate, err)
333 }
334 props[valueKey] = i
335 case OdataDouble:
336 f, err := strconv.ParseFloat(str, 64)
337 if err != nil {
338 return fmt.Errorf(errorTemplate, err)
339 }
340 props[valueKey] = f
341 default:
342 return fmt.Errorf(errorTemplate, fmt.Sprintf("%v is not supported", v))
343 }
344 delete(props, k)
345 }
346 }
347
348 e.Properties = props
349 return nil
350 }
351
352 func getAndDelete(props map[string]interface{}, key string) interface{} {
353 if value, ok := props[key]; ok {
354 delete(props, key)
355 return value
356 }
357 return nil
358 }
359
360 func addIfMatchHeader(h map[string]string, force bool, etag string) map[string]string {
361 if force {
362 h[headerIfMatch] = "*"
363 } else {
364 h[headerIfMatch] = etag
365 }
366 return h
367 }
368
369
370 func (e *Entity) updateEtagAndTimestamp(headers http.Header) error {
371 e.OdataEtag = headers.Get(headerEtag)
372 return e.updateTimestamp(headers)
373 }
374
375 func (e *Entity) updateTimestamp(headers http.Header) error {
376 str := headers.Get(headerDate)
377 t, err := time.Parse(time.RFC1123, str)
378 if err != nil {
379 return fmt.Errorf("Update timestamp error: %v", err)
380 }
381 e.TimeStamp = t
382 return nil
383 }
384
385 func (e *Entity) insertOr(verb string, options *EntityOptions) error {
386 query, headers := options.getParameters()
387 headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
388
389 body, err := json.Marshal(e)
390 if err != nil {
391 return err
392 }
393 headers = addBodyRelatedHeaders(headers, len(body))
394 headers = addReturnContentHeaders(headers, EmptyPayload)
395
396 uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
397 resp, err := e.Table.tsc.client.exec(verb, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
398 if err != nil {
399 return err
400 }
401 defer drainRespBody(resp)
402
403 if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
404 return err
405 }
406
407 return e.updateEtagAndTimestamp(resp.Header)
408 }
409
410 func (e *Entity) updateMerge(force bool, verb string, options *EntityOptions) error {
411 query, headers := options.getParameters()
412 headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
413
414 body, err := json.Marshal(e)
415 if err != nil {
416 return err
417 }
418 headers = addBodyRelatedHeaders(headers, len(body))
419 headers = addIfMatchHeader(headers, force, e.OdataEtag)
420 headers = addReturnContentHeaders(headers, EmptyPayload)
421
422 uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
423 resp, err := e.Table.tsc.client.exec(verb, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
424 if err != nil {
425 if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
426 return fmt.Errorf(etagErrorTemplate, err)
427 }
428 return err
429 }
430 defer drainRespBody(resp)
431
432 if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
433 return err
434 }
435
436 return e.updateEtagAndTimestamp(resp.Header)
437 }
438
439 func stringFromMap(props map[string]interface{}, key string) string {
440 value := getAndDelete(props, key)
441 if value != nil {
442 return value.(string)
443 }
444 return ""
445 }
446
447 func (options *EntityOptions) getParameters() (url.Values, map[string]string) {
448 query := url.Values{}
449 headers := map[string]string{}
450 if options != nil {
451 query = addTimeout(query, options.Timeout)
452 headers = headersFromStruct(*options)
453 }
454 return query, headers
455 }
456
View as plain text