...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package audit
18
19 import (
20 "crypto/tls"
21 "errors"
22 "time"
23
24 "github.com/sassoftware/relic/config"
25 "github.com/sassoftware/relic/lib/certloader"
26 "github.com/sassoftware/relic/lib/x509tools"
27 "github.com/streadway/amqp"
28 )
29
30
31 func (info *Info) Publish(aconf *config.AmqpConfig) error {
32 blob, err := info.Marshal()
33 if err != nil {
34 return err
35 }
36 msg := amqp.Publishing{
37 DeliveryMode: amqp.Persistent,
38 Timestamp: time.Now(),
39 ContentType: "application/json",
40 Body: blob,
41 }
42 conn, err := Connect(aconf)
43 if err != nil {
44 return err
45 }
46 defer conn.Close()
47 ch, err := conn.Channel()
48 if err != nil {
49 return err
50 }
51 defer ch.Close()
52 if err := ch.ExchangeDeclare(aconf.ExchangeName(), amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
53 return err
54 }
55 ch.Confirm(false)
56 notify := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
57 if err := ch.Publish(aconf.ExchangeName(), aconf.RoutingKey(), false, false, msg); err != nil {
58 return err
59 }
60 confirm := <-notify
61 if !confirm.Ack {
62 return errors.New("message was NACKed")
63 }
64 return nil
65 }
66
67
68 func Connect(aconf *config.AmqpConfig) (*amqp.Connection, error) {
69 uri, err := amqp.ParseURI(aconf.URL)
70 if err != nil {
71 return nil, err
72 }
73 var tconf *tls.Config
74 var auth []amqp.Authentication
75 if uri.Scheme == "amqps" {
76 tconf = &tls.Config{}
77 if aconf.CaCert != "" {
78 if err := x509tools.LoadCertPool(aconf.CaCert, tconf); err != nil {
79 return nil, err
80 }
81 }
82 if aconf.CertFile != "" {
83 cert, err := certloader.LoadX509KeyPair(aconf.CertFile, aconf.KeyFile)
84 if err != nil {
85 return nil, err
86 }
87 tconf.Certificates = []tls.Certificate{cert.TLS()}
88 }
89 x509tools.SetKeyLogFile(tconf)
90 if len(tconf.Certificates) != 0 {
91 auth = append(auth, externalAuth{})
92 }
93 }
94 if uri.Password != "" {
95 auth = append(auth, uri.PlainAuth())
96 }
97 qconf := amqp.Config{SASL: auth, TLSClientConfig: tconf}
98 return amqp.DialConfig(aconf.URL, qconf)
99 }
100
// externalAuth is a stateless authentication mechanism that selects SASL
// "EXTERNAL" with an empty response. Connect offers it when a TLS client
// certificate is configured.
type externalAuth struct{}

// Mechanism returns the SASL mechanism name, "EXTERNAL".
func (externalAuth) Mechanism() string {
	return "EXTERNAL"
}

// Response returns the SASL response payload, which is always empty.
func (externalAuth) Response() string {
	return ""
}
105
View as plain text