...

Source file src/go.opencensus.io/metric/metricexport/reader.go

Documentation: go.opencensus.io/metric/metricexport

     1  // Copyright 2019, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package metricexport
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	"go.opencensus.io/metric/metricdata"
    25  	"go.opencensus.io/metric/metricproducer"
    26  	"go.opencensus.io/trace"
    27  )
    28  
    29  var (
    30  	defaultSampler             = trace.ProbabilitySampler(0.0001)
    31  	errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration)
    32  	errAlreadyStarted          = fmt.Errorf("already started")
    33  	errIntervalReaderNil       = fmt.Errorf("interval reader is nil")
    34  	errExporterNil             = fmt.Errorf("exporter is nil")
    35  	errReaderNil               = fmt.Errorf("reader is nil")
    36  )
    37  
    38  const (
    39  	defaultReportingDuration = 60 * time.Second
    40  	minimumReportingDuration = 1 * time.Second
    41  	defaultSpanName          = "ExportMetrics"
    42  )
    43  
    44  // ReaderOptions contains options pertaining to metrics reader.
    45  type ReaderOptions struct {
    46  	// SpanName is the name used for span created to export metrics.
    47  	SpanName string
    48  }
    49  
    50  // Reader reads metrics from all producers registered
    51  // with producer manager and exports those metrics using provided
    52  // exporter.
    53  type Reader struct {
    54  	sampler trace.Sampler
    55  
    56  	spanName string
    57  }
    58  
    59  // IntervalReader periodically reads metrics from all producers registered
    60  // with producer manager and exports those metrics using provided
    61  // exporter. Call Reader.Stop() to stop the reader.
    62  type IntervalReader struct {
    63  	// ReportingInterval it the time duration between two consecutive
    64  	// metrics reporting. defaultReportingDuration  is used if it is not set.
    65  	// It cannot be set lower than minimumReportingDuration.
    66  	ReportingInterval time.Duration
    67  
    68  	exporter   Exporter
    69  	timer      *time.Ticker
    70  	quit, done chan bool
    71  	mu         sync.RWMutex
    72  	reader     *Reader
    73  }
    74  
    75  // ReaderOption apply changes to ReaderOptions.
    76  type ReaderOption func(*ReaderOptions)
    77  
    78  // WithSpanName makes new reader to use given span name when exporting metrics.
    79  func WithSpanName(spanName string) ReaderOption {
    80  	return func(o *ReaderOptions) {
    81  		o.SpanName = spanName
    82  	}
    83  }
    84  
    85  // NewReader returns a reader configured with specified options.
    86  func NewReader(o ...ReaderOption) *Reader {
    87  	var opts ReaderOptions
    88  	for _, op := range o {
    89  		op(&opts)
    90  	}
    91  	reader := &Reader{defaultSampler, defaultSpanName}
    92  	if opts.SpanName != "" {
    93  		reader.spanName = opts.SpanName
    94  	}
    95  	return reader
    96  }
    97  
    98  // NewIntervalReader creates a reader. Once started it periodically
    99  // reads metrics from all producers and exports them using provided exporter.
   100  func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) {
   101  	if exporter == nil {
   102  		return nil, errExporterNil
   103  	}
   104  	if reader == nil {
   105  		return nil, errReaderNil
   106  	}
   107  
   108  	r := &IntervalReader{
   109  		exporter: exporter,
   110  		reader:   reader,
   111  	}
   112  	return r, nil
   113  }
   114  
   115  // Start starts the IntervalReader which periodically reads metrics from all
   116  // producers registered with global producer manager. If the reporting interval
   117  // is not set prior to calling this function then default reporting interval
   118  // is used.
   119  func (ir *IntervalReader) Start() error {
   120  	if ir == nil {
   121  		return errIntervalReaderNil
   122  	}
   123  	ir.mu.Lock()
   124  	defer ir.mu.Unlock()
   125  	var reportingInterval = defaultReportingDuration
   126  	if ir.ReportingInterval != 0 {
   127  		if ir.ReportingInterval < minimumReportingDuration {
   128  			return errReportingIntervalTooLow
   129  		}
   130  		reportingInterval = ir.ReportingInterval
   131  	}
   132  
   133  	if ir.quit != nil {
   134  		return errAlreadyStarted
   135  	}
   136  	ir.timer = time.NewTicker(reportingInterval)
   137  	ir.quit = make(chan bool)
   138  	ir.done = make(chan bool)
   139  
   140  	go ir.startInternal()
   141  	return nil
   142  }
   143  
   144  func (ir *IntervalReader) startInternal() {
   145  	for {
   146  		select {
   147  		case <-ir.timer.C:
   148  			ir.reader.ReadAndExport(ir.exporter)
   149  		case <-ir.quit:
   150  			ir.timer.Stop()
   151  			ir.done <- true
   152  			return
   153  		}
   154  	}
   155  }
   156  
   157  // Stop stops the reader from reading and exporting metrics.
   158  // Additional call to Stop are no-ops.
   159  func (ir *IntervalReader) Stop() {
   160  	if ir == nil {
   161  		return
   162  	}
   163  	ir.mu.Lock()
   164  	defer ir.mu.Unlock()
   165  	if ir.quit == nil {
   166  		return
   167  	}
   168  	ir.quit <- true
   169  	<-ir.done
   170  	close(ir.quit)
   171  	close(ir.done)
   172  	ir.quit = nil
   173  }
   174  
   175  // Flush flushes the metrics if IntervalReader is stopped, otherwise no-op.
   176  func (ir *IntervalReader) Flush() {
   177  	ir.mu.Lock()
   178  	defer ir.mu.Unlock()
   179  
   180  	// No-op if IntervalReader is not stopped
   181  	if ir.quit != nil {
   182  		return
   183  	}
   184  
   185  	ir.reader.ReadAndExport(ir.exporter)
   186  }
   187  
   188  // ReadAndExport reads metrics from all producer registered with
   189  // producer manager and then exports them using provided exporter.
   190  func (r *Reader) ReadAndExport(exporter Exporter) {
   191  	ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler))
   192  	defer span.End()
   193  	producers := metricproducer.GlobalManager().GetAll()
   194  	data := []*metricdata.Metric{}
   195  	for _, producer := range producers {
   196  		data = append(data, producer.Read()...)
   197  	}
   198  	// TODO: [rghetia] add metrics for errors.
   199  	exporter.ExportMetrics(ctx, data)
   200  }
   201  

View as plain text