...

Source file src/go.opencensus.io/zpages/rpcz.go

Documentation: go.opencensus.io/zpages

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package zpages
    17  
    18  import (
    19  	"fmt"
    20  	"io"
    21  	"log"
    22  	"math"
    23  	"net/http"
    24  	"sort"
    25  	"sync"
    26  	"text/tabwriter"
    27  	"time"
    28  
    29  	"go.opencensus.io/plugin/ocgrpc"
    30  	"go.opencensus.io/stats/view"
    31  )
    32  
// bytesPerKb is the divisor WriteTextRpczPage applies to the byte-rate fields
// of a snapshot before printing them.
const bytesPerKb = 1024
    34  
// Package-level state shared by the exporter and the page writers.
//
// programStartTime is captured when the package is initialized; ExportView
// uses the time elapsed since then as the averaging window for rates.
//
// snaps holds one snapshot per (method, direction) key. ExportView creates and
// updates the entries, and getStatsPage reads them; both happen with mu held.
var (
	programStartTime = time.Now()
	mu               sync.Mutex // protects snaps
	snaps            = make(map[methodKey]*statSnapshot)

	// viewType lists the views we are interested in for RPC stats.
	// A view's map value indicates whether that view contains data for received
	// RPCs.
	viewType = map[*view.View]bool{
		ocgrpc.ClientCompletedRPCsView:          false,
		ocgrpc.ClientSentBytesPerRPCView:        false,
		ocgrpc.ClientSentMessagesPerRPCView:     false,
		ocgrpc.ClientReceivedBytesPerRPCView:    false,
		ocgrpc.ClientReceivedMessagesPerRPCView: false,
		ocgrpc.ClientRoundtripLatencyView:       false,
		ocgrpc.ServerCompletedRPCsView:          true,
		ocgrpc.ServerReceivedBytesPerRPCView:    true,
		ocgrpc.ServerReceivedMessagesPerRPCView: true,
		ocgrpc.ServerSentBytesPerRPCView:        true,
		ocgrpc.ServerSentMessagesPerRPCView:     true,
		ocgrpc.ServerLatencyView:                true,
	}
)
    58  
    59  func registerRPCViews() {
    60  	views := make([]*view.View, 0, len(viewType))
    61  	for v := range viewType {
    62  		views = append(views, v)
    63  	}
    64  	if err := view.Register(views...); err != nil {
    65  		log.Printf("error subscribing to views: %v", err)
    66  	}
    67  	view.RegisterExporter(snapExporter{})
    68  }
    69  
    70  func rpczHandler(w http.ResponseWriter, r *http.Request) {
    71  	w.Header().Set("Content-Type", "text/html; charset=utf-8")
    72  	WriteHTMLRpczPage(w)
    73  }
    74  
    75  // WriteHTMLRpczPage writes an HTML document to w containing per-method RPC stats.
    76  func WriteHTMLRpczPage(w io.Writer) {
    77  	if err := headerTemplate.Execute(w, headerData{Title: "RPC Stats"}); err != nil {
    78  		log.Printf("zpages: executing template: %v", err)
    79  	}
    80  	WriteHTMLRpczSummary(w)
    81  	if err := footerTemplate.Execute(w, nil); err != nil {
    82  		log.Printf("zpages: executing template: %v", err)
    83  	}
    84  }
    85  
    86  // WriteHTMLRpczSummary writes HTML to w containing per-method RPC stats.
    87  //
    88  // It includes neither a header nor footer, so you can embed this data in other pages.
    89  func WriteHTMLRpczSummary(w io.Writer) {
    90  	mu.Lock()
    91  	if err := statsTemplate.Execute(w, getStatsPage()); err != nil {
    92  		log.Printf("zpages: executing template: %v", err)
    93  	}
    94  	mu.Unlock()
    95  }
    96  
    97  // WriteTextRpczPage writes formatted text to w containing per-method RPC stats.
    98  func WriteTextRpczPage(w io.Writer) {
    99  	mu.Lock()
   100  	defer mu.Unlock()
   101  	page := getStatsPage()
   102  
   103  	for i, sg := range page.StatGroups {
   104  		switch i {
   105  		case 0:
   106  			fmt.Fprint(w, "Sent:\n")
   107  		case 1:
   108  			fmt.Fprint(w, "\nReceived:\n")
   109  		}
   110  		tw := tabwriter.NewWriter(w, 6, 8, 1, ' ', 0)
   111  		fmt.Fprint(tw, "Method\tCount\t\t\tAvgLat\t\t\tMaxLat\t\t\tRate\t\t\tIn (MiB/s)\t\t\tOut (MiB/s)\t\t\tErrors\t\t\n")
   112  		fmt.Fprint(tw, "\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\n")
   113  		for _, s := range sg.Snapshots {
   114  			fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%v\t%v\t%v\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%d\t%d\t%d\n",
   115  				s.Method,
   116  				s.CountMinute,
   117  				s.CountHour,
   118  				s.CountTotal,
   119  				s.AvgLatencyMinute,
   120  				s.AvgLatencyHour,
   121  				s.AvgLatencyTotal,
   122  				s.RPCRateMinute,
   123  				s.RPCRateHour,
   124  				s.RPCRateTotal,
   125  				s.InputRateMinute/bytesPerKb,
   126  				s.InputRateHour/bytesPerKb,
   127  				s.InputRateTotal/bytesPerKb,
   128  				s.OutputRateMinute/bytesPerKb,
   129  				s.OutputRateHour/bytesPerKb,
   130  				s.OutputRateTotal/bytesPerKb,
   131  				s.ErrorsMinute,
   132  				s.ErrorsHour,
   133  				s.ErrorsTotal)
   134  		}
   135  		tw.Flush()
   136  	}
   137  }
   138  
