...

Source file src/cloud.google.com/go/pubsub/trace.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2018 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"log"
    20  	"sync"
    21  
    22  	"go.opencensus.io/stats"
    23  	"go.opencensus.io/stats/view"
    24  	"go.opencensus.io/tag"
    25  )
    26  
    27  // The following keys are used to tag requests with a specific topic/subscription ID.
    28  var (
    29  	keyTopic        = tag.MustNewKey("topic")
    30  	keySubscription = tag.MustNewKey("subscription")
    31  )
    32  
    33  // In the following, errors are used if status is not "OK".
    34  var (
    35  	keyStatus = tag.MustNewKey("status")
    36  	keyError  = tag.MustNewKey("error")
    37  )
    38  
    39  const statsPrefix = "cloud.google.com/go/pubsub/"
    40  
    41  // The following are measures recorded in publish/subscribe flows.
    42  var (
    43  	// PublishedMessages is a measure of the number of messages published, which may include errors.
    44  	// It is EXPERIMENTAL and subject to change or removal without notice.
    45  	PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless)
    46  
    47  	// PublishLatency is a measure of the number of milliseconds it took to publish a bundle,
    48  	// which may consist of one or more messages.
    49  	// It is EXPERIMENTAL and subject to change or removal without notice.
    50  	PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds)
    51  
    52  	// PullCount is a measure of the number of messages pulled.
    53  	// It is EXPERIMENTAL and subject to change or removal without notice.
    54  	PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)
    55  
    56  	// AckCount is a measure of the number of messages acked.
    57  	// It is EXPERIMENTAL and subject to change or removal without notice.
    58  	AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)
    59  
    60  	// NackCount is a measure of the number of messages nacked.
    61  	// It is EXPERIMENTAL and subject to change or removal without notice.
    62  	NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)
    63  
    64  	// ModAckCount is a measure of the number of messages whose ack-deadline was modified.
    65  	// It is EXPERIMENTAL and subject to change or removal without notice.
    66  	ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)
    67  
    68  	// ModAckTimeoutCount is a measure of the number ModifyAckDeadline RPCs that timed out.
    69  	// It is EXPERIMENTAL and subject to change or removal without notice.
    70  	ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)
    71  
    72  	// StreamOpenCount is a measure of the number of times a streaming-pull stream was opened.
    73  	// It is EXPERIMENTAL and subject to change or removal without notice.
    74  	StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)
    75  
    76  	// StreamRetryCount is a measure of the number of times a streaming-pull operation was retried.
    77  	// It is EXPERIMENTAL and subject to change or removal without notice.
    78  	StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)
    79  
    80  	// StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream.
    81  	// It is EXPERIMENTAL and subject to change or removal without notice.
    82  	StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)
    83  
    84  	// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
    85  	// It is EXPERIMENTAL and subject to change or removal without notice.
    86  	StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
    87  
    88  	// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
    89  	// It is EXPERIMENTAL and subject to change or removal without notice.
    90  	OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
    91  
    92  	// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
    93  	// It is EXPERIMENTAL and subject to change or removal without notice.
    94  	OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
    95  
    96  	// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
    97  	// It is EXPERIMENTAL and subject to change or removal without notice.
    98  	PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)
    99  
   100  	// PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up.
   101  	// It is EXPERIMENTAL and subject to change or removal without notice.
   102  	PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
   103  )
   104  
   105  var (
   106  	// PublishedMessagesView is a cumulative sum of PublishedMessages.
   107  	// It is EXPERIMENTAL and subject to change or removal without notice.
   108  	PublishedMessagesView *view.View
   109  
   110  	// PublishLatencyView is a distribution of PublishLatency.
   111  	// It is EXPERIMENTAL and subject to change or removal without notice.
   112  	PublishLatencyView *view.View
   113  
   114  	// PullCountView is a cumulative sum of PullCount.
   115  	// It is EXPERIMENTAL and subject to change or removal without notice.
   116  	PullCountView *view.View
   117  
   118  	// AckCountView is a cumulative sum of AckCount.
   119  	// It is EXPERIMENTAL and subject to change or removal without notice.
   120  	AckCountView *view.View
   121  
   122  	// NackCountView is a cumulative sum of NackCount.
   123  	// It is EXPERIMENTAL and subject to change or removal without notice.
   124  	NackCountView *view.View
   125  
   126  	// ModAckCountView is a cumulative sum of ModAckCount.
   127  	// It is EXPERIMENTAL and subject to change or removal without notice.
   128  	ModAckCountView *view.View
   129  
   130  	// ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount.
   131  	// It is EXPERIMENTAL and subject to change or removal without notice.
   132  	ModAckTimeoutCountView *view.View
   133  
   134  	// StreamOpenCountView is a cumulative sum of StreamOpenCount.
   135  	// It is EXPERIMENTAL and subject to change or removal without notice.
   136  	StreamOpenCountView *view.View
   137  
   138  	// StreamRetryCountView is a cumulative sum of StreamRetryCount.
   139  	// It is EXPERIMENTAL and subject to change or removal without notice.
   140  	StreamRetryCountView *view.View
   141  
   142  	// StreamRequestCountView is a cumulative sum of StreamRequestCount.
   143  	// It is EXPERIMENTAL and subject to change or removal without notice.
   144  	StreamRequestCountView *view.View
   145  
   146  	// StreamResponseCountView is a cumulative sum of StreamResponseCount.
   147  	// It is EXPERIMENTAL and subject to change or removal without notice.
   148  	StreamResponseCountView *view.View
   149  
   150  	// OutstandingMessagesView is the last value of OutstandingMessages
   151  	// It is EXPERIMENTAL and subject to change or removal without notice.
   152  	OutstandingMessagesView *view.View
   153  
   154  	// OutstandingBytesView is the last value of OutstandingBytes
   155  	// It is EXPERIMENTAL and subject to change or removal without notice.
   156  	OutstandingBytesView *view.View
   157  
   158  	// PublisherOutstandingMessagesView is the last value of OutstandingMessages
   159  	// It is EXPERIMENTAL and subject to change or removal without notice.
   160  	PublisherOutstandingMessagesView *view.View
   161  
   162  	// PublisherOutstandingBytesView is the last value of OutstandingBytes
   163  	// It is EXPERIMENTAL and subject to change or removal without notice.
   164  	PublisherOutstandingBytesView *view.View
   165  )
   166  
   167  func init() {
   168  	PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
   169  	PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
   170  	PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
   171  	PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
   172  	PullCountView = createCountView(PullCount, keySubscription)
   173  	AckCountView = createCountView(AckCount, keySubscription)
   174  	NackCountView = createCountView(NackCount, keySubscription)
   175  	ModAckCountView = createCountView(ModAckCount, keySubscription)
   176  	ModAckTimeoutCountView = createCountView(ModAckTimeoutCount, keySubscription)
   177  	StreamOpenCountView = createCountView(StreamOpenCount, keySubscription)
   178  	StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
   179  	StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
   180  	StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
   181  	OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
   182  	OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
   183  
   184  	DefaultPublishViews = []*view.View{
   185  		PublishedMessagesView,
   186  		PublishLatencyView,
   187  		PublisherOutstandingMessagesView,
   188  		PublisherOutstandingBytesView,
   189  	}
   190  
   191  	DefaultSubscribeViews = []*view.View{
   192  		PullCountView,
   193  		AckCountView,
   194  		NackCountView,
   195  		ModAckCountView,
   196  		ModAckTimeoutCountView,
   197  		StreamOpenCountView,
   198  		StreamRetryCountView,
   199  		StreamRequestCountView,
   200  		StreamResponseCountView,
   201  		OutstandingMessagesView,
   202  		OutstandingBytesView,
   203  	}
   204  }
   205  
   206  // These arrays hold the default OpenCensus views that keep track of publish/subscribe operations.
   207  // It is EXPERIMENTAL and subject to change or removal without notice.
   208  var (
   209  	DefaultPublishViews   []*view.View
   210  	DefaultSubscribeViews []*view.View
   211  )
   212  
   213  func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
   214  	return &view.View{
   215  		Name:        m.Name(),
   216  		Description: m.Description(),
   217  		TagKeys:     keys,
   218  		Measure:     m,
   219  		Aggregation: view.Sum(),
   220  	}
   221  }
   222  
   223  func createDistView(m stats.Measure, keys ...tag.Key) *view.View {
   224  	return &view.View{
   225  		Name:        m.Name(),
   226  		Description: m.Description(),
   227  		TagKeys:     keys,
   228  		Measure:     m,
   229  		Aggregation: view.Distribution(0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000),
   230  	}
   231  }
   232  
   233  func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
   234  	return &view.View{
   235  		Name:        m.Name(),
   236  		Description: m.Description(),
   237  		TagKeys:     keys,
   238  		Measure:     m,
   239  		Aggregation: view.LastValue(),
   240  	}
   241  }
   242  
   243  var logOnce sync.Once
   244  
   245  // withSubscriptionKey returns a new context modified with the subscriptionKey tag map.
   246  func withSubscriptionKey(ctx context.Context, subName string) context.Context {
   247  	ctx, err := tag.New(ctx, tag.Upsert(keySubscription, subName))
   248  	if err != nil {
   249  		logOnce.Do(func() {
   250  			log.Printf("pubsub: error creating tag map for 'subscribe' key: %v", err)
   251  		})
   252  	}
   253  	return ctx
   254  }
   255  
   256  func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
   257  	stats.Record(ctx, m.M(n))
   258  }
   259  

View as plain text