...

Source file src/go.opencensus.io/stats/view/worker.go

Documentation: go.opencensus.io/stats/view

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package view
    17  
    18  import (
    19  	"fmt"
    20  	"sync"
    21  	"time"
    22  
    23  	"go.opencensus.io/resource"
    24  
    25  	"go.opencensus.io/metric/metricdata"
    26  	"go.opencensus.io/metric/metricproducer"
    27  	"go.opencensus.io/stats"
    28  	"go.opencensus.io/stats/internal"
    29  	"go.opencensus.io/tag"
    30  )
    31  
    32  func init() {
    33  	defaultWorker = NewMeter().(*worker)
    34  	go defaultWorker.start()
    35  	internal.DefaultRecorder = record
    36  	internal.MeasurementRecorder = recordMeasurement
    37  }
    38  
    39  type measureRef struct {
    40  	measure string
    41  	views   map[*viewInternal]struct{}
    42  }
    43  
    44  type worker struct {
    45  	measures       map[string]*measureRef
    46  	views          map[string]*viewInternal
    47  	viewStartTimes map[*viewInternal]time.Time
    48  
    49  	timer      *time.Ticker
    50  	c          chan command
    51  	quit, done chan bool
    52  	mu         sync.RWMutex
    53  	r          *resource.Resource
    54  
    55  	exportersMu sync.RWMutex
    56  	exporters   map[Exporter]struct{}
    57  }
    58  
    59  // Meter defines an interface which allows a single process to maintain
    60  // multiple sets of metrics exports (intended for the advanced case where a
    61  // single process wants to report metrics about multiple objects, such as
    62  // multiple databases or HTTP services).
    63  //
    64  // Note that this is an advanced use case, and the static functions in this
    65  // module should cover the common use cases.
    66  type Meter interface {
    67  	stats.Recorder
    68  	// Find returns a registered view associated with this name.
    69  	// If no registered view is found, nil is returned.
    70  	Find(name string) *View
    71  	// Register begins collecting data for the given views.
    72  	// Once a view is registered, it reports data to the registered exporters.
    73  	Register(views ...*View) error
    74  	// Unregister the given views. Data will not longer be exported for these views
    75  	// after Unregister returns.
    76  	// It is not necessary to unregister from views you expect to collect for the
    77  	// duration of your program execution.
    78  	Unregister(views ...*View)
    79  	// SetReportingPeriod sets the interval between reporting aggregated views in
    80  	// the program. If duration is less than or equal to zero, it enables the
    81  	// default behavior.
    82  	//
    83  	// Note: each exporter makes different promises about what the lowest supported
    84  	// duration is. For example, the Stackdriver exporter recommends a value no
    85  	// lower than 1 minute. Consult each exporter per your needs.
    86  	SetReportingPeriod(time.Duration)
    87  
    88  	// RegisterExporter registers an exporter.
    89  	// Collected data will be reported via all the
    90  	// registered exporters. Once you no longer
    91  	// want data to be exported, invoke UnregisterExporter
    92  	// with the previously registered exporter.
    93  	//
    94  	// Binaries can register exporters, libraries shouldn't register exporters.
    95  	RegisterExporter(Exporter)
    96  	// UnregisterExporter unregisters an exporter.
    97  	UnregisterExporter(Exporter)
    98  	// SetResource may be used to set the Resource associated with this registry.
    99  	// This is intended to be used in cases where a single process exports metrics
   100  	// for multiple Resources, typically in a multi-tenant situation.
   101  	SetResource(*resource.Resource)
   102  
   103  	// Start causes the Meter to start processing Record calls and aggregating
   104  	// statistics as well as exporting data.
   105  	Start()
   106  	// Stop causes the Meter to stop processing calls and terminate data export.
   107  	Stop()
   108  
   109  	// RetrieveData gets a snapshot of the data collected for the the view registered
   110  	// with the given name. It is intended for testing only.
   111  	RetrieveData(viewName string) ([]*Row, error)
   112  }
   113  
   114  var _ Meter = (*worker)(nil)
   115  
   116  var defaultWorker *worker
   117  
   118  var defaultReportingDuration = 10 * time.Second
   119  
   120  // Find returns a registered view associated with this name.
   121  // If no registered view is found, nil is returned.
   122  func Find(name string) (v *View) {
   123  	return defaultWorker.Find(name)
   124  }
   125  
   126  // Find returns a registered view associated with this name.
   127  // If no registered view is found, nil is returned.
   128  func (w *worker) Find(name string) (v *View) {
   129  	req := &getViewByNameReq{
   130  		name: name,
   131  		c:    make(chan *getViewByNameResp),
   132  	}
   133  	w.c <- req
   134  	resp := <-req.c
   135  	return resp.v
   136  }
   137  
   138  // Register begins collecting data for the given views.
   139  // Once a view is registered, it reports data to the registered exporters.
   140  func Register(views ...*View) error {
   141  	return defaultWorker.Register(views...)
   142  }
   143  
   144  // Register begins collecting data for the given views.
   145  // Once a view is registered, it reports data to the registered exporters.
   146  func (w *worker) Register(views ...*View) error {
   147  	req := &registerViewReq{
   148  		views: views,
   149  		err:   make(chan error),
   150  	}
   151  	w.c <- req
   152  	return <-req.err
   153  }
   154  
   155  // Unregister the given views. Data will not longer be exported for these views
   156  // after Unregister returns.
   157  // It is not necessary to unregister from views you expect to collect for the
   158  // duration of your program execution.
   159  func Unregister(views ...*View) {
   160  	defaultWorker.Unregister(views...)
   161  }
   162  
   163  // Unregister the given views. Data will not longer be exported for these views
   164  // after Unregister returns.
   165  // It is not necessary to unregister from views you expect to collect for the
   166  // duration of your program execution.
   167  func (w *worker) Unregister(views ...*View) {
   168  	names := make([]string, len(views))
   169  	for i := range views {
   170  		names[i] = views[i].Name
   171  	}
   172  	req := &unregisterFromViewReq{
   173  		views: names,
   174  		done:  make(chan struct{}),
   175  	}
   176  	w.c <- req
   177  	<-req.done
   178  }
   179  
   180  // RetrieveData gets a snapshot of the data collected for the the view registered
   181  // with the given name. It is intended for testing only.
   182  func RetrieveData(viewName string) ([]*Row, error) {
   183  	return defaultWorker.RetrieveData(viewName)
   184  }
   185  
   186  // RetrieveData gets a snapshot of the data collected for the the view registered
   187  // with the given name. It is intended for testing only.
   188  func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
   189  	req := &retrieveDataReq{
   190  		now: time.Now(),
   191  		v:   viewName,
   192  		c:   make(chan *retrieveDataResp),
   193  	}
   194  	w.c <- req
   195  	resp := <-req.c
   196  	return resp.rows, resp.err
   197  }
   198  
   199  func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
   200  	defaultWorker.Record(tags, ms, attachments)
   201  }
   202  
   203  func recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
   204  	defaultWorker.recordMeasurement(tags, ms, attachments)
   205  }
   206  
   207  // Record records a set of measurements ms associated with the given tags and attachments.
   208  func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
   209  	w.recordMeasurement(tags, ms.([]stats.Measurement), attachments)
   210  }
   211  
   212  // recordMeasurement records a set of measurements ms associated with the given tags and attachments.
   213  // This is the same as Record but without an interface{} type to avoid allocations
   214  func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
   215  	req := &recordReq{
   216  		tm:          tags,
   217  		ms:          ms,
   218  		attachments: attachments,
   219  		t:           time.Now(),
   220  	}
   221  	w.c <- req
   222  }
   223  
   224  // SetReportingPeriod sets the interval between reporting aggregated views in
   225  // the program. If duration is less than or equal to zero, it enables the
   226  // default behavior.
   227  //
   228  // Note: each exporter makes different promises about what the lowest supported
   229  // duration is. For example, the Stackdriver exporter recommends a value no
   230  // lower than 1 minute. Consult each exporter per your needs.
   231  func SetReportingPeriod(d time.Duration) {
   232  	defaultWorker.SetReportingPeriod(d)
   233  }
   234  
   235  // Stop stops the default worker.
   236  func Stop() {
   237  	defaultWorker.Stop()
   238  }
   239  
   240  // SetReportingPeriod sets the interval between reporting aggregated views in
   241  // the program. If duration is less than or equal to zero, it enables the
   242  // default behavior.
   243  //
   244  // Note: each exporter makes different promises about what the lowest supported
   245  // duration is. For example, the Stackdriver exporter recommends a value no
   246  // lower than 1 minute. Consult each exporter per your needs.
   247  func (w *worker) SetReportingPeriod(d time.Duration) {
   248  	// TODO(acetechnologist): ensure that the duration d is more than a certain
   249  	// value. e.g. 1s
   250  	req := &setReportingPeriodReq{
   251  		d: d,
   252  		c: make(chan bool),
   253  	}
   254  	w.c <- req
   255  	<-req.c // don't return until the timer is set to the new duration.
   256  }
   257  
   258  // NewMeter constructs a Meter instance. You should only need to use this if
   259  // you need to separate out Measurement recordings and View aggregations within
   260  // a single process.
   261  func NewMeter() Meter {
   262  	return &worker{
   263  		measures:       make(map[string]*measureRef),
   264  		views:          make(map[string]*viewInternal),
   265  		viewStartTimes: make(map[*viewInternal]time.Time),
   266  		timer:          time.NewTicker(defaultReportingDuration),
   267  		c:              make(chan command, 1024),
   268  		quit:           make(chan bool),
   269  		done:           make(chan bool),
   270  
   271  		exporters: make(map[Exporter]struct{}),
   272  	}
   273  }
   274  
   275  // SetResource associates all data collected by this Meter with the specified
   276  // resource. This resource is reported when using metricexport.ReadAndExport;
   277  // it is not provided when used with ExportView/RegisterExporter, because that
   278  // interface does not provide a means for reporting the Resource.
   279  func (w *worker) SetResource(r *resource.Resource) {
   280  	w.r = r
   281  }
   282  
   283  func (w *worker) Start() {
   284  	go w.start()
   285  }
   286  
   287  func (w *worker) start() {
   288  	prodMgr := metricproducer.GlobalManager()
   289  	prodMgr.AddProducer(w)
   290  
   291  	for {
   292  		select {
   293  		case cmd := <-w.c:
   294  			cmd.handleCommand(w)
   295  		case <-w.timer.C:
   296  			w.reportUsage()
   297  		case <-w.quit:
   298  			w.timer.Stop()
   299  			close(w.c)
   300  			close(w.done)
   301  			return
   302  		}
   303  	}
   304  }
   305  
   306  func (w *worker) Stop() {
   307  	prodMgr := metricproducer.GlobalManager()
   308  	prodMgr.DeleteProducer(w)
   309  	select {
   310  	case <-w.quit:
   311  	default:
   312  		close(w.quit)
   313  	}
   314  	<-w.done
   315  }
   316  
   317  func (w *worker) getMeasureRef(name string) *measureRef {
   318  	if mr, ok := w.measures[name]; ok {
   319  		return mr
   320  	}
   321  	mr := &measureRef{
   322  		measure: name,
   323  		views:   make(map[*viewInternal]struct{}),
   324  	}
   325  	w.measures[name] = mr
   326  	return mr
   327  }
   328  
   329  func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
   330  	w.mu.Lock()
   331  	defer w.mu.Unlock()
   332  	vi, err := newViewInternal(v)
   333  	if err != nil {
   334  		return nil, err
   335  	}
   336  	if x, ok := w.views[vi.view.Name]; ok {
   337  		if !x.view.same(vi.view) {
   338  			return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
   339  		}
   340  
   341  		// the view is already registered so there is nothing to do and the
   342  		// command is considered successful.
   343  		return x, nil
   344  	}
   345  	w.views[vi.view.Name] = vi
   346  	w.viewStartTimes[vi] = time.Now()
   347  	ref := w.getMeasureRef(vi.view.Measure.Name())
   348  	ref.views[vi] = struct{}{}
   349  	return vi, nil
   350  }
   351  
   352  func (w *worker) unregisterView(v *viewInternal) {
   353  	w.mu.Lock()
   354  	defer w.mu.Unlock()
   355  	delete(w.views, v.view.Name)
   356  	delete(w.viewStartTimes, v)
   357  	if measure := w.measures[v.view.Measure.Name()]; measure != nil {
   358  		delete(measure.views, v)
   359  	}
   360  }
   361  
   362  func (w *worker) reportView(v *viewInternal) {
   363  	if !v.isSubscribed() {
   364  		return
   365  	}
   366  	rows := v.collectedRows()
   367  	viewData := &Data{
   368  		View:  v.view,
   369  		Start: w.viewStartTimes[v],
   370  		End:   time.Now(),
   371  		Rows:  rows,
   372  	}
   373  	w.exportersMu.Lock()
   374  	defer w.exportersMu.Unlock()
   375  	for e := range w.exporters {
   376  		e.ExportView(viewData)
   377  	}
   378  }
   379  
   380  func (w *worker) reportUsage() {
   381  	w.mu.Lock()
   382  	defer w.mu.Unlock()
   383  	for _, v := range w.views {
   384  		w.reportView(v)
   385  	}
   386  }
   387  
   388  func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
   389  	if !v.isSubscribed() {
   390  		return nil
   391  	}
   392  
   393  	return viewToMetric(v, w.r, now)
   394  }
   395  
   396  // Read reads all view data and returns them as metrics.
   397  // It is typically invoked by metric reader to export stats in metric format.
   398  func (w *worker) Read() []*metricdata.Metric {
   399  	w.mu.Lock()
   400  	defer w.mu.Unlock()
   401  	now := time.Now()
   402  	metrics := make([]*metricdata.Metric, 0, len(w.views))
   403  	for _, v := range w.views {
   404  		metric := w.toMetric(v, now)
   405  		if metric != nil {
   406  			metrics = append(metrics, metric)
   407  		}
   408  	}
   409  	return metrics
   410  }
   411  
   412  func (w *worker) RegisterExporter(e Exporter) {
   413  	w.exportersMu.Lock()
   414  	defer w.exportersMu.Unlock()
   415  
   416  	w.exporters[e] = struct{}{}
   417  }
   418  
   419  func (w *worker) UnregisterExporter(e Exporter) {
   420  	w.exportersMu.Lock()
   421  	defer w.exportersMu.Unlock()
   422  
   423  	delete(w.exporters, e)
   424  }
   425  

View as plain text