...

Source file src/go.opencensus.io/trace/spanstore.go

Documentation: go.opencensus.io/trace

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package trace
    16  
    17  import (
    18  	"sync"
    19  	"time"
    20  
    21  	"go.opencensus.io/internal"
    22  )
    23  
    24  const (
    25  	maxBucketSize     = 100000
    26  	defaultBucketSize = 10
    27  )
    28  
    29  var (
    30  	ssmu       sync.RWMutex // protects spanStores
    31  	spanStores = make(map[string]*spanStore)
    32  )
    33  
    34  // This exists purely to avoid exposing internal methods used by z-Pages externally.
    35  type internalOnly struct{}
    36  
    37  func init() {
    38  	//TODO(#412): remove
    39  	internal.Trace = &internalOnly{}
    40  }
    41  
    42  // ReportActiveSpans returns the active spans for the given name.
    43  func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
    44  	s := spanStoreForName(name)
    45  	if s == nil {
    46  		return nil
    47  	}
    48  	var out []*SpanData
    49  	s.mu.Lock()
    50  	defer s.mu.Unlock()
    51  	for activeSpan := range s.active {
    52  		if s, ok := activeSpan.(*span); ok {
    53  			out = append(out, s.makeSpanData())
    54  		}
    55  	}
    56  	return out
    57  }
    58  
    59  // ReportSpansByError returns a sample of error spans.
    60  //
    61  // If code is nonzero, only spans with that status code are returned.
    62  func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
    63  	s := spanStoreForName(name)
    64  	if s == nil {
    65  		return nil
    66  	}
    67  	var out []*SpanData
    68  	s.mu.Lock()
    69  	defer s.mu.Unlock()
    70  	if code != 0 {
    71  		if b, ok := s.errors[code]; ok {
    72  			for _, sd := range b.buffer {
    73  				if sd == nil {
    74  					break
    75  				}
    76  				out = append(out, sd)
    77  			}
    78  		}
    79  	} else {
    80  		for _, b := range s.errors {
    81  			for _, sd := range b.buffer {
    82  				if sd == nil {
    83  					break
    84  				}
    85  				out = append(out, sd)
    86  			}
    87  		}
    88  	}
    89  	return out
    90  }
    91  
    92  // ConfigureBucketSizes sets the number of spans to keep per latency and error
    93  // bucket for different span names.
    94  func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
    95  	for _, bc := range bcs {
    96  		latencyBucketSize := bc.MaxRequestsSucceeded
    97  		if latencyBucketSize < 0 {
    98  			latencyBucketSize = 0
    99  		}
   100  		if latencyBucketSize > maxBucketSize {
   101  			latencyBucketSize = maxBucketSize
   102  		}
   103  		errorBucketSize := bc.MaxRequestsErrors
   104  		if errorBucketSize < 0 {
   105  			errorBucketSize = 0
   106  		}
   107  		if errorBucketSize > maxBucketSize {
   108  			errorBucketSize = maxBucketSize
   109  		}
   110  		spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
   111  	}
   112  }
   113  
   114  // ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
   115  func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
   116  	out := make(map[string]internal.PerMethodSummary)
   117  	ssmu.RLock()
   118  	defer ssmu.RUnlock()
   119  	for name, s := range spanStores {
   120  		s.mu.Lock()
   121  		p := internal.PerMethodSummary{
   122  			Active: len(s.active),
   123  		}
   124  		for code, b := range s.errors {
   125  			p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
   126  				ErrorCode: code,
   127  				Size:      b.size(),
   128  			})
   129  		}
   130  		for i, b := range s.latency {
   131  			min, max := latencyBucketBounds(i)
   132  			p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
   133  				MinLatency: min,
   134  				MaxLatency: max,
   135  				Size:       b.size(),
   136  			})
   137  		}
   138  		s.mu.Unlock()
   139  		out[name] = p
   140  	}
   141  	return out
   142  }
   143  
   144  // ReportSpansByLatency returns a sample of successful spans.
   145  //
   146  // minLatency is the minimum latency of spans to be returned.
   147  // maxLatency, if nonzero, is the maximum latency of spans to be returned.
   148  func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
   149  	s := spanStoreForName(name)
   150  	if s == nil {
   151  		return nil
   152  	}
   153  	var out []*SpanData
   154  	s.mu.Lock()
   155  	defer s.mu.Unlock()
   156  	for i, b := range s.latency {
   157  		min, max := latencyBucketBounds(i)
   158  		if i+1 != len(s.latency) && max <= minLatency {
   159  			continue
   160  		}
   161  		if maxLatency != 0 && maxLatency < min {
   162  			continue
   163  		}
   164  		for _, sd := range b.buffer {
   165  			if sd == nil {
   166  				break
   167  			}
   168  			if minLatency != 0 || maxLatency != 0 {
   169  				d := sd.EndTime.Sub(sd.StartTime)
   170  				if d < minLatency {
   171  					continue
   172  				}
   173  				if maxLatency != 0 && d > maxLatency {
   174  					continue
   175  				}
   176  			}
   177  			out = append(out, sd)
   178  		}
   179  	}
   180  	return out
   181  }
   182  
   183  // spanStore keeps track of spans stored for a particular span name.
   184  //
   185  // It contains all active spans; a sample of spans for failed requests,
   186  // categorized by error code; and a sample of spans for successful requests,
   187  // bucketed by latency.
   188  type spanStore struct {
   189  	mu                     sync.Mutex // protects everything below.
   190  	active                 map[SpanInterface]struct{}
   191  	errors                 map[int32]*bucket
   192  	latency                []bucket
   193  	maxSpansPerErrorBucket int
   194  }
   195  
   196  // newSpanStore creates a span store.
   197  func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
   198  	s := &spanStore{
   199  		active:                 make(map[SpanInterface]struct{}),
   200  		latency:                make([]bucket, len(defaultLatencies)+1),
   201  		maxSpansPerErrorBucket: errorBucketSize,
   202  	}
   203  	for i := range s.latency {
   204  		s.latency[i] = makeBucket(latencyBucketSize)
   205  	}
   206  	return s
   207  }
   208  
   209  // spanStoreForName returns the spanStore for the given name.
   210  //
   211  // It returns nil if it doesn't exist.
   212  func spanStoreForName(name string) *spanStore {
   213  	var s *spanStore
   214  	ssmu.RLock()
   215  	s, _ = spanStores[name]
   216  	ssmu.RUnlock()
   217  	return s
   218  }
   219  
   220  // spanStoreForNameCreateIfNew returns the spanStore for the given name.
   221  //
   222  // It creates it if it didn't exist.
   223  func spanStoreForNameCreateIfNew(name string) *spanStore {
   224  	ssmu.RLock()
   225  	s, ok := spanStores[name]
   226  	ssmu.RUnlock()
   227  	if ok {
   228  		return s
   229  	}
   230  	ssmu.Lock()
   231  	defer ssmu.Unlock()
   232  	s, ok = spanStores[name]
   233  	if ok {
   234  		return s
   235  	}
   236  	s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
   237  	spanStores[name] = s
   238  	return s
   239  }
   240  
   241  // spanStoreSetSize resizes the spanStore for the given name.
   242  //
   243  // It creates it if it didn't exist.
   244  func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
   245  	ssmu.RLock()
   246  	s, ok := spanStores[name]
   247  	ssmu.RUnlock()
   248  	if ok {
   249  		s.resize(latencyBucketSize, errorBucketSize)
   250  		return
   251  	}
   252  	ssmu.Lock()
   253  	defer ssmu.Unlock()
   254  	s, ok = spanStores[name]
   255  	if ok {
   256  		s.resize(latencyBucketSize, errorBucketSize)
   257  		return
   258  	}
   259  	s = newSpanStore(name, latencyBucketSize, errorBucketSize)
   260  	spanStores[name] = s
   261  }
   262  
   263  func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
   264  	s.mu.Lock()
   265  	for i := range s.latency {
   266  		s.latency[i].resize(latencyBucketSize)
   267  	}
   268  	for _, b := range s.errors {
   269  		b.resize(errorBucketSize)
   270  	}
   271  	s.maxSpansPerErrorBucket = errorBucketSize
   272  	s.mu.Unlock()
   273  }
   274  
   275  // add adds a span to the active bucket of the spanStore.
   276  func (s *spanStore) add(span SpanInterface) {
   277  	s.mu.Lock()
   278  	s.active[span] = struct{}{}
   279  	s.mu.Unlock()
   280  }
   281  
   282  // finished removes a span from the active set, and adds a corresponding
   283  // SpanData to a latency or error bucket.
   284  func (s *spanStore) finished(span SpanInterface, sd *SpanData) {
   285  	latency := sd.EndTime.Sub(sd.StartTime)
   286  	if latency < 0 {
   287  		latency = 0
   288  	}
   289  	code := sd.Status.Code
   290  
   291  	s.mu.Lock()
   292  	delete(s.active, span)
   293  	if code == 0 {
   294  		s.latency[latencyBucket(latency)].add(sd)
   295  	} else {
   296  		if s.errors == nil {
   297  			s.errors = make(map[int32]*bucket)
   298  		}
   299  		if b := s.errors[code]; b != nil {
   300  			b.add(sd)
   301  		} else {
   302  			b := makeBucket(s.maxSpansPerErrorBucket)
   303  			s.errors[code] = &b
   304  			b.add(sd)
   305  		}
   306  	}
   307  	s.mu.Unlock()
   308  }
   309  

View as plain text