1 package monitor
2
3 import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/LINBIT/golinstor/client"
9 "github.com/LINBIT/golinstor/devicelayerkind"
10 )
11
// resourceState is the monitor's per-resource bookkeeping entry.
type resourceState struct {
	// hasQuorum records whether DRBD quorum was enabled on the resource
	// definition when the resource was first seen; only such resources get
	// watchers. isWatched is set when a watcher is started for the resource
	// and cleared by that watcher once it has re-evaluated the resource.
	hasQuorum, isWatched bool
}
16
17 type haResources struct {
18 resources map[string]resourceState
19 sync.Mutex
20 }
21
22
23
24
25
26 type LostResourceUser struct {
27 ctx context.Context
28 cancel context.CancelFunc
29 client *client.Client
30 mayPromoteStream *client.DRBDMayPromoteStream
31 haResources haResources
32 initialDelay time.Duration
33 existingDelay time.Duration
34
35 C chan string
36 }
37
38 const (
39 INITIAL_DELAY_DEFAULT = 1 * time.Minute
40 EXISTING_DELAY_DEFAULT = 45 * time.Second
41 )
42
43
44 type Option func(*LostResourceUser) error
45
46
// WithDelay returns an Option that overrides both grace periods:
//   - initial is used for resources the monitor sees for the first time
//     (default INITIAL_DELAY_DEFAULT);
//   - existing is used for resources it already knows
//     (default EXISTING_DELAY_DEFAULT).
//
// The returned Option never fails. The values are stored as given; no range
// check is performed.
func WithDelay(initial, existing time.Duration) Option {
	return func(lr *LostResourceUser) error {
		lr.initialDelay, lr.existingDelay = initial, existing
		return nil
	}
}
54
55
56 func NewLostResourceUser(ctx context.Context, client *client.Client, options ...Option) (*LostResourceUser, error) {
57
58 mayPromoteStream, err := client.Events.DRBDPromotion(ctx, "current")
59 if err != nil {
60 return nil, err
61 }
62
63 ctx, cancel := context.WithCancel(ctx)
64
65 lr := &LostResourceUser{
66 ctx: ctx,
67 cancel: cancel,
68 client: client,
69 mayPromoteStream: mayPromoteStream,
70
71 haResources: haResources{
72 resources: make(map[string]resourceState),
73 },
74 initialDelay: INITIAL_DELAY_DEFAULT,
75 existingDelay: EXISTING_DELAY_DEFAULT,
76 C: make(chan string),
77 }
78
79 for _, opt := range options {
80 if err := opt(lr); err != nil {
81 return nil, err
82 }
83 }
84
85 go func() {
86 for {
87 select {
88 case ev, ok := <-lr.mayPromoteStream.Events:
89 if !ok {
90 lr.Stop()
91 close(lr.C)
92 return
93 }
94 if !ev.MayPromote {
95 continue
96 }
97
98 resName := ev.ResourceName
99
100 watch, dur := lr.resShouldWatch(resName)
101 if !watch {
102 continue
103 }
104 go lr.watch(resName, dur)
105 case <-lr.ctx.Done():
106 lr.mayPromoteStream.Close()
107
108 close(lr.C)
109 return
110 }
111 }
112 }()
113
114 return lr, nil
115 }
116
117
118 func (rl *LostResourceUser) Stop() {
119 rl.cancel()
120 }
121
122 func (lr *LostResourceUser) watch(resName string, dur time.Duration) {
123 ticker := time.NewTicker(dur)
124 defer ticker.Stop()
125
126 select {
127 case <-ticker.C:
128 break
129 case <-lr.ctx.Done():
130 return
131 }
132
133
134 ress, err := lr.client.Resources.GetAll(lr.ctx, resName)
135
136 lr.haResources.Lock()
137 defer lr.haResources.Unlock()
138
139 if err == client.NotFoundError {
140
141 delete(lr.haResources.resources, resName)
142 return
143 } else if err != nil {
144 lr.Stop()
145 return
146 }
147
148 oneMayPromote := false
149 for _, r := range ress {
150 if r.LayerObject.Type != devicelayerkind.Drbd {
151 delete(lr.haResources.resources, resName)
152 return
153 }
154 if r.LayerObject.Drbd.MayPromote {
155 oneMayPromote = true
156 break
157 }
158 }
159
160 if oneMayPromote {
161 lr.C <- resName
162 }
163
164 res := lr.haResources.resources[resName]
165
166
167 res.isWatched = false
168 lr.haResources.resources[resName] = res
169 }
170
171 func (ha *LostResourceUser) resHasQuorum(resName string) (bool, error) {
172 rd, err := ha.client.ResourceDefinitions.Get(ha.ctx, resName)
173 if err != nil {
174 return false, err
175 }
176
177 val, ok := rd.Props["DrbdOptions/Resource/quorum"]
178 if !ok || val == "off" {
179 return false, nil
180 }
181
182 return true, nil
183 }
184
// resShouldWatch decides whether a "may promote" event for resName should
// start a watcher and, if so, how long that watcher should wait.
//
// For a resource that is already known, a watcher is started (with
// existingDelay) unless one is already pending or quorum was found disabled.
// For a resource seen for the first time, quorum is looked up with
// resHasQuorum and the result is recorded. A watcher (with initialDelay) is
// started only if quorum is enabled. If the lookup fails, nothing is recorded
// and no watcher is started, so a later event retries the lookup. Whenever a
// watcher is requested, the entry's isWatched flag is set.
//
// The haResources mutex is held for the entire call, including the quorum
// lookup.
func (lr *LostResourceUser) resShouldWatch(resName string) (bool, time.Duration) {
	initial, existing := lr.initialDelay, lr.existingDelay

	lr.haResources.Lock()
	defer lr.haResources.Unlock()

	if state, known := lr.haResources.resources[resName]; known {
		if state.isWatched || !state.hasQuorum {
			return false, 0
		}
		state.isWatched = true
		lr.haResources.resources[resName] = state
		return true, existing
	}

	quorum, err := lr.resHasQuorum(resName)
	if err != nil {
		return false, 0
	}

	lr.haResources.resources[resName] = resourceState{
		hasQuorum: quorum,
		isWatched: quorum,
	}
	if quorum {
		return true, initial
	}
	return false, 0
}
224
View as plain text