...

Source file src/github.com/sassoftware/relic/lib/audit/amqp.go

Documentation: github.com/sassoftware/relic/lib/audit

     1  //
     2  // Copyright (c) SAS Institute Inc.
     3  //
     4  // Licensed under the Apache License, Version 2.0 (the "License");
     5  // you may not use this file except in compliance with the License.
     6  // You may obtain a copy of the License at
     7  //
     8  //     http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  // Unless required by applicable law or agreed to in writing, software
    11  // distributed under the License is distributed on an "AS IS" BASIS,
    12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  // See the License for the specific language governing permissions and
    14  // limitations under the License.
    15  //
    16  
    17  package audit
    18  
    19  import (
    20  	"crypto/tls"
    21  	"errors"
    22  	"time"
    23  
    24  	"github.com/sassoftware/relic/config"
    25  	"github.com/sassoftware/relic/lib/certloader"
    26  	"github.com/sassoftware/relic/lib/x509tools"
    27  	"github.com/streadway/amqp"
    28  )
    29  
    30  // Publish audit record to a AMQP exchange
    31  func (info *Info) Publish(aconf *config.AmqpConfig) error {
    32  	blob, err := info.Marshal()
    33  	if err != nil {
    34  		return err
    35  	}
    36  	msg := amqp.Publishing{
    37  		DeliveryMode: amqp.Persistent,
    38  		Timestamp:    time.Now(),
    39  		ContentType:  "application/json",
    40  		Body:         blob,
    41  	}
    42  	conn, err := Connect(aconf)
    43  	if err != nil {
    44  		return err
    45  	}
    46  	defer conn.Close()
    47  	ch, err := conn.Channel()
    48  	if err != nil {
    49  		return err
    50  	}
    51  	defer ch.Close()
    52  	if err := ch.ExchangeDeclare(aconf.ExchangeName(), amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
    53  		return err
    54  	}
    55  	ch.Confirm(false)
    56  	notify := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
    57  	if err := ch.Publish(aconf.ExchangeName(), aconf.RoutingKey(), false, false, msg); err != nil {
    58  		return err
    59  	}
    60  	confirm := <-notify
    61  	if !confirm.Ack {
    62  		return errors.New("message was NACKed")
    63  	}
    64  	return nil
    65  }
    66  
    67  // Connect to the configured AMQP broker
    68  func Connect(aconf *config.AmqpConfig) (*amqp.Connection, error) {
    69  	uri, err := amqp.ParseURI(aconf.URL)
    70  	if err != nil {
    71  		return nil, err
    72  	}
    73  	var tconf *tls.Config
    74  	var auth []amqp.Authentication
    75  	if uri.Scheme == "amqps" {
    76  		tconf = &tls.Config{}
    77  		if aconf.CaCert != "" {
    78  			if err := x509tools.LoadCertPool(aconf.CaCert, tconf); err != nil {
    79  				return nil, err
    80  			}
    81  		}
    82  		if aconf.CertFile != "" {
    83  			cert, err := certloader.LoadX509KeyPair(aconf.CertFile, aconf.KeyFile)
    84  			if err != nil {
    85  				return nil, err
    86  			}
    87  			tconf.Certificates = []tls.Certificate{cert.TLS()}
    88  		}
    89  		x509tools.SetKeyLogFile(tconf)
    90  		if len(tconf.Certificates) != 0 {
    91  			auth = append(auth, externalAuth{})
    92  		}
    93  	}
    94  	if uri.Password != "" {
    95  		auth = append(auth, uri.PlainAuth())
    96  	}
    97  	qconf := amqp.Config{SASL: auth, TLSClientConfig: tconf}
    98  	return amqp.DialConfig(aconf.URL, qconf)
    99  }
   100  
   101  type externalAuth struct{}
   102  
   103  func (externalAuth) Mechanism() string { return "EXTERNAL" }
   104  func (externalAuth) Response() string  { return "" }
   105  

View as plain text