...

Source file src/github.com/sassoftware/relic/cmdline/auditor/auditor.go

Documentation: github.com/sassoftware/relic/cmdline/auditor

     1  //
     2  // Copyright (c) SAS Institute Inc.
     3  //
     4  // Licensed under the Apache License, Version 2.0 (the "License");
     5  // you may not use this file except in compliance with the License.
     6  // You may obtain a copy of the License at
     7  //
     8  //     http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  // Unless required by applicable law or agreed to in writing, software
    11  // distributed under the License is distributed on an "AS IS" BASIS,
    12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  // See the License for the specific language governing permissions and
    14  // limitations under the License.
    15  //
    16  
    17  package auditor
    18  
    19  import (
    20  	"database/sql"
    21  	"encoding/base64"
    22  	"fmt"
    23  	"os"
    24  	"time"
    25  
    26  	"github.com/sassoftware/relic/cmdline/shared"
    27  	"github.com/sassoftware/relic/config"
    28  	"github.com/sassoftware/relic/internal/activation"
    29  	"github.com/sassoftware/relic/lib/audit"
    30  	"github.com/spf13/cobra"
    31  	"github.com/streadway/amqp"
    32  
    33  	_ "github.com/lib/pq"
    34  )
    35  
// AuditCmd is the "audit" subcommand. It receives audit data from relic
// servers; the work is performed by auditCmd.
var AuditCmd = &cobra.Command{
	Use:   "audit",
	Short: "Receive audit data from relic servers",
	RunE:  auditCmd,
}
    41  
