...
1 package diodes
2
3 import (
4 "context"
5 "sync"
6 )
7
8
9
// Waiter wraps a Diode and pairs it with a condition variable so that Next
// can block until the diode has data or the Waiter's context is done.
type Waiter struct {
	// Diode is the wrapped diode. It is embedded, so its methods are promoted
	// onto Waiter unless Waiter defines its own (as it does for Set and Next).
	Diode
	// mu is the lock associated with c; Next holds it while polling and waiting.
	mu sync.Mutex
	// c is broadcast by Set, and when ctx is done, to wake goroutines blocked in Next.
	c *sync.Cond
	// ctx defaults to context.Background() in NewWaiter. Once it is done, a
	// Next call that finds no data returns nil instead of waiting.
	ctx context.Context
}
16
17
// WaiterConfigOption is a functional option that NewWaiter applies to the
// Waiter it is constructing.
type WaiterConfigOption func(*Waiter)
19
20
21
22
23 func WithWaiterContext(ctx context.Context) WaiterConfigOption {
24 return func(c *Waiter) {
25 c.ctx = ctx
26 }
27 }
28
29
30 func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter {
31 w := new(Waiter)
32 w.Diode = d
33 w.c = sync.NewCond(&w.mu)
34 w.ctx = context.Background()
35
36 for _, opt := range opts {
37 opt(w)
38 }
39
40 go func() {
41 <-w.ctx.Done()
42 w.c.Broadcast()
43 }()
44
45 return w
46 }
47
48
49
50 func (w *Waiter) Set(data GenericDataType) {
51 w.Diode.Set(data)
52 w.c.Broadcast()
53 }
54
55
56
57
58 func (w *Waiter) Next() GenericDataType {
59 w.mu.Lock()
60 defer w.mu.Unlock()
61
62 for {
63 data, ok := w.Diode.TryNext()
64 if !ok {
65 if w.isDone() {
66 return nil
67 }
68
69 w.c.Wait()
70 continue
71 }
72 return data
73 }
74 }
75
76 func (w *Waiter) isDone() bool {
77 select {
78 case <-w.ctx.Done():
79 return true
80 default:
81 return false
82 }
83 }
84
View as plain text