...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/instrumentation.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  
    20  	"go.opencensus.io/stats"
    21  	"go.opencensus.io/stats/view"
    22  	"go.opencensus.io/tag"
    23  )
    24  
    25  var (
    26  	// Metrics on a stream are tagged with the stream ID.
    27  	keyStream = tag.MustNewKey("streamID")
    28  
    29  	// We allow users to annotate streams with a data origin for monitoring purposes.
    30  	// See the WithDataOrigin writer option for providing this.
    31  	keyDataOrigin = tag.MustNewKey("dataOrigin")
    32  
    33  	// keyError tags metrics using the status code of returned errors.
    34  	keyError = tag.MustNewKey("error")
    35  )
    36  
    37  // DefaultOpenCensusViews retains the set of all opencensus views that this
    38  // library has instrumented, to add view registration for exporters.
    39  var DefaultOpenCensusViews []*view.View
    40  
    41  const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
    42  
    43  var (
    44  	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
    45  	// It is EXPERIMENTAL and subject to change or removal without notice.
    46  	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
    47  
    48  	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
    49  	// It is EXPERIMENTAL and subject to change or removal without notice.
    50  	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
    51  
    52  	// AppendRequests is a measure of the number of append requests sent.
    53  	// It is EXPERIMENTAL and subject to change or removal without notice.
    54  	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
    55  
    56  	// AppendRequestBytes is a measure of the bytes sent as append requests.
    57  	// It is EXPERIMENTAL and subject to change or removal without notice.
    58  	AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
    59  
    60  	// AppendRequestErrors is a measure of the number of append requests that errored on send.
    61  	// It is EXPERIMENTAL and subject to change or removal without notice.
    62  	AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
    63  
    64  	// AppendRequestReconnects is a measure of the number of times that sending an append request triggered reconnect.
    65  	// It is EXPERIMENTAL and subject to change or removal without notice.
    66  	AppendRequestReconnects = stats.Int64(statsPrefix+"append_reconnections", "Number of append rows reconnections", stats.UnitDimensionless)
    67  
    68  	// AppendRequestRows is a measure of the number of append rows sent.
    69  	// It is EXPERIMENTAL and subject to change or removal without notice.
    70  	AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
    71  
    72  	// AppendResponses is a measure of the number of append responses received.
    73  	// It is EXPERIMENTAL and subject to change or removal without notice.
    74  	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
    75  
    76  	// AppendResponseErrors is a measure of the number of append responses received with an error attached.
    77  	// It is EXPERIMENTAL and subject to change or removal without notice.
    78  	AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
    79  
    80  	// AppendRetryCount is a measure of the number of appends that were automatically retried by the library
    81  	// after receiving a non-successful response.
    82  	// It is EXPERIMENTAL and subject to change or removal without notice.
    83  	AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless)
    84  
    85  	// FlushRequests is a measure of the number of FlushRows requests sent.
    86  	// It is EXPERIMENTAL and subject to change or removal without notice.
    87  	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
    88  )
    89  
    90  var (
    91  
    92  	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
    93  	// It is EXPERIMENTAL and subject to change or removal without notice.
    94  	AppendClientOpenView *view.View
    95  
    96  	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
    97  	// It is EXPERIMENTAL and subject to change or removal without notice.
    98  	AppendClientOpenRetryView *view.View
    99  
   100  	// AppendRequestsView is a cumulative sum of AppendRequests.
   101  	// It is EXPERIMENTAL and subject to change or removal without notice.
   102  	AppendRequestsView *view.View
   103  
   104  	// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
   105  	// It is EXPERIMENTAL and subject to change or removal without notice.
   106  	AppendRequestBytesView *view.View
   107  
   108  	// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
   109  	// It is EXPERIMENTAL and subject to change or removal without notice.
   110  	AppendRequestErrorsView *view.View
   111  
   112  	// AppendRequestReconnectsView is a cumulative sum of AppendRequestReconnects.
   113  	// It is EXPERIMENTAL and subject to change or removal without notice.
   114  	AppendRequestReconnectsView *view.View
   115  
   116  	// AppendRequestRowsView is a cumulative sum of AppendRows.
   117  	// It is EXPERIMENTAL and subject to change or removal without notice.
   118  	AppendRequestRowsView *view.View
   119  
   120  	// AppendResponsesView is a cumulative sum of AppendResponses.
   121  	// It is EXPERIMENTAL and subject to change or removal without notice.
   122  	AppendResponsesView *view.View
   123  
   124  	// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
   125  	// It is EXPERIMENTAL and subject to change or removal without notice.
   126  	AppendResponseErrorsView *view.View
   127  
   128  	// AppendRetryView is a cumulative sum of AppendRetryCount.
   129  	// It is EXPERIMENTAL and subject to change or removal without notice.
   130  	AppendRetryView *view.View
   131  
   132  	// FlushRequestsView is a cumulative sum of FlushRequests.
   133  	// It is EXPERIMENTAL and subject to change or removal without notice.
   134  	FlushRequestsView *view.View
   135  )
   136  
   137  func init() {
   138  	AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyError)
   139  	AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount))
   140  
   141  	AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
   142  	AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
   143  	AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
   144  	AppendRequestReconnectsView = createSumView(stats.Measure(AppendRequestReconnects), keyStream, keyDataOrigin, keyError)
   145  	AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
   146  
   147  	AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
   148  	AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)
   149  	AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin)
   150  	FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
   151  
   152  	DefaultOpenCensusViews = []*view.View{
   153  		AppendClientOpenView,
   154  		AppendClientOpenRetryView,
   155  
   156  		AppendRequestsView,
   157  		AppendRequestBytesView,
   158  		AppendRequestErrorsView,
   159  		AppendRequestReconnectsView,
   160  		AppendRequestRowsView,
   161  
   162  		AppendResponsesView,
   163  		AppendResponseErrorsView,
   164  		AppendRetryView,
   165  
   166  		FlushRequestsView,
   167  	}
   168  }
   169  
   170  func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
   171  	return &view.View{
   172  		Name:        m.Name(),
   173  		Description: m.Description(),
   174  		TagKeys:     keys,
   175  		Measure:     m,
   176  		Aggregation: agg,
   177  	}
   178  }
   179  
   180  func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
   181  	return createView(m, view.Sum(), keys...)
   182  }
   183  
   184  // setupWriterStatContext returns a new context modified with the instrumentation tags.
   185  // This will panic if no managedstream is provided
   186  func setupWriterStatContext(ms *ManagedStream) context.Context {
   187  	if ms == nil {
   188  		panic("no ManagedStream provided")
   189  	}
   190  	kCtx := ms.ctx
   191  	if ms.streamSettings == nil {
   192  		return kCtx
   193  	}
   194  	if ms.streamSettings.streamID != "" {
   195  		ctx, err := tag.New(kCtx, tag.Upsert(keyStream, ms.streamSettings.streamID))
   196  		if err != nil {
   197  			return kCtx // failed to add a tag, return the original context.
   198  		}
   199  		kCtx = ctx
   200  	}
   201  	if ms.streamSettings.dataOrigin != "" {
   202  		ctx, err := tag.New(kCtx, tag.Upsert(keyDataOrigin, ms.streamSettings.dataOrigin))
   203  		if err != nil {
   204  			return kCtx
   205  		}
   206  		kCtx = ctx
   207  	}
   208  	return kCtx
   209  }
   210  
   211  // recordWriterStat records a measure which may optionally contain writer-related tags like stream ID
   212  // or data origin.
   213  func recordWriterStat(ms *ManagedStream, m *stats.Int64Measure, n int64) {
   214  	stats.Record(ms.ctx, m.M(n))
   215  }
   216  
   217  func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
   218  	stats.Record(ctx, m.M(n))
   219  }
   220  

View as plain text