...

Source file src/github.com/Microsoft/hcsshim/internal/queue/mq.go

Documentation: github.com/Microsoft/hcsshim/internal/queue

     1  package queue
     2  
     3  import (
     4  	"errors"
     5  	"sync"
     6  )
     7  
// ErrQueueClosed is the sentinel error returned by Enqueue and Dequeue once
// the queue has been closed.
var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
     9  
// MessageQueue represents a threadsafe message queue to be used to retrieve or
// write messages to. Values are handed out in the order they were enqueued.
//
// Construct one with NewMessageQueue, which allocates the lock and the
// condition variable that the methods rely on.
type MessageQueue struct {
	m        *sync.RWMutex // guards messages and closed; also the Locker behind c
	c        *sync.Cond    // signalled on Enqueue, broadcast on Close
	messages []interface{} // pending values; index 0 is the next one dequeued
	closed   bool          // set by Close; once true Enqueue/Dequeue return ErrQueueClosed
}
    18  
    19  // NewMessageQueue returns a new MessageQueue.
    20  func NewMessageQueue() *MessageQueue {
    21  	m := &sync.RWMutex{}
    22  	return &MessageQueue{
    23  		m:        m,
    24  		c:        sync.NewCond(m),
    25  		messages: []interface{}{},
    26  	}
    27  }
    28  
    29  // Enqueue writes `msg` to the queue.
    30  func (mq *MessageQueue) Enqueue(msg interface{}) error {
    31  	mq.m.Lock()
    32  	defer mq.m.Unlock()
    33  
    34  	if mq.closed {
    35  		return ErrQueueClosed
    36  	}
    37  	mq.messages = append(mq.messages, msg)
    38  	// Signal a waiter that there is now a value available in the queue.
    39  	mq.c.Signal()
    40  	return nil
    41  }
    42  
    43  // Dequeue will read a value from the queue and remove it. If the queue
    44  // is empty, this will block until the queue is closed or a value gets enqueued.
    45  func (mq *MessageQueue) Dequeue() (interface{}, error) {
    46  	mq.m.Lock()
    47  	defer mq.m.Unlock()
    48  
    49  	for !mq.closed && mq.size() == 0 {
    50  		mq.c.Wait()
    51  	}
    52  
    53  	// We got woken up, check if it's because the queue got closed.
    54  	if mq.closed {
    55  		return nil, ErrQueueClosed
    56  	}
    57  
    58  	val := mq.messages[0]
    59  	mq.messages[0] = nil
    60  	mq.messages = mq.messages[1:]
    61  	return val, nil
    62  }
    63  
    64  // Size returns the size of the queue.
    65  func (mq *MessageQueue) Size() int {
    66  	mq.m.RLock()
    67  	defer mq.m.RUnlock()
    68  	return mq.size()
    69  }
    70  
    71  // Nonexported size check to check if the queue is empty inside already locked functions.
    72  func (mq *MessageQueue) size() int {
    73  	return len(mq.messages)
    74  }
    75  
    76  // Close closes the queue for future writes or reads. Any attempts to read or write from the
    77  // queue after close will return ErrQueueClosed. This is safe to call multiple times.
    78  func (mq *MessageQueue) Close() {
    79  	mq.m.Lock()
    80  	defer mq.m.Unlock()
    81  
    82  	// Already closed, noop
    83  	if mq.closed {
    84  		return
    85  	}
    86  
    87  	mq.messages = nil
    88  	mq.closed = true
    89  	// If there's anybody currently waiting on a value from Dequeue, we need to
    90  	// broadcast so the read(s) can return ErrQueueClosed.
    91  	mq.c.Broadcast()
    92  }
    93  

View as plain text