...
1 package watcherx
2
3 import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "io"
8
9 "github.com/pkg/errors"
10 )
11
// Event is the common interface of the event types declared in this file.
// Every event can be marshalled to JSON, exposes a reader, reports the
// source it belongs to and has a human-readable string form. The
// unexported setSource method means only types from this package (or types
// embedding one of them) can satisfy the interface.
type Event interface {
	json.Marshaler

	Reader() io.Reader
	Source() string
	String() string
	setSource(string)
}

// source is the string identifying where an event came from. It is embedded
// into every concrete event type.
type source string

// ErrorEvent carries an error together with the source it relates to.
type ErrorEvent struct {
	error
	source
}

// ChangeEvent carries a data payload together with the source it relates to.
type ChangeEvent struct {
	data []byte
	source
}

// RemoveEvent names a source and carries no payload.
type RemoveEvent struct {
	source
}

// serialEventType is the discriminator stored in the "type" field of a
// serialized event.
type serialEventType string

// serialEvent is the JSON wire representation shared by all event types.
// Data is a []byte, which encoding/json encodes as a base64 string.
type serialEvent struct {
	Type   serialEventType `json:"type"`
	Data   []byte          `json:"data"`
	Source source          `json:"source"`
}
41
42 const (
43 serialTypeChange serialEventType = "change"
44 serialTypeRemove serialEventType = "remove"
45 serialTypeError serialEventType = "error"
46 )
47
48 var errUnknownEvent = errors.New("unknown event type")
49
50 func (e *ErrorEvent) Reader() io.Reader {
51 return bytes.NewBufferString(e.Error())
52 }
53
54 func (e *ErrorEvent) MarshalJSON() ([]byte, error) {
55 return json.Marshal(serialEvent{
56 Type: serialTypeError,
57 Data: []byte(e.Error()),
58 Source: e.source,
59 })
60 }
61
62 func (e *ErrorEvent) String() string {
63 return fmt.Sprintf("error: %+v; source: %s", e.error, e.source)
64 }
65
66 func (e source) Source() string {
67 return string(e)
68 }
69
70 func (e *source) setSource(nsrc string) {
71 *e = source(nsrc)
72 }
73
74 func (e *ChangeEvent) Reader() io.Reader {
75 return bytes.NewBuffer(e.data)
76 }
77
78 func (e *ChangeEvent) MarshalJSON() ([]byte, error) {
79 return json.Marshal(serialEvent{
80 Type: serialTypeChange,
81 Data: e.data,
82 Source: e.source,
83 })
84 }
85
86 func (e *ChangeEvent) String() string {
87 return fmt.Sprintf("data: %s; source: %s", e.data, e.source)
88 }
89
90 func (e *RemoveEvent) Reader() io.Reader {
91 return nil
92 }
93
94 func (e *RemoveEvent) MarshalJSON() ([]byte, error) {
95 return json.Marshal(serialEvent{
96 Type: serialTypeRemove,
97 Source: e.source,
98 })
99 }
100
101 func (e *RemoveEvent) String() string {
102 return fmt.Sprintf("removed source: %s", e.source)
103 }
104
105 func unmarshalEvent(data []byte) (Event, error) {
106 var serialEvent serialEvent
107 if err := json.Unmarshal(data, &serialEvent); err != nil {
108 return nil, errors.WithStack(err)
109 }
110 switch serialEvent.Type {
111 case serialTypeRemove:
112 return &RemoveEvent{
113 source: serialEvent.Source,
114 }, nil
115 case serialTypeChange:
116 return &ChangeEvent{
117 data: serialEvent.Data,
118 source: serialEvent.Source,
119 }, nil
120 case serialTypeError:
121 return &ErrorEvent{
122 error: errors.New(string(serialEvent.Data)),
123 source: serialEvent.Source,
124 }, nil
125 }
126 return nil, errUnknownEvent
127 }
128
View as plain text