...
1
2
3
4
5
6 package par
7
8 import (
9 "math/rand"
10 "sync"
11 "sync/atomic"
12 )
13
14
15
16 type Work struct {
17 f func(interface{})
18 running int
19
20 mu sync.Mutex
21 added map[interface{}]bool
22 todo []interface{}
23 wait sync.Cond
24 waiting int
25 }
26
27 func (w *Work) init() {
28 if w.added == nil {
29 w.added = make(map[interface{}]bool)
30 }
31 }
32
33
34 func (w *Work) Add(item interface{}) {
35 w.mu.Lock()
36 w.init()
37 if !w.added[item] {
38 w.added[item] = true
39 w.todo = append(w.todo, item)
40 if w.waiting > 0 {
41 w.wait.Signal()
42 }
43 }
44 w.mu.Unlock()
45 }
46
47
48
49
50
51
52
53
54 func (w *Work) Do(n int, f func(item interface{})) {
55 if n < 1 {
56 panic("par.Work.Do: n < 1")
57 }
58 if w.running >= 1 {
59 panic("par.Work.Do: already called Do")
60 }
61
62 w.running = n
63 w.f = f
64 w.wait.L = &w.mu
65
66 for i := 0; i < n-1; i++ {
67 go w.runner()
68 }
69 w.runner()
70 }
71
72
73
74
75 func (w *Work) runner() {
76 for {
77
78 w.mu.Lock()
79 for len(w.todo) == 0 {
80 w.waiting++
81 if w.waiting == w.running {
82
83 w.wait.Broadcast()
84 w.mu.Unlock()
85 return
86 }
87 w.wait.Wait()
88 w.waiting--
89 }
90
91
92
93
94
95 i := rand.Intn(len(w.todo))
96 item := w.todo[i]
97 w.todo[i] = w.todo[len(w.todo)-1]
98 w.todo = w.todo[:len(w.todo)-1]
99 w.mu.Unlock()
100
101 w.f(item)
102 }
103 }
104
105
106 type Cache struct {
107 m sync.Map
108 }
109
110 type cacheEntry struct {
111 done uint32
112 mu sync.Mutex
113 result interface{}
114 }
115
116
117
118
119 func (c *Cache) Do(key interface{}, f func() interface{}) interface{} {
120 entryIface, ok := c.m.Load(key)
121 if !ok {
122 entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
123 }
124 e := entryIface.(*cacheEntry)
125 if atomic.LoadUint32(&e.done) == 0 {
126 e.mu.Lock()
127 if atomic.LoadUint32(&e.done) == 0 {
128 e.result = f()
129 atomic.StoreUint32(&e.done, 1)
130 }
131 e.mu.Unlock()
132 }
133 return e.result
134 }
135
136
137
138
139 func (c *Cache) Get(key interface{}) interface{} {
140 entryIface, ok := c.m.Load(key)
141 if !ok {
142 return nil
143 }
144 e := entryIface.(*cacheEntry)
145 if atomic.LoadUint32(&e.done) == 0 {
146 return nil
147 }
148 return e.result
149 }
150
View as plain text