...

Source file src/github.com/Azure/azure-sdk-for-go/storage/queue.go

Documentation: github.com/Azure/azure-sdk-for-go/storage

     1  package storage
     2  
     3  // Copyright (c) Microsoft Corporation. All rights reserved.
     4  // Licensed under the MIT License. See License.txt in the project root for license information.
     5  
     6  import (
     7  	"encoding/xml"
     8  	"fmt"
     9  	"io"
    10  	"net/http"
    11  	"net/url"
    12  	"strconv"
    13  	"time"
    14  )
    15  
    16  const (
    17  	// casing is per Golang's http.Header canonicalizing the header names.
    18  	approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
    19  )
    20  
    21  // QueueAccessPolicy represents each access policy in the queue ACL.
    22  type QueueAccessPolicy struct {
    23  	ID         string
    24  	StartTime  time.Time
    25  	ExpiryTime time.Time
    26  	CanRead    bool
    27  	CanAdd     bool
    28  	CanUpdate  bool
    29  	CanProcess bool
    30  }
    31  
    32  // QueuePermissions represents the queue ACLs.
    33  type QueuePermissions struct {
    34  	AccessPolicies []QueueAccessPolicy
    35  }
    36  
    37  // SetQueuePermissionOptions includes options for a set queue permissions operation
    38  type SetQueuePermissionOptions struct {
    39  	Timeout   uint
    40  	RequestID string `header:"x-ms-client-request-id"`
    41  }
    42  
    43  // Queue represents an Azure queue.
    44  type Queue struct {
    45  	qsc               *QueueServiceClient
    46  	Name              string
    47  	Metadata          map[string]string
    48  	AproxMessageCount uint64
    49  }
    50  
    51  func (q *Queue) buildPath() string {
    52  	return fmt.Sprintf("/%s", q.Name)
    53  }
    54  
    55  func (q *Queue) buildPathMessages() string {
    56  	return fmt.Sprintf("%s/messages", q.buildPath())
    57  }
    58  
    59  // QueueServiceOptions includes options for some queue service operations
    60  type QueueServiceOptions struct {
    61  	Timeout   uint
    62  	RequestID string `header:"x-ms-client-request-id"`
    63  }
    64  
    65  // Create operation creates a queue under the given account.
    66  //
    67  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4
    68  func (q *Queue) Create(options *QueueServiceOptions) error {
    69  	params := url.Values{}
    70  	headers := q.qsc.client.getStandardHeaders()
    71  	headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
    72  
    73  	if options != nil {
    74  		params = addTimeout(params, options.Timeout)
    75  		headers = mergeHeaders(headers, headersFromStruct(*options))
    76  	}
    77  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
    78  
    79  	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
    80  	if err != nil {
    81  		return err
    82  	}
    83  	defer drainRespBody(resp)
    84  	return checkRespCode(resp, []int{http.StatusCreated})
    85  }
    86  
    87  // Delete operation permanently deletes the specified queue.
    88  //
    89  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3
    90  func (q *Queue) Delete(options *QueueServiceOptions) error {
    91  	params := url.Values{}
    92  	headers := q.qsc.client.getStandardHeaders()
    93  
    94  	if options != nil {
    95  		params = addTimeout(params, options.Timeout)
    96  		headers = mergeHeaders(headers, headersFromStruct(*options))
    97  	}
    98  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
    99  	resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
   100  	if err != nil {
   101  		return err
   102  	}
   103  	defer drainRespBody(resp)
   104  	return checkRespCode(resp, []int{http.StatusNoContent})
   105  }
   106  
   107  // Exists returns true if a queue with given name exists.
   108  func (q *Queue) Exists() (bool, error) {
   109  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
   110  	resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth)
   111  	if resp != nil {
   112  		defer drainRespBody(resp)
   113  		if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
   114  			return resp.StatusCode == http.StatusOK, nil
   115  		}
   116  		err = getErrorFromResponse(resp)
   117  	}
   118  	return false, err
   119  }
   120  
   121  // SetMetadata operation sets user-defined metadata on the specified queue.
   122  // Metadata is associated with the queue as name-value pairs.
   123  //
   124  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
   125  func (q *Queue) SetMetadata(options *QueueServiceOptions) error {
   126  	params := url.Values{"comp": {"metadata"}}
   127  	headers := q.qsc.client.getStandardHeaders()
   128  	headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
   129  
   130  	if options != nil {
   131  		params = addTimeout(params, options.Timeout)
   132  		headers = mergeHeaders(headers, headersFromStruct(*options))
   133  	}
   134  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
   135  
   136  	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
   137  	if err != nil {
   138  		return err
   139  	}
   140  	defer drainRespBody(resp)
   141  	return checkRespCode(resp, []int{http.StatusNoContent})
   142  }
   143  
   144  // GetMetadata operation retrieves user-defined metadata and queue
   145  // properties on the specified queue. Metadata is associated with
   146  // the queue as name-values pairs.
   147  //
   148  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
   149  //
   150  // Because the way Golang's http client (and http.Header in particular)
   151  // canonicalize header names, the returned metadata names would always
   152  // be all lower case.
   153  func (q *Queue) GetMetadata(options *QueueServiceOptions) error {
   154  	params := url.Values{"comp": {"metadata"}}
   155  	headers := q.qsc.client.getStandardHeaders()
   156  
   157  	if options != nil {
   158  		params = addTimeout(params, options.Timeout)
   159  		headers = mergeHeaders(headers, headersFromStruct(*options))
   160  	}
   161  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
   162  
   163  	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
   164  	if err != nil {
   165  		return err
   166  	}
   167  	defer drainRespBody(resp)
   168  
   169  	if err := checkRespCode(resp, []int{http.StatusOK}); err != nil {
   170  		return err
   171  	}
   172  
   173  	aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader))
   174  	if aproxMessagesStr != "" {
   175  		aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64)
   176  		if err != nil {
   177  			return err
   178  		}
   179  		q.AproxMessageCount = aproxMessages
   180  	}
   181  
   182  	q.Metadata = getMetadataFromHeaders(resp.Header)
   183  	return nil
   184  }
   185  
   186  // GetMessageReference returns a message object with the specified text.
   187  func (q *Queue) GetMessageReference(text string) *Message {
   188  	return &Message{
   189  		Queue: q,
   190  		Text:  text,
   191  	}
   192  }
   193  
   194  // GetMessagesOptions is the set of options can be specified for Get
   195  // Messsages operation. A zero struct does not use any preferences for the
   196  // request.
   197  type GetMessagesOptions struct {
   198  	Timeout           uint
   199  	NumOfMessages     int
   200  	VisibilityTimeout int
   201  	RequestID         string `header:"x-ms-client-request-id"`
   202  }
   203  
   204  type messages struct {
   205  	XMLName  xml.Name  `xml:"QueueMessagesList"`
   206  	Messages []Message `xml:"QueueMessage"`
   207  }
   208  
   209  // GetMessages operation retrieves one or more messages from the front of the
   210  // queue.
   211  //
   212  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages
   213  func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) {
   214  	query := url.Values{}
   215  	headers := q.qsc.client.getStandardHeaders()
   216  
   217  	if options != nil {
   218  		if options.NumOfMessages != 0 {
   219  			query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
   220  		}
   221  		if options.VisibilityTimeout != 0 {
   222  			query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
   223  		}
   224  		query = addTimeout(query, options.Timeout)
   225  		headers = mergeHeaders(headers, headersFromStruct(*options))
   226  	}
   227  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
   228  
   229  	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
   230  	if err != nil {
   231  		return []Message{}, err
   232  	}
   233  	defer resp.Body.Close()
   234  
   235  	var out messages
   236  	err = xmlUnmarshal(resp.Body, &out)
   237  	if err != nil {
   238  		return []Message{}, err
   239  	}
   240  	for i := range out.Messages {
   241  		out.Messages[i].Queue = q
   242  	}
   243  	return out.Messages, err
   244  }
   245  
   246  // PeekMessagesOptions is the set of options can be specified for Peek
   247  // Messsage operation. A zero struct does not use any preferences for the
   248  // request.
   249  type PeekMessagesOptions struct {
   250  	Timeout       uint
   251  	NumOfMessages int
   252  	RequestID     string `header:"x-ms-client-request-id"`
   253  }
   254  
   255  // PeekMessages retrieves one or more messages from the front of the queue, but
   256  // does not alter the visibility of the message.
   257  //
   258  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages
   259  func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) {
   260  	query := url.Values{"peekonly": {"true"}} // Required for peek operation
   261  	headers := q.qsc.client.getStandardHeaders()
   262  
   263  	if options != nil {
   264  		if options.NumOfMessages != 0 {
   265  			query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
   266  		}
   267  		query = addTimeout(query, options.Timeout)
   268  		headers = mergeHeaders(headers, headersFromStruct(*options))
   269  	}
   270  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
   271  
   272  	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
   273  	if err != nil {
   274  		return []Message{}, err
   275  	}
   276  	defer resp.Body.Close()
   277  
   278  	var out messages
   279  	err = xmlUnmarshal(resp.Body, &out)
   280  	if err != nil {
   281  		return []Message{}, err
   282  	}
   283  	for i := range out.Messages {
   284  		out.Messages[i].Queue = q
   285  	}
   286  	return out.Messages, err
   287  }
   288  
   289  // ClearMessages operation deletes all messages from the specified queue.
   290  //
   291  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages
   292  func (q *Queue) ClearMessages(options *QueueServiceOptions) error {
   293  	params := url.Values{}
   294  	headers := q.qsc.client.getStandardHeaders()
   295  
   296  	if options != nil {
   297  		params = addTimeout(params, options.Timeout)
   298  		headers = mergeHeaders(headers, headersFromStruct(*options))
   299  	}
   300  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params)
   301  
   302  	resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
   303  	if err != nil {
   304  		return err
   305  	}
   306  	defer drainRespBody(resp)
   307  	return checkRespCode(resp, []int{http.StatusNoContent})
   308  }
   309  
   310  // SetPermissions sets up queue permissions
   311  // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl
   312  func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error {
   313  	body, length, err := generateQueueACLpayload(permissions.AccessPolicies)
   314  	if err != nil {
   315  		return err
   316  	}
   317  
   318  	params := url.Values{
   319  		"comp": {"acl"},
   320  	}
   321  	headers := q.qsc.client.getStandardHeaders()
   322  	headers["Content-Length"] = strconv.Itoa(length)
   323  
   324  	if options != nil {
   325  		params = addTimeout(params, options.Timeout)
   326  		headers = mergeHeaders(headers, headersFromStruct(*options))
   327  	}
   328  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
   329  	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth)
   330  	if err != nil {
   331  		return err
   332  	}
   333  	defer drainRespBody(resp)
   334  	return checkRespCode(resp, []int{http.StatusNoContent})
   335  }
   336  
   337  func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) {
   338  	sil := SignedIdentifiers{
   339  		SignedIdentifiers: []SignedIdentifier{},
   340  	}
   341  	for _, qapd := range policies {
   342  		permission := qapd.generateQueuePermissions()
   343  		signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission)
   344  		sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier)
   345  	}
   346  	return xmlMarshal(sil)
   347  }
   348  
   349  func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) {
   350  	// generate the permissions string (raup).
   351  	// still want the end user API to have bool flags.
   352  	permissions = ""
   353  
   354  	if qapd.CanRead {
   355  		permissions += "r"
   356  	}
   357  
   358  	if qapd.CanAdd {
   359  		permissions += "a"
   360  	}
   361  
   362  	if qapd.CanUpdate {
   363  		permissions += "u"
   364  	}
   365  
   366  	if qapd.CanProcess {
   367  		permissions += "p"
   368  	}
   369  
   370  	return permissions
   371  }
   372  
   373  // GetQueuePermissionOptions includes options for a get queue permissions operation
   374  type GetQueuePermissionOptions struct {
   375  	Timeout   uint
   376  	RequestID string `header:"x-ms-client-request-id"`
   377  }
   378  
   379  // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl
   380  // If timeout is 0 then it will not be passed to Azure
   381  func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) {
   382  	params := url.Values{
   383  		"comp": {"acl"},
   384  	}
   385  	headers := q.qsc.client.getStandardHeaders()
   386  
   387  	if options != nil {
   388  		params = addTimeout(params, options.Timeout)
   389  		headers = mergeHeaders(headers, headersFromStruct(*options))
   390  	}
   391  	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
   392  	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
   393  	if err != nil {
   394  		return nil, err
   395  	}
   396  	defer resp.Body.Close()
   397  
   398  	var ap AccessPolicy
   399  	err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList)
   400  	if err != nil {
   401  		return nil, err
   402  	}
   403  	return buildQueueAccessPolicy(ap, &resp.Header), nil
   404  }
   405  
   406  func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions {
   407  	permissions := QueuePermissions{
   408  		AccessPolicies: []QueueAccessPolicy{},
   409  	}
   410  
   411  	for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers {
   412  		qapd := QueueAccessPolicy{
   413  			ID:         policy.ID,
   414  			StartTime:  policy.AccessPolicy.StartTime,
   415  			ExpiryTime: policy.AccessPolicy.ExpiryTime,
   416  		}
   417  		qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r")
   418  		qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a")
   419  		qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u")
   420  		qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p")
   421  
   422  		permissions.AccessPolicies = append(permissions.AccessPolicies, qapd)
   423  	}
   424  	return &permissions
   425  }
   426  

View as plain text