// headerData contains data for the header template.
type headerData struct {
	// Title is the page title; WriteHTMLRpczPage passes "RPC Stats".
	Title string
}
   143  
// statsPage aggregates stats on the page for 'sent' and 'received' categories.
// It is the value handed to the stats template and iterated by the text page.
type statsPage struct {
	// StatGroups is built by getStatsPage as exactly two groups, in the
	// order sent, received.
	StatGroups []*statGroup
}
   148  
   149  // statGroup aggregates snapshots for a directional category
   150  type statGroup struct {
   151  	Direction string
   152  	Snapshots []*statSnapshot
   153  }
   154  
   155  func (s *statGroup) Len() int {
   156  	return len(s.Snapshots)
   157  }
   158  
   159  func (s *statGroup) Swap(i, j int) {
   160  	s.Snapshots[i], s.Snapshots[j] = s.Snapshots[j], s.Snapshots[i]
   161  }
   162  
   163  func (s *statGroup) Less(i, j int) bool {
   164  	return s.Snapshots[i].Method < s.Snapshots[j].Method
   165  }
   166  
// statSnapshot holds the data items that are presented in a single row of RPC
// stat information.
//
// Each metric has Minute, Hour and Total variants. In this file ExportView
// writes only the Total variants; the Minute and Hour fields keep their zero
// values (see the TODO below).
type statSnapshot struct {
	// TODO: compute hour/minute values from cumulative
	Method           string // method name from the row's method tag
	Received         bool   // true when the row describes received RPCs
	CountMinute      uint64
	CountHour        uint64
	CountTotal       uint64 // count taken from a messages-per-RPC view row
	AvgLatencyMinute time.Duration
	AvgLatencyHour   time.Duration
	AvgLatencyTotal  time.Duration // latency sum / count, converted from milliseconds
	RPCRateMinute    float64
	RPCRateHour      float64
	RPCRateTotal     float64 // count per second since program start
	InputRateMinute  float64
	InputRateHour    float64
	InputRateTotal   float64 // received-bytes view sum per second since program start
	OutputRateMinute float64
	OutputRateHour   float64
	OutputRateTotal  float64 // sent-bytes view sum per second since program start
	ErrorsMinute     uint64
	ErrorsHour       uint64
	ErrorsTotal      uint64 // completed RPCs whose status tag is not "OK"
}
   192  
// methodKey identifies an entry in snaps: one RPC method in one direction.
type methodKey struct {
	method   string // method name taken from the row's client/server method tag
	received bool   // viewType value of the view the data came from
}
   197  
