1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package auditor
18
19 import (
20 "database/sql"
21 "encoding/base64"
22 "fmt"
23 "os"
24 "time"
25
26 "github.com/sassoftware/relic/cmdline/shared"
27 "github.com/sassoftware/relic/config"
28 "github.com/sassoftware/relic/internal/activation"
29 "github.com/sassoftware/relic/lib/audit"
30 "github.com/spf13/cobra"
31 "github.com/streadway/amqp"
32
33 _ "github.com/lib/pq"
34 )
35
36 var AuditCmd = &cobra.Command{
37 Use: "audit",
38 Short: "Receive audit data from relic servers",
39 RunE: auditCmd,
40 }
41
42 var argConfigFile string
43
44 func init() {
45 shared.RootCmd.AddCommand(AuditCmd)
46 AuditCmd.Flags().StringVarP(&argConfigFile, "config", "c", "", "Name of relic-audit configuration file")
47 }
48
49 func auditCmd(cmd *cobra.Command, args []string) error {
50 if err := readConfig(); err != nil {
51 return err
52 }
53 configs, err := getServerConfs()
54 if err != nil {
55 return err
56 }
57
58 if auditConfig.LogFile != "" {
59 f, err := os.OpenFile(auditConfig.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
60 if err != nil {
61 return err
62 }
63 f.Close()
64 }
65
66 db, err := openDb()
67 if err != nil {
68 return err
69 }
70 if err := db.Ping(); err != nil {
71 return err
72 }
73
74 for _, cfg := range configs {
75 if err := startListener(cfg, db); err != nil {
76 return fmt.Errorf("%s: %s", cfg.Path(), err)
77 }
78 }
79 activation.DaemonReady()
80
81 time.Sleep(1<<63 - 1)
82 return nil
83 }
84
85 func openDb() (*sql.DB, error) {
86 return sql.Open("postgres", auditConfig.DatabaseURI)
87 }
88
89 func startListener(conf *config.Config, db *sql.DB) error {
90 aconf := conf.Amqp
91 l, err := NewListener(aconf)
92 if err != nil {
93 return err
94 }
95 fmt.Fprintf(os.Stderr, "%s: connected\n", conf.Path())
96 go func() {
97 l2 := l
98 var start time.Time
99 delay := new(expBackoff)
100 for {
101 if l2 != nil {
102 if err := l2.Loop(db); err != nil {
103 fmt.Fprintf(os.Stderr, "%s: %s\n", conf.Path(), err)
104 }
105 l2.Close()
106 l2 = nil
107 }
108 delay.CancelReset()
109 if time.Now().Sub(start) < time.Second {
110 delay.Sleep()
111 }
112 var err error
113 start = time.Now()
114 l2, err = NewListener(aconf)
115 if err != nil {
116 fmt.Fprintf(os.Stderr, "%s: %s\n", conf.Path(), err)
117 } else {
118 fmt.Fprintf(os.Stderr, "%s: connection reestablished\n", conf.Path())
119 delay.ResetAfter(60 * time.Second)
120 }
121 }
122 }()
123 return nil
124 }
125
126 type Listener struct {
127 aconf *config.AmqpConfig
128 conn *amqp.Connection
129 ch *amqp.Channel
130 qname string
131 }
132
133 func NewListener(aconf *config.AmqpConfig) (*Listener, error) {
134 conn, err := audit.Connect(aconf)
135 if err != nil {
136 return nil, err
137 }
138 ch, err := conn.Channel()
139 if err != nil {
140 conn.Close()
141 return nil, err
142 }
143 hostname, _ := os.Hostname()
144 if hostname == "" {
145 hostname = "unknown"
146 }
147 qname := "audit." + hostname
148 l := &Listener{aconf, conn, ch, qname}
149 if err := ch.ExchangeDeclarePassive(aconf.ExchangeName(), amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
150 l.Close()
151 return nil, err
152 }
153 if _, err := ch.QueueDeclare(qname, true, false, false, false, nil); err != nil {
154 l.Close()
155 return nil, err
156 }
157 if err := ch.QueueBind(qname, "", aconf.ExchangeName(), false, nil); err != nil {
158 l.Close()
159 return nil, err
160 }
161 return l, nil
162 }
163
164 func (l *Listener) Loop(db *sql.DB) error {
165 errch := l.conn.NotifyClose(make(chan *amqp.Error, 1))
166 delivery, err := l.ch.Consume(l.qname, "", false, true, false, false, nil)
167 if err != nil {
168 return err
169 }
170 for {
171 d, ok := <-delivery
172 if !ok {
173 break
174 }
175 info, err := audit.Parse(d.Body)
176 if err != nil {
177 fmt.Fprintf(os.Stderr, "parse failed: %s\n", err)
178 d.Ack(false)
179 continue
180 }
181 if err := logToAll(db, info); err != nil {
182
183 fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
184 d.Reject(true)
185 return err
186 }
187 d.Ack(false)
188 }
189 return <-errch
190 }
191
192 func logToAll(db *sql.DB, info *audit.Info) (err error) {
193 tx, err := db.Begin()
194 if err != nil {
195 return err
196 }
197 defer tx.Rollback()
198 rowid, err := insertRow(db, info)
199 if err != nil {
200 return err
201 }
202 if err := logGraylog(info, rowid); err != nil {
203 return err
204 }
205 if err := logToFile(info, rowid); err != nil {
206 return err
207 }
208 return tx.Commit()
209 }
210
211 func insertRow(db *sql.DB, info *audit.Info) (int64, error) {
212 blob, err := info.Marshal()
213 if err != nil {
214 return 0, err
215 }
216 attrs64 := base64.StdEncoding.EncodeToString(blob)
217 var rowid int64
218 row := db.QueryRow("INSERT INTO signatures (timestamp, client_name, client_ip, client_dn, client_filename, sig_hostname, sig_type, sig_keyname, attributes) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING signature_id",
219 info.Attributes["sig.timestamp"],
220 info.Attributes["client.name"],
221 info.Attributes["client.ip"],
222 info.Attributes["client.dn"],
223 info.Attributes["client.filename"],
224 info.Attributes["sig.hostname"],
225 info.Attributes["sig.type"],
226 info.Attributes["sig.keyname"],
227 attrs64,
228 )
229 if err := row.Scan(&rowid); err != nil {
230 return 0, err
231 }
232 return rowid, nil
233 }
234
235 func logToFile(info *audit.Info, rowid int64) error {
236 formatted := fmtRow(info, rowid)
237 if auditConfig.LogFile != "" {
238 f, err := os.OpenFile(auditConfig.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
239 if err != nil {
240 return err
241 }
242 defer f.Close()
243 if _, err := fmt.Fprintln(f, formatted); err != nil {
244 return err
245 }
246 } else {
247 fmt.Println(formatted)
248 }
249 return nil
250 }
251
252 func fmtRow(info *audit.Info, rowid int64) string {
253 client := info.Attributes["client.name"]
254 if client == nil {
255 client = ""
256 }
257 ip := info.Attributes["client.ip"]
258 if ip == nil {
259 ip = ""
260 }
261 dn := info.Attributes["client.dn"]
262 if dn == nil {
263 dn = ""
264 }
265 return fmt.Sprintf("[%s] client=%s dn=%s ip=%s server=%s sigtype=%s filename=%s key=%s rowid=%d",
266 info.Attributes["sig.timestamp"],
267 client,
268 dn,
269 ip,
270 info.Attributes["sig.hostname"],
271 info.Attributes["sig.type"],
272 info.Attributes["client.filename"],
273 info.Attributes["sig.keyname"],
274 rowid,
275 )
276 }
277
278 func (l *Listener) Close() error {
279 if l.ch != nil {
280 l.ch.Close()
281 l.ch = nil
282 }
283 if l.conn != nil {
284 l.conn.Close()
285 l.conn = nil
286 }
287 return nil
288 }
289
290 const (
291 backoffMin = 1
292 backoffMax = 60
293 backoffE = 2.7182818284590451
294 )
295
296 type expBackoff struct {
297 e float32
298 t *time.Timer
299 }
300
301 func (e *expBackoff) Sleep() {
302 if e.e == 0 {
303 e.e = backoffMin
304 }
305 time.Sleep(time.Duration(e.e * float32(time.Second)))
306 e.e *= backoffE
307 if e.e > backoffMax {
308 e.e = backoffMax
309 }
310 }
311
312 func (e *expBackoff) ResetAfter(d time.Duration) {
313 if e.t != nil {
314 e.t.Stop()
315 }
316 e.t = time.AfterFunc(d, func() {
317 e.e = 0
318 })
319 }
320
321 func (e *expBackoff) CancelReset() {
322 if e.t != nil {
323 e.t.Stop()
324 e.t = nil
325 }
326 }
327
View as plain text