...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package wait
18
19 import (
20 "log"
21 "sync"
22 )
23
// defaultListElementLength is the number of shards a list is divided into.
// Each shard has its own lock and map, and an ID is routed to the shard at
// index id % defaultListElementLength.
const defaultListElementLength = 64
30
31
32
// Wait associates uint64 IDs with channels so that one party can wait for a
// value that another party delivers later.
type Wait interface {
	// Register returns a receive-only channel associated with the given ID.
	// A value handed to Trigger for the same ID is delivered on it.
	Register(id uint64) <-chan interface{}

	// Trigger delivers x to the channel associated with the given ID.
	Trigger(id uint64, x interface{})

	// IsRegistered reports whether the given ID currently has a channel
	// associated with it.
	IsRegistered(id uint64) bool
}
42
43 type list struct {
44 e []listElement
45 }
46
// listElement is one shard of a list: a map from registered ID to its
// channel, together with the lock that guards that map.
type listElement struct {
	// l must be held while m is read or modified.
	l sync.RWMutex
	// m maps each registered ID in this shard to its channel.
	m map[uint64]chan interface{}
}
51
52
53 func New() Wait {
54 res := list{
55 e: make([]listElement, defaultListElementLength),
56 }
57 for i := 0; i < len(res.e); i++ {
58 res.e[i].m = make(map[uint64]chan interface{})
59 }
60 return &res
61 }
62
63 func (w *list) Register(id uint64) <-chan interface{} {
64 idx := id % defaultListElementLength
65 newCh := make(chan interface{}, 1)
66 w.e[idx].l.Lock()
67 defer w.e[idx].l.Unlock()
68 if _, ok := w.e[idx].m[id]; !ok {
69 w.e[idx].m[id] = newCh
70 } else {
71 log.Panicf("dup id %x", id)
72 }
73 return newCh
74 }
75
76 func (w *list) Trigger(id uint64, x interface{}) {
77 idx := id % defaultListElementLength
78 w.e[idx].l.Lock()
79 ch := w.e[idx].m[id]
80 delete(w.e[idx].m, id)
81 w.e[idx].l.Unlock()
82 if ch != nil {
83 ch <- x
84 close(ch)
85 }
86 }
87
88 func (w *list) IsRegistered(id uint64) bool {
89 idx := id % defaultListElementLength
90 w.e[idx].l.RLock()
91 defer w.e[idx].l.RUnlock()
92 _, ok := w.e[idx].m[id]
93 return ok
94 }
95
// waitWithResponse is a Wait implementation built around a single, fixed
// channel supplied by the caller.
type waitWithResponse struct {
	// ch is the channel returned by every Register call.
	ch <-chan interface{}
}
99
100 func NewWithResponse(ch <-chan interface{}) Wait {
101 return &waitWithResponse{ch: ch}
102 }
103
104 func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
105 return w.ch
106 }
107 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
108 func (w *waitWithResponse) IsRegistered(id uint64) bool {
109 panic("waitWithResponse.IsRegistered() shouldn't be called")
110 }
111
View as plain text