// snapExporter is a stateless view exporter; its ExportView method folds
// exported view data into the package-level snaps map.
type snapExporter struct{}
   199  
   200  func (s snapExporter) ExportView(vd *view.Data) {
   201  	received, ok := viewType[vd.View]
   202  	if !ok {
   203  		return
   204  	}
   205  	if len(vd.Rows) == 0 {
   206  		return
   207  	}
   208  	ageSec := float64(time.Since(programStartTime)) / float64(time.Second)
   209  
   210  	computeRate := func(maxSec, x float64) float64 {
   211  		dur := ageSec
   212  		if maxSec > 0 && dur > maxSec {
   213  			dur = maxSec
   214  		}
   215  		return x / dur
   216  	}
   217  
   218  	convertTime := func(ms float64) time.Duration {
   219  		if math.IsInf(ms, 0) || math.IsNaN(ms) {
   220  			return 0
   221  		}
   222  		return time.Duration(float64(time.Millisecond) * ms)
   223  	}
   224  
   225  	haveResetErrors := make(map[string]struct{})
   226  
   227  	mu.Lock()
   228  	defer mu.Unlock()
   229  	for _, row := range vd.Rows {
   230  		var method string
   231  		for _, tag := range row.Tags {
   232  			if tag.Key == ocgrpc.KeyClientMethod || tag.Key == ocgrpc.KeyServerMethod {
   233  				method = tag.Value
   234  				break
   235  			}
   236  		}
   237  
   238  		key := methodKey{method: method, received: received}
   239  		s := snaps[key]
   240  		if s == nil {
   241  			s = &statSnapshot{Method: method, Received: received}
   242  			snaps[key] = s
   243  		}
   244  
   245  		var (
   246  			sum   float64
   247  			count float64
   248  		)
   249  		switch v := row.Data.(type) {
   250  		case *view.CountData:
   251  			sum = float64(v.Value)
   252  			count = float64(v.Value)
   253  		case *view.DistributionData:
   254  			sum = v.Sum()
   255  			count = float64(v.Count)
   256  		case *view.SumData:
   257  			sum = v.Value
   258  			count = v.Value
   259  		}
   260  
   261  		// Update field of s corresponding to the view.
   262  		switch vd.View {
   263  		case ocgrpc.ClientCompletedRPCsView:
   264  			if _, ok := haveResetErrors[method]; !ok {
   265  				haveResetErrors[method] = struct{}{}
   266  				s.ErrorsTotal = 0
   267  			}
   268  			for _, tag := range row.Tags {
   269  				if tag.Key == ocgrpc.KeyClientStatus && tag.Value != "OK" {
   270  					s.ErrorsTotal += uint64(count)
   271  				}
   272  			}
   273  
   274  		case ocgrpc.ClientRoundtripLatencyView:
   275  			s.AvgLatencyTotal = convertTime(sum / count)
   276  
   277  		case ocgrpc.ClientSentBytesPerRPCView:
   278  			s.OutputRateTotal = computeRate(0, sum)
   279  
   280  		case ocgrpc.ClientReceivedBytesPerRPCView:
   281  			s.InputRateTotal = computeRate(0, sum)
   282  
   283  		case ocgrpc.ClientSentMessagesPerRPCView:
   284  			s.CountTotal = uint64(count)
   285  			s.RPCRateTotal = computeRate(0, count)
   286  
   287  		case ocgrpc.ClientReceivedMessagesPerRPCView:
   288  			// currently unused
   289  
   290  		case ocgrpc.ServerCompletedRPCsView:
   291  			if _, ok := haveResetErrors[method]; !ok {
   292  				haveResetErrors[method] = struct{}{}
   293  				s.ErrorsTotal = 0
   294  			}
   295  			for _, tag := range row.Tags {
   296  				if tag.Key == ocgrpc.KeyServerStatus && tag.Value != "OK" {
   297  					s.ErrorsTotal += uint64(count)
   298  				}
   299  			}
   300  
   301  		case ocgrpc.ServerLatencyView:
   302  			s.AvgLatencyTotal = convertTime(sum / count)
   303  
   304  		case ocgrpc.ServerSentBytesPerRPCView:
   305  			s.OutputRateTotal = computeRate(0, sum)
   306  
   307  		case ocgrpc.ServerReceivedMessagesPerRPCView:
   308  			s.CountTotal = uint64(count)
   309  			s.RPCRateTotal = computeRate(0, count)
   310  
   311  		case ocgrpc.ServerSentMessagesPerRPCView:
   312  			// currently unused
   313  		}
   314  	}
   315  }
   316  
   317  func getStatsPage() *statsPage {
   318  	sentStats := statGroup{Direction: "Sent"}
   319  	receivedStats := statGroup{Direction: "Received"}
   320  	for key, sg := range snaps {
   321  		if key.received {
   322  			receivedStats.Snapshots = append(receivedStats.Snapshots, sg)
   323  		} else {
   324  			sentStats.Snapshots = append(sentStats.Snapshots, sg)
   325  		}
   326  	}
   327  	sort.Sort(&sentStats)
   328  	sort.Sort(&receivedStats)
   329  
   330  	return &statsPage{
   331  		StatGroups: []*statGroup{&sentStats, &receivedStats},
   332  	}
   333  }
   334  

View as plain text