...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sdjournal
17
18 import (
19 "errors"
20 "fmt"
21 "io"
22 "log"
23 "strings"
24 "sync"
25 "time"
26 )
27
// ErrExpired is the sentinel error Follow returns once its until channel
// delivers a value, i.e. the caller-supplied deadline has passed.
var ErrExpired = errors.New("Timeout expired")
33
34
35 type JournalReaderConfig struct {
36
37
38
39 Since time.Duration
40 NumFromTail uint64
41 Cursor string
42
43
44
45 Matches []Match
46
47
48
49 Path string
50
51
52
53
54 Formatter func(entry *JournalEntry) (string, error)
55 }
56
57
58
59 type JournalReader struct {
60 journal *Journal
61 msgReader *strings.Reader
62 formatter func(entry *JournalEntry) (string, error)
63 }
64
65
66
67 func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
68
69 if config.Formatter == nil {
70 config.Formatter = simpleMessageFormatter
71 }
72
73 r := &JournalReader{
74 formatter: config.Formatter,
75 }
76
77
78 var err error
79 if config.Path != "" {
80 r.journal, err = NewJournalFromDir(config.Path)
81 } else {
82 r.journal, err = NewJournal()
83 }
84 if err != nil {
85 return nil, err
86 }
87
88
89 for _, m := range config.Matches {
90 if err = r.journal.AddMatch(m.String()); err != nil {
91 return nil, err
92 }
93 }
94
95
96 if config.Since != 0 {
97
98 start := time.Now().Add(config.Since)
99 if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
100 return nil, err
101 }
102 } else if config.NumFromTail != 0 {
103
104 if err := r.journal.SeekTail(); err != nil {
105 return nil, err
106 }
107
108
109
110
111 skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
112 if err != nil {
113 return nil, err
114 }
115
116
117 if skip != config.NumFromTail+1 {
118 if err := r.journal.SeekHead(); err != nil {
119 return nil, err
120 }
121 }
122 } else if config.Cursor != "" {
123
124 if err := r.journal.SeekCursor(config.Cursor); err != nil {
125 return nil, err
126 }
127 }
128
129 return r, nil
130 }
131
132
133
134
135
136
137
138 func (r *JournalReader) Read(b []byte) (int, error) {
139 if r.msgReader == nil {
140
141
142 c, err := r.journal.Next()
143
144
145 if err != nil {
146 return 0, err
147 }
148
149
150 if c == 0 {
151 return 0, io.EOF
152 }
153
154 entry, err := r.journal.GetEntry()
155 if err != nil {
156 return 0, err
157 }
158
159
160 msg, err := r.formatter(entry)
161 if err != nil {
162 return 0, err
163 }
164 r.msgReader = strings.NewReader(msg)
165 }
166
167
168 sz, err := r.msgReader.Read(b)
169 if err == io.EOF {
170
171
172
173 r.msgReader = nil
174 return sz, nil
175 }
176 if err != nil {
177 return sz, err
178 }
179 if r.msgReader.Len() == 0 {
180 r.msgReader = nil
181 }
182
183 return sz, nil
184 }
185
186
187 func (r *JournalReader) Close() error {
188 return r.journal.Close()
189 }
190
191
192 func (r *JournalReader) Rewind() error {
193 r.msgReader = nil
194 return r.journal.SeekHead()
195 }
196
197
198
199 func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
200
201
202
203 var msg = make([]byte, 64*1<<(10))
204 var waitCh = make(chan int, 1)
205 var waitGroup sync.WaitGroup
206 defer waitGroup.Wait()
207
208 process:
209 for {
210 c, err := r.Read(msg)
211 if err != nil && err != io.EOF {
212 return err
213 }
214
215 select {
216 case <-until:
217 return ErrExpired
218 default:
219 }
220 if c > 0 {
221 if _, err = writer.Write(msg[:c]); err != nil {
222 return err
223 }
224 continue process
225 }
226
227
228
229
230 for {
231 waitGroup.Add(1)
232 go func() {
233 status := r.journal.Wait(100 * time.Millisecond)
234 waitCh <- status
235 waitGroup.Done()
236 }()
237
238 select {
239 case <-until:
240 return ErrExpired
241 case e := <-waitCh:
242 switch e {
243 case SD_JOURNAL_NOP:
244
245 case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
246 continue process
247 default:
248 if e < 0 {
249 return fmt.Errorf("received error event: %d", e)
250 }
251
252 log.Printf("received unknown event: %d\n", e)
253 }
254 }
255 }
256 }
257 }
258
259
260
261
262 func simpleMessageFormatter(entry *JournalEntry) (string, error) {
263 msg, ok := entry.Fields["MESSAGE"]
264 if !ok {
265 return "", fmt.Errorf("no MESSAGE field present in journal entry")
266 }
267
268 usec := entry.RealtimeTimestamp
269 timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
270
271 return fmt.Sprintf("%s %s\n", timestamp, msg), nil
272 }
273
View as plain text