...
1 package queue
2
3 import (
4 "errors"
5 "sync"
6 )
7
8 var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
9
10
11
12 type MessageQueue struct {
13 m *sync.RWMutex
14 c *sync.Cond
15 messages []interface{}
16 closed bool
17 }
18
19
20 func NewMessageQueue() *MessageQueue {
21 m := &sync.RWMutex{}
22 return &MessageQueue{
23 m: m,
24 c: sync.NewCond(m),
25 messages: []interface{}{},
26 }
27 }
28
29
30 func (mq *MessageQueue) Enqueue(msg interface{}) error {
31 mq.m.Lock()
32 defer mq.m.Unlock()
33
34 if mq.closed {
35 return ErrQueueClosed
36 }
37 mq.messages = append(mq.messages, msg)
38
39 mq.c.Signal()
40 return nil
41 }
42
43
44
45 func (mq *MessageQueue) Dequeue() (interface{}, error) {
46 mq.m.Lock()
47 defer mq.m.Unlock()
48
49 for !mq.closed && mq.size() == 0 {
50 mq.c.Wait()
51 }
52
53
54 if mq.closed {
55 return nil, ErrQueueClosed
56 }
57
58 val := mq.messages[0]
59 mq.messages[0] = nil
60 mq.messages = mq.messages[1:]
61 return val, nil
62 }
63
64
65 func (mq *MessageQueue) Size() int {
66 mq.m.RLock()
67 defer mq.m.RUnlock()
68 return mq.size()
69 }
70
71
72 func (mq *MessageQueue) size() int {
73 return len(mq.messages)
74 }
75
76
77
78 func (mq *MessageQueue) Close() {
79 mq.m.Lock()
80 defer mq.m.Unlock()
81
82
83 if mq.closed {
84 return
85 }
86
87 mq.messages = nil
88 mq.closed = true
89
90
91 mq.c.Broadcast()
92 }
93
View as plain text