...
1 package client
2
3 import (
4 "context"
5 "encoding/json"
6
7 "github.com/donovanhide/eventsource"
8 )
9
10 type EventMayPromoteChange struct {
11 ResourceName string `json:"resource_name,omitempty"`
12 NodeName string `json:"node_name,omitempty"`
13 MayPromote bool `json:"may_promote,omitempty"`
14 }
15
16
17
18
19
20 type EventProvider interface {
21
22 DRBDPromotion(ctx context.Context, lastEventId string) (*DRBDMayPromoteStream, error)
23 }
24
25 const mayPromoteChange = "may-promote-change"
26
27
28 type EventService struct {
29 client *Client
30 }
31
32
33
34 type DRBDMayPromoteStream struct {
35 Events chan EventMayPromoteChange
36 stream *eventsource.Stream
37 }
38
39
40 func (dmp *DRBDMayPromoteStream) Close() {
41 dmp.stream.Close()
42 }
43
44
45 func (e *EventService) subscribe(ctx context.Context, url, event, lastEventId string) (*eventsource.Stream, chan interface{}, error) {
46 stream, err := e.client.doEvent(ctx, url, lastEventId)
47 if err != nil {
48 return nil, nil, err
49 }
50
51 ch := make(chan interface{})
52 go func() {
53 defer close(ch)
54 for {
55 select {
56 case ev, ok := <-stream.Events:
57 if !ok {
58 return
59 }
60 if ev.Event() == event {
61 switch event {
62 case mayPromoteChange:
63 var empc EventMayPromoteChange
64 if err := json.Unmarshal([]byte(ev.Data()), &empc); err == nil {
65 ch <- empc
66 }
67 }
68 }
69 case <-ctx.Done():
70 return
71 }
72 }
73 }()
74
75 return stream, ch, nil
76 }
77
78
79 func (e *EventService) DRBDPromotion(ctx context.Context, lastEventId string) (*DRBDMayPromoteStream, error) {
80 stream, ch, err := e.subscribe(ctx, "/v1/events/drbd/promotion", mayPromoteChange, lastEventId)
81 if err != nil {
82 return nil, err
83 }
84
85 empch := make(chan EventMayPromoteChange)
86 go func() {
87 defer close(empch)
88 for ev := range ch {
89 if e, ok := ev.(EventMayPromoteChange); ok {
90 empch <- e
91 }
92 }
93 }()
94
95 return &DRBDMayPromoteStream{
96 Events: empch,
97 stream: stream,
98 }, nil
99 }
100
View as plain text