...
1 package events
2
3 import (
4 "container/list"
5 "sync"
6
7 "github.com/sirupsen/logrus"
8 )
9
10
11
12
13 type Queue struct {
14 dst Sink
15 events *list.List
16 cond *sync.Cond
17 mu sync.Mutex
18 closed bool
19 }
20
21
22 func NewQueue(dst Sink) *Queue {
23 eq := Queue{
24 dst: dst,
25 events: list.New(),
26 }
27
28 eq.cond = sync.NewCond(&eq.mu)
29 go eq.run()
30 return &eq
31 }
32
33
34
35 func (eq *Queue) Write(event Event) error {
36 eq.mu.Lock()
37 defer eq.mu.Unlock()
38
39 if eq.closed {
40 return ErrSinkClosed
41 }
42
43 eq.events.PushBack(event)
44 eq.cond.Signal()
45
46 return nil
47 }
48
49
50 func (eq *Queue) Close() error {
51 eq.mu.Lock()
52 defer eq.mu.Unlock()
53
54 if eq.closed {
55 return nil
56 }
57
58
59 eq.closed = true
60 eq.cond.Signal()
61 eq.cond.Wait()
62 return eq.dst.Close()
63 }
64
65
66 func (eq *Queue) run() {
67 for {
68 event := eq.next()
69
70 if event == nil {
71 return
72 }
73
74 if err := eq.dst.Write(event); err != nil {
75
76
77
78
79
80
81
82 logrus.WithFields(logrus.Fields{
83 "event": event,
84 "sink": eq.dst,
85 }).WithError(err).Debug("eventqueue: dropped event")
86 }
87 }
88 }
89
90
91
92
93 func (eq *Queue) next() Event {
94 eq.mu.Lock()
95 defer eq.mu.Unlock()
96
97 for eq.events.Len() < 1 {
98 if eq.closed {
99 eq.cond.Broadcast()
100 return nil
101 }
102
103 eq.cond.Wait()
104 }
105
106 front := eq.events.Front()
107 block := front.Value.(Event)
108 eq.events.Remove(front)
109
110 return block
111 }
112
View as plain text