...

Source file src/github.com/coreos/go-systemd/v22/sdjournal/read.go

Documentation: github.com/coreos/go-systemd/v22/sdjournal

     1  // Copyright 2015 RedHat, Inc.
     2  // Copyright 2015 CoreOS, Inc.
     3  //
     4  // Licensed under the Apache License, Version 2.0 (the "License");
     5  // you may not use this file except in compliance with the License.
     6  // You may obtain a copy of the License at
     7  //
     8  //     http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  // Unless required by applicable law or agreed to in writing, software
    11  // distributed under the License is distributed on an "AS IS" BASIS,
    12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  // See the License for the specific language governing permissions and
    14  // limitations under the License.
    15  
    16  package sdjournal
    17  
    18  import (
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"log"
    23  	"strings"
    24  	"sync"
    25  	"time"
    26  )
    27  
    28  var (
    29  	// ErrExpired gets returned when the Follow function runs into the
    30  	// specified timeout.
    31  	ErrExpired = errors.New("Timeout expired")
    32  )
    33  
    34  // JournalReaderConfig represents options to drive the behavior of a JournalReader.
    35  type JournalReaderConfig struct {
    36  	// The Since, NumFromTail and Cursor options are mutually exclusive and
    37  	// determine where the reading begins within the journal. The order in which
    38  	// options are written is exactly the order of precedence.
    39  	Since       time.Duration // start relative to a Duration from now
    40  	NumFromTail uint64        // start relative to the tail
    41  	Cursor      string        // start relative to the cursor
    42  
    43  	// Show only journal entries whose fields match the supplied values. If
    44  	// the array is empty, entries will not be filtered.
    45  	Matches []Match
    46  
    47  	// If not empty, the journal instance will point to a journal residing
    48  	// in this directory. The supplied path may be relative or absolute.
    49  	Path string
    50  
    51  	// If not nil, Formatter will be used to translate the resulting entries
    52  	// into strings. If not set, the default format (timestamp and message field)
    53  	// will be used. If Formatter returns an error, Read will stop and return the error.
    54  	Formatter func(entry *JournalEntry) (string, error)
    55  }
    56  
    57  // JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
    58  // systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
    59  type JournalReader struct {
    60  	journal   *Journal
    61  	msgReader *strings.Reader
    62  	formatter func(entry *JournalEntry) (string, error)
    63  }
    64  
    65  // NewJournalReader creates a new JournalReader with configuration options that are similar to the
    66  // systemd journalctl tool's iteration and filtering features.
    67  func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
    68  	// use simpleMessageFormatter as default formatter.
    69  	if config.Formatter == nil {
    70  		config.Formatter = simpleMessageFormatter
    71  	}
    72  
    73  	r := &JournalReader{
    74  		formatter: config.Formatter,
    75  	}
    76  
    77  	// Open the journal
    78  	var err error
    79  	if config.Path != "" {
    80  		r.journal, err = NewJournalFromDir(config.Path)
    81  	} else {
    82  		r.journal, err = NewJournal()
    83  	}
    84  	if err != nil {
    85  		return nil, err
    86  	}
    87  
    88  	// Add any supplied matches
    89  	for _, m := range config.Matches {
    90  		if err = r.journal.AddMatch(m.String()); err != nil {
    91  			return nil, err
    92  		}
    93  	}
    94  
    95  	// Set the start position based on options
    96  	if config.Since != 0 {
    97  		// Start based on a relative time
    98  		start := time.Now().Add(config.Since)
    99  		if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
   100  			return nil, err
   101  		}
   102  	} else if config.NumFromTail != 0 {
   103  		// Start based on a number of lines before the tail
   104  		if err := r.journal.SeekTail(); err != nil {
   105  			return nil, err
   106  		}
   107  
   108  		// Move the read pointer into position near the tail. Go one further than
   109  		// the option so that the initial cursor advancement positions us at the
   110  		// correct starting point.
   111  		skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
   112  		if err != nil {
   113  			return nil, err
   114  		}
   115  		// If we skipped fewer lines than expected, we have reached journal start.
   116  		// Thus, we seek to head so that next invocation can read the first line.
   117  		if skip != config.NumFromTail+1 {
   118  			if err := r.journal.SeekHead(); err != nil {
   119  				return nil, err
   120  			}
   121  		}
   122  	} else if config.Cursor != "" {
   123  		// Start based on a custom cursor
   124  		if err := r.journal.SeekCursor(config.Cursor); err != nil {
   125  			return nil, err
   126  		}
   127  	}
   128  
   129  	return r, nil
   130  }
   131  
   132  // Read reads entries from the journal. Read follows the Reader interface so
   133  // it must be able to read a specific amount of bytes. Journald on the other
   134  // hand only allows us to read full entries of arbitrary size (without byte
   135  // granularity). JournalReader is therefore internally buffering entries that
   136  // don't fit in the read buffer. Callers should keep calling until 0 and/or an
   137  // error is returned.
   138  func (r *JournalReader) Read(b []byte) (int, error) {
   139  	if r.msgReader == nil {
   140  		// Advance the journal cursor. It has to be called at least one time
   141  		// before reading
   142  		c, err := r.journal.Next()
   143  
   144  		// An unexpected error
   145  		if err != nil {
   146  			return 0, err
   147  		}
   148  
   149  		// EOF detection
   150  		if c == 0 {
   151  			return 0, io.EOF
   152  		}
   153  
   154  		entry, err := r.journal.GetEntry()
   155  		if err != nil {
   156  			return 0, err
   157  		}
   158  
   159  		// Build a message
   160  		msg, err := r.formatter(entry)
   161  		if err != nil {
   162  			return 0, err
   163  		}
   164  		r.msgReader = strings.NewReader(msg)
   165  	}
   166  
   167  	// Copy and return the message
   168  	sz, err := r.msgReader.Read(b)
   169  	if err == io.EOF {
   170  		// The current entry has been fully read. Don't propagate this
   171  		// EOF, so the next entry can be read at the next Read()
   172  		// iteration.
   173  		r.msgReader = nil
   174  		return sz, nil
   175  	}
   176  	if err != nil {
   177  		return sz, err
   178  	}
   179  	if r.msgReader.Len() == 0 {
   180  		r.msgReader = nil
   181  	}
   182  
   183  	return sz, nil
   184  }
   185  
   186  // Close closes the JournalReader's handle to the journal.
   187  func (r *JournalReader) Close() error {
   188  	return r.journal.Close()
   189  }
   190  
   191  // Rewind attempts to rewind the JournalReader to the first entry.
   192  func (r *JournalReader) Rewind() error {
   193  	r.msgReader = nil
   194  	return r.journal.SeekHead()
   195  }
   196  
   197  // Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
   198  // follow will continue until a single time.Time is received on the until channel.
   199  func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
   200  
   201  	// Process journal entries and events. Entries are flushed until the tail or
   202  	// timeout is reached, and then we wait for new events or the timeout.
   203  	var msg = make([]byte, 64*1<<(10))
   204  	var waitCh = make(chan int, 1)
   205  	var waitGroup sync.WaitGroup
   206  	defer waitGroup.Wait()
   207  
   208  process:
   209  	for {
   210  		c, err := r.Read(msg)
   211  		if err != nil && err != io.EOF {
   212  			return err
   213  		}
   214  
   215  		select {
   216  		case <-until:
   217  			return ErrExpired
   218  		default:
   219  		}
   220  		if c > 0 {
   221  			if _, err = writer.Write(msg[:c]); err != nil {
   222  				return err
   223  			}
   224  			continue process
   225  		}
   226  
   227  		// We're at the tail, so wait for new events or time out.
   228  		// Holds journal events to process. Tightly bounded for now unless there's a
   229  		// reason to unblock the journal watch routine more quickly.
   230  		for {
   231  			waitGroup.Add(1)
   232  			go func() {
   233  				status := r.journal.Wait(100 * time.Millisecond)
   234  				waitCh <- status
   235  				waitGroup.Done()
   236  			}()
   237  
   238  			select {
   239  			case <-until:
   240  				return ErrExpired
   241  			case e := <-waitCh:
   242  				switch e {
   243  				case SD_JOURNAL_NOP:
   244  					// the journal did not change since the last invocation
   245  				case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
   246  					continue process
   247  				default:
   248  					if e < 0 {
   249  						return fmt.Errorf("received error event: %d", e)
   250  					}
   251  
   252  					log.Printf("received unknown event: %d\n", e)
   253  				}
   254  			}
   255  		}
   256  	}
   257  }
   258  
   259  // simpleMessageFormatter is the default formatter.
   260  // It returns a string representing the current journal entry in a simple format which
   261  // includes the entry timestamp and MESSAGE field.
   262  func simpleMessageFormatter(entry *JournalEntry) (string, error) {
   263  	msg, ok := entry.Fields["MESSAGE"]
   264  	if !ok {
   265  		return "", fmt.Errorf("no MESSAGE field present in journal entry")
   266  	}
   267  
   268  	usec := entry.RealtimeTimestamp
   269  	timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
   270  
   271  	return fmt.Sprintf("%s %s\n", timestamp, msg), nil
   272  }
   273  

View as plain text