// argConfigFile receives the value of the --config / -c flag: the name of the
// relic-audit configuration file.
var argConfigFile string
    43  
    44  func init() {
    45  	shared.RootCmd.AddCommand(AuditCmd)
    46  	AuditCmd.Flags().StringVarP(&argConfigFile, "config", "c", "", "Name of relic-audit configuration file")
    47  }
    48  
    49  func auditCmd(cmd *cobra.Command, args []string) error {
    50  	if err := readConfig(); err != nil {
    51  		return err
    52  	}
    53  	configs, err := getServerConfs()
    54  	if err != nil {
    55  		return err
    56  	}
    57  	// test logfile but open it as-needed to make rotation simpler
    58  	if auditConfig.LogFile != "" {
    59  		f, err := os.OpenFile(auditConfig.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
    60  		if err != nil {
    61  			return err
    62  		}
    63  		f.Close()
    64  	}
    65  	// open and test database connection. the sql module manages a pool for goroutines as needed.
    66  	db, err := openDb()
    67  	if err != nil {
    68  		return err
    69  	}
    70  	if err := db.Ping(); err != nil {
    71  		return err
    72  	}
    73  	// start listeners for each broker
    74  	for _, cfg := range configs {
    75  		if err := startListener(cfg, db); err != nil {
    76  			return fmt.Errorf("%s: %s", cfg.Path(), err)
    77  		}
    78  	}
    79  	activation.DaemonReady()
    80  	// nothing left to do in this goroutine
    81  	time.Sleep(1<<63 - 1)
    82  	return nil
    83  }
    84  
    85  func openDb() (*sql.DB, error) {
    86  	return sql.Open("postgres", auditConfig.DatabaseURI)
    87  }
    88  
    89  func startListener(conf *config.Config, db *sql.DB) error {
    90  	aconf := conf.Amqp
    91  	l, err := NewListener(aconf)
    92  	if err != nil {
    93  		return err
    94  	}
    95  	fmt.Fprintf(os.Stderr, "%s: connected\n", conf.Path())
    96  	go func() {
    97  		l2 := l
    98  		var start time.Time
    99  		delay := new(expBackoff)
   100  		for {
   101  			if l2 != nil {
   102  				if err := l2.Loop(db); err != nil {
   103  					fmt.Fprintf(os.Stderr, "%s: %s\n", conf.Path(), err)
   104  				}
   105  				l2.Close()
   106  				l2 = nil
   107  			}
   108  			delay.CancelReset()
   109  			if time.Now().Sub(start) < time.Second {
   110  				delay.Sleep()
   111  			}
   112  			var err error
   113  			start = time.Now()
   114  			l2, err = NewListener(aconf)
   115  			if err != nil {
   116  				fmt.Fprintf(os.Stderr, "%s: %s\n", conf.Path(), err)
   117  			} else {
   118  				fmt.Fprintf(os.Stderr, "%s: connection reestablished\n", conf.Path())
   119  				delay.ResetAfter(60 * time.Second)
   120  			}
   121  		}
   122  	}()
   123  	return nil
   124  }
   125  
// Listener holds one AMQP connection and channel together with the name of
// the queue that audit messages are consumed from.
type Listener struct {
	aconf *config.AmqpConfig // configuration the connection was created from
	conn  *amqp.Connection   // broker connection; set to nil by Close
	ch    *amqp.Channel      // channel opened on conn; set to nil by Close
	qname string             // queue declared and bound by NewListener
}
   132  
   133  func NewListener(aconf *config.AmqpConfig) (*Listener, error) {
   134  	conn, err := audit.Connect(aconf)
   135  	if err != nil {
   136  		return nil, err
   137  	}
   138  	ch, err := conn.Channel()
   139  	if err != nil {
   140  		conn.Close()
   141  		return nil, err
   142  	}
   143  	hostname, _ := os.Hostname()
   144  	if hostname == "" {
   145  		hostname = "unknown"
   146  	}
   147  	qname := "audit." + hostname
   148  	l := &Listener{aconf, conn, ch, qname}
   149  	if err := ch.ExchangeDeclarePassive(aconf.ExchangeName(), amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
   150  		l.Close()
   151  		return nil, err
   152  	}
   153  	if _, err := ch.QueueDeclare(qname, true, false, false, false, nil); err != nil {
   154  		l.Close()
   155  		return nil, err
   156  	}
   157  	if err := ch.QueueBind(qname, "", aconf.ExchangeName(), false, nil); err != nil {
   158  		l.Close()
   159  		return nil, err
   160  	}
   161  	return l, nil
   162  }
   163  
   164  func (l *Listener) Loop(db *sql.DB) error {
   165  	errch := l.conn.NotifyClose(make(chan *amqp.Error, 1))
   166  	delivery, err := l.ch.Consume(l.qname, "", false, true, false, false, nil)
   167  	if err != nil {
   168  		return err
   169  	}
   170  	for {
   171  		d, ok := <-delivery
   172  		if !ok {
   173  			break
   174  		}
   175  		info, err := audit.Parse(d.Body)
   176  		if err != nil {
   177  			fmt.Fprintf(os.Stderr, "parse failed: %s\n", err)
   178  			d.Ack(false)
   179  			continue
   180  		}
   181  		if err := logToAll(db, info); err != nil {
   182  			// reject the message, disconnect, and start a timeout
   183  			fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
   184  			d.Reject(true)
   185  			return err
   186  		}
   187  		d.Ack(false)
   188  	}
   189  	return <-errch
   190  }
   191  
   192  func logToAll(db *sql.DB, info *audit.Info) (err error) {
   193  	tx, err := db.Begin()
   194  	if err != nil {
   195  		return err
   196  	}
   197  	defer tx.Rollback()
   198  	rowid, err := insertRow(db, info)
   199  	if err != nil {
   200  		return err
   201  	}
   202  	if err := logGraylog(info, rowid); err != nil {
   203  		return err
   204  	}
   205  	if err := logToFile(info, rowid); err != nil {
   206  		return err
   207  	}
   208  	return tx.Commit()
   209  }
   210  
   211  func insertRow(db *sql.DB, info *audit.Info) (int64, error) {
   212  	blob, err := info.Marshal()
   213  	if err != nil {
   214  		return 0, err
   215  	}
   216  	attrs64 := base64.StdEncoding.EncodeToString(blob)
   217  	var rowid int64
   218  	row := db.QueryRow("INSERT INTO signatures (timestamp, client_name, client_ip, client_dn, client_filename, sig_hostname, sig_type, sig_keyname, attributes) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING signature_id",
   219  		info.Attributes["sig.timestamp"],
   220  		info.Attributes["client.name"],
   221  		info.Attributes["client.ip"],
   222  		info.Attributes["client.dn"],
   223  		info.Attributes["client.filename"],
   224  		info.Attributes["sig.hostname"],
   225  		info.Attributes["sig.type"],
   226  		info.Attributes["sig.keyname"],
   227  		attrs64,
   228  	)
   229  	if err := row.Scan(&rowid); err != nil {
   230  		return 0, err
   231  	}
   232  	return rowid, nil
   233  }
   234  
   235  func logToFile(info *audit.Info, rowid int64) error {
   236  	formatted := fmtRow(info, rowid)
   237  	if auditConfig.LogFile != "" {
   238  		f, err := os.OpenFile(auditConfig.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
   239  		if err != nil {
   240  			return err
   241  		}
   242  		defer f.Close()
   243  		if _, err := fmt.Fprintln(f, formatted); err != nil {
   244  			return err
   245  		}
   246  	} else {
   247  		fmt.Println(formatted)
   248  	}
   249  	return nil
   250  }
   251  
   252  func fmtRow(info *audit.Info, rowid int64) string {
   253  	client := info.Attributes["client.name"]
   254  	if client == nil {
   255  		client = ""
   256  	}
   257  	ip := info.Attributes["client.ip"]
   258  	if ip == nil {
   259  		ip = ""
   260  	}
   261  	dn := info.Attributes["client.dn"]
   262  	if dn == nil {
   263  		dn = ""
   264  	}
   265  	return fmt.Sprintf("[%s] client=%s dn=%s ip=%s server=%s sigtype=%s filename=%s key=%s rowid=%d",
   266  		info.Attributes["sig.timestamp"],
   267  		client,
   268  		dn,
   269  		ip,
   270  		info.Attributes["sig.hostname"],
   271  		info.Attributes["sig.type"],
   272  		info.Attributes["client.filename"],
   273  		info.Attributes["sig.keyname"],
   274  		rowid,
   275  	)
   276  }
   277  
   278  func (l *Listener) Close() error {
   279  	if l.ch != nil {
   280  		l.ch.Close()
   281  		l.ch = nil
   282  	}
   283  	if l.conn != nil {
   284  		l.conn.Close()
   285  		l.conn = nil
   286  	}
   287  	return nil
   288  }
   289  
   290  const (
   291  	backoffMin = 1
   292  	backoffMax = 60
   293  	backoffE   = 2.7182818284590451
   294  )
   295  
   296  type expBackoff struct {
   297  	e float32
   298  	t *time.Timer
   299  }
   300  
   301  func (e *expBackoff) Sleep() {
   302  	if e.e == 0 {
   303  		e.e = backoffMin
   304  	}
   305  	time.Sleep(time.Duration(e.e * float32(time.Second)))
   306  	e.e *= backoffE
   307  	if e.e > backoffMax {
   308  		e.e = backoffMax
   309  	}
   310  }
   311  
   312  func (e *expBackoff) ResetAfter(d time.Duration) {
   313  	if e.t != nil {
   314  		e.t.Stop()
   315  	}
   316  	e.t = time.AfterFunc(d, func() {
   317  		e.e = 0
   318  	})
   319  }
   320  
   321  func (e *expBackoff) CancelReset() {
   322  	if e.t != nil {
   323  		e.t.Stop()
   324  		e.t = nil
   325  	}
   326  }
   327  

View as plain text