...

Source file src/go.opencensus.io/stats/view/worker_commands.go

Documentation: go.opencensus.io/stats/view

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package view
    17  
    18  import (
    19  	"errors"
    20  	"fmt"
    21  	"strings"
    22  	"time"
    23  
    24  	"go.opencensus.io/stats"
    25  	"go.opencensus.io/stats/internal"
    26  	"go.opencensus.io/tag"
    27  )
    28  
    29  type command interface {
    30  	handleCommand(w *worker)
    31  }
    32  
    33  // getViewByNameReq is the command to get a view given its name.
    34  type getViewByNameReq struct {
    35  	name string
    36  	c    chan *getViewByNameResp
    37  }
    38  
    39  type getViewByNameResp struct {
    40  	v *View
    41  }
    42  
    43  func (cmd *getViewByNameReq) handleCommand(w *worker) {
    44  	v := w.views[cmd.name]
    45  	if v == nil {
    46  		cmd.c <- &getViewByNameResp{nil}
    47  		return
    48  	}
    49  	cmd.c <- &getViewByNameResp{v.view}
    50  }
    51  
    52  // registerViewReq is the command to register a view.
    53  type registerViewReq struct {
    54  	views []*View
    55  	err   chan error
    56  }
    57  
    58  func (cmd *registerViewReq) handleCommand(w *worker) {
    59  	for _, v := range cmd.views {
    60  		if err := v.canonicalize(); err != nil {
    61  			cmd.err <- err
    62  			return
    63  		}
    64  	}
    65  	var errstr []string
    66  	for _, view := range cmd.views {
    67  		vi, err := w.tryRegisterView(view)
    68  		if err != nil {
    69  			errstr = append(errstr, fmt.Sprintf("%s: %v", view.Name, err))
    70  			continue
    71  		}
    72  		internal.SubscriptionReporter(view.Measure.Name())
    73  		vi.subscribe()
    74  	}
    75  	if len(errstr) > 0 {
    76  		cmd.err <- errors.New(strings.Join(errstr, "\n"))
    77  	} else {
    78  		cmd.err <- nil
    79  	}
    80  }
    81  
    82  // unregisterFromViewReq is the command to unregister to a view. Has no
    83  // impact on the data collection for client that are pulling data from the
    84  // library.
    85  type unregisterFromViewReq struct {
    86  	views []string
    87  	done  chan struct{}
    88  }
    89  
    90  func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
    91  	for _, name := range cmd.views {
    92  		vi, ok := w.views[name]
    93  		if !ok {
    94  			continue
    95  		}
    96  
    97  		// Report pending data for this view before removing it.
    98  		w.reportView(vi)
    99  
   100  		vi.unsubscribe()
   101  		if !vi.isSubscribed() {
   102  			// this was the last subscription and view is not collecting anymore.
   103  			// The collected data can be cleared.
   104  			vi.clearRows()
   105  		}
   106  		w.unregisterView(vi)
   107  	}
   108  	cmd.done <- struct{}{}
   109  }
   110  
   111  // retrieveDataReq is the command to retrieve data for a view.
   112  type retrieveDataReq struct {
   113  	now time.Time
   114  	v   string
   115  	c   chan *retrieveDataResp
   116  }
   117  
   118  type retrieveDataResp struct {
   119  	rows []*Row
   120  	err  error
   121  }
   122  
   123  func (cmd *retrieveDataReq) handleCommand(w *worker) {
   124  	w.mu.Lock()
   125  	defer w.mu.Unlock()
   126  	vi, ok := w.views[cmd.v]
   127  	if !ok {
   128  		cmd.c <- &retrieveDataResp{
   129  			nil,
   130  			fmt.Errorf("cannot retrieve data; view %q is not registered", cmd.v),
   131  		}
   132  		return
   133  	}
   134  
   135  	if !vi.isSubscribed() {
   136  		cmd.c <- &retrieveDataResp{
   137  			nil,
   138  			fmt.Errorf("cannot retrieve data; view %q has no subscriptions or collection is not forcibly started", cmd.v),
   139  		}
   140  		return
   141  	}
   142  	cmd.c <- &retrieveDataResp{
   143  		vi.collectedRows(),
   144  		nil,
   145  	}
   146  }
   147  
   148  // recordReq is the command to record data related to multiple measures
   149  // at once.
   150  type recordReq struct {
   151  	tm          *tag.Map
   152  	ms          []stats.Measurement
   153  	attachments map[string]interface{}
   154  	t           time.Time
   155  }
   156  
   157  func (cmd *recordReq) handleCommand(w *worker) {
   158  	w.mu.Lock()
   159  	defer w.mu.Unlock()
   160  	for _, m := range cmd.ms {
   161  		if (m == stats.Measurement{}) { // not registered
   162  			continue
   163  		}
   164  		ref := w.getMeasureRef(m.Measure().Name())
   165  		for v := range ref.views {
   166  			v.addSample(cmd.tm, m.Value(), cmd.attachments, cmd.t)
   167  		}
   168  	}
   169  }
   170  
   171  // setReportingPeriodReq is the command to modify the duration between
   172  // reporting the collected data to the registered clients.
   173  type setReportingPeriodReq struct {
   174  	d time.Duration
   175  	c chan bool
   176  }
   177  
   178  func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
   179  	w.timer.Stop()
   180  	if cmd.d <= 0 {
   181  		w.timer = time.NewTicker(defaultReportingDuration)
   182  	} else {
   183  		w.timer = time.NewTicker(cmd.d)
   184  	}
   185  	cmd.c <- true
   186  }
   187  

View as plain text