...

Source file src/go.opentelemetry.io/otel/internal/global/meter.go

Documentation: go.opentelemetry.io/otel/internal/global

     1  // Copyright The OpenTelemetry Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package global // import "go.opentelemetry.io/otel/internal/global"
    16  
    17  import (
    18  	"container/list"
    19  	"sync"
    20  	"sync/atomic"
    21  
    22  	"go.opentelemetry.io/otel/metric"
    23  	"go.opentelemetry.io/otel/metric/embedded"
    24  )
    25  
// meterProvider is a placeholder for a configured SDK MeterProvider.
//
// All MeterProvider functionality is forwarded to a delegate once
// configured.
type meterProvider struct {
	embedded.MeterProvider

	// mtx guards meters and delegate.
	mtx    sync.Mutex
	// meters caches the placeholder Meters handed out before a delegate was
	// set. It is created lazily by Meter and reset to nil by setDelegate.
	meters map[il]*meter

	// delegate is nil until setDelegate installs a provider.
	delegate metric.MeterProvider
}
    38  
    39  // setDelegate configures p to delegate all MeterProvider functionality to
    40  // provider.
    41  //
    42  // All Meters provided prior to this function call are switched out to be
    43  // Meters provided by provider. All instruments and callbacks are recreated and
    44  // delegated.
    45  //
    46  // It is guaranteed by the caller that this happens only once.
    47  func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
    48  	p.mtx.Lock()
    49  	defer p.mtx.Unlock()
    50  
    51  	p.delegate = provider
    52  
    53  	if len(p.meters) == 0 {
    54  		return
    55  	}
    56  
    57  	for _, meter := range p.meters {
    58  		meter.setDelegate(provider)
    59  	}
    60  
    61  	p.meters = nil
    62  }
    63  
    64  // Meter implements MeterProvider.
    65  func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
    66  	p.mtx.Lock()
    67  	defer p.mtx.Unlock()
    68  
    69  	if p.delegate != nil {
    70  		return p.delegate.Meter(name, opts...)
    71  	}
    72  
    73  	// At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
    74  
    75  	c := metric.NewMeterConfig(opts...)
    76  	key := il{
    77  		name:    name,
    78  		version: c.InstrumentationVersion(),
    79  	}
    80  
    81  	if p.meters == nil {
    82  		p.meters = make(map[il]*meter)
    83  	}
    84  
    85  	if val, ok := p.meters[key]; ok {
    86  		return val
    87  	}
    88  
    89  	t := &meter{name: name, opts: opts}
    90  	p.meters[key] = t
    91  	return t
    92  }
    93  
// meter is a placeholder for a metric.Meter.
//
// All Meter functionality is forwarded to a delegate once configured.
// Until then, the instruments and callback registrations created through it
// are recorded so that setDelegate can hand them the delegate Meter.
type meter struct {
	embedded.Meter

	// name and opts are the arguments the meter was requested with;
	// setDelegate passes them to the delegate provider.
	name string
	opts []metric.MeterOption

	// mtx guards instruments and registry.
	mtx         sync.Mutex
	// instruments lists the placeholder instruments created before a
	// delegate was set.
	instruments []delegatedInstrument

	// registry holds the *registration values created by RegisterCallback
	// before a delegate was set.
	registry list.List

	delegate atomic.Value // metric.Meter
}
   111  
// delegatedInstrument is implemented by the placeholder instruments a meter
// records; meter.setDelegate calls setDelegate on each of them with the
// delegate Meter.
type delegatedInstrument interface {
	setDelegate(metric.Meter)
}
   115  
   116  // setDelegate configures m to delegate all Meter functionality to Meters
   117  // created by provider.
   118  //
   119  // All subsequent calls to the Meter methods will be passed to the delegate.
   120  //
   121  // It is guaranteed by the caller that this happens only once.
   122  func (m *meter) setDelegate(provider metric.MeterProvider) {
   123  	meter := provider.Meter(m.name, m.opts...)
   124  	m.delegate.Store(meter)
   125  
   126  	m.mtx.Lock()
   127  	defer m.mtx.Unlock()
   128  
   129  	for _, inst := range m.instruments {
   130  		inst.setDelegate(meter)
   131  	}
   132  
   133  	for e := m.registry.Front(); e != nil; e = e.Next() {
   134  		r := e.Value.(*registration)
   135  		r.setDelegate(meter)
   136  		m.registry.Remove(e)
   137  	}
   138  
   139  	m.instruments = nil
   140  	m.registry.Init()
   141  }
   142  
   143  func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
   144  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   145  		return del.Int64Counter(name, options...)
   146  	}
   147  	m.mtx.Lock()
   148  	defer m.mtx.Unlock()
   149  	i := &siCounter{name: name, opts: options}
   150  	m.instruments = append(m.instruments, i)
   151  	return i, nil
   152  }
   153  
   154  func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
   155  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   156  		return del.Int64UpDownCounter(name, options...)
   157  	}
   158  	m.mtx.Lock()
   159  	defer m.mtx.Unlock()
   160  	i := &siUpDownCounter{name: name, opts: options}
   161  	m.instruments = append(m.instruments, i)
   162  	return i, nil
   163  }
   164  
   165  func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
   166  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   167  		return del.Int64Histogram(name, options...)
   168  	}
   169  	m.mtx.Lock()
   170  	defer m.mtx.Unlock()
   171  	i := &siHistogram{name: name, opts: options}
   172  	m.instruments = append(m.instruments, i)
   173  	return i, nil
   174  }
   175  
   176  func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
   177  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   178  		return del.Int64ObservableCounter(name, options...)
   179  	}
   180  	m.mtx.Lock()
   181  	defer m.mtx.Unlock()
   182  	i := &aiCounter{name: name, opts: options}
   183  	m.instruments = append(m.instruments, i)
   184  	return i, nil
   185  }
   186  
   187  func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
   188  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   189  		return del.Int64ObservableUpDownCounter(name, options...)
   190  	}
   191  	m.mtx.Lock()
   192  	defer m.mtx.Unlock()
   193  	i := &aiUpDownCounter{name: name, opts: options}
   194  	m.instruments = append(m.instruments, i)
   195  	return i, nil
   196  }
   197  
   198  func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
   199  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   200  		return del.Int64ObservableGauge(name, options...)
   201  	}
   202  	m.mtx.Lock()
   203  	defer m.mtx.Unlock()
   204  	i := &aiGauge{name: name, opts: options}
   205  	m.instruments = append(m.instruments, i)
   206  	return i, nil
   207  }
   208  
   209  func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
   210  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   211  		return del.Float64Counter(name, options...)
   212  	}
   213  	m.mtx.Lock()
   214  	defer m.mtx.Unlock()
   215  	i := &sfCounter{name: name, opts: options}
   216  	m.instruments = append(m.instruments, i)
   217  	return i, nil
   218  }
   219  
   220  func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
   221  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   222  		return del.Float64UpDownCounter(name, options...)
   223  	}
   224  	m.mtx.Lock()
   225  	defer m.mtx.Unlock()
   226  	i := &sfUpDownCounter{name: name, opts: options}
   227  	m.instruments = append(m.instruments, i)
   228  	return i, nil
   229  }
   230  
   231  func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
   232  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   233  		return del.Float64Histogram(name, options...)
   234  	}
   235  	m.mtx.Lock()
   236  	defer m.mtx.Unlock()
   237  	i := &sfHistogram{name: name, opts: options}
   238  	m.instruments = append(m.instruments, i)
   239  	return i, nil
   240  }
   241  
   242  func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
   243  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   244  		return del.Float64ObservableCounter(name, options...)
   245  	}
   246  	m.mtx.Lock()
   247  	defer m.mtx.Unlock()
   248  	i := &afCounter{name: name, opts: options}
   249  	m.instruments = append(m.instruments, i)
   250  	return i, nil
   251  }
   252  
   253  func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
   254  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   255  		return del.Float64ObservableUpDownCounter(name, options...)
   256  	}
   257  	m.mtx.Lock()
   258  	defer m.mtx.Unlock()
   259  	i := &afUpDownCounter{name: name, opts: options}
   260  	m.instruments = append(m.instruments, i)
   261  	return i, nil
   262  }
   263  
   264  func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
   265  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   266  		return del.Float64ObservableGauge(name, options...)
   267  	}
   268  	m.mtx.Lock()
   269  	defer m.mtx.Unlock()
   270  	i := &afGauge{name: name, opts: options}
   271  	m.instruments = append(m.instruments, i)
   272  	return i, nil
   273  }
   274  
   275  // RegisterCallback captures the function that will be called during Collect.
   276  func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
   277  	if del, ok := m.delegate.Load().(metric.Meter); ok {
   278  		insts = unwrapInstruments(insts)
   279  		return del.RegisterCallback(f, insts...)
   280  	}
   281  
   282  	m.mtx.Lock()
   283  	defer m.mtx.Unlock()
   284  
   285  	reg := &registration{instruments: insts, function: f}
   286  	e := m.registry.PushBack(reg)
   287  	reg.unreg = func() error {
   288  		m.mtx.Lock()
   289  		_ = m.registry.Remove(e)
   290  		m.mtx.Unlock()
   291  		return nil
   292  	}
   293  	return reg, nil
   294  }
   295  
// wrapped is implemented by instruments that can hand back another
// metric.Observable via unwrap; unwrapInstruments substitutes that value for
// the wrapper.
type wrapped interface {
	unwrap() metric.Observable
}
   299  
   300  func unwrapInstruments(instruments []metric.Observable) []metric.Observable {
   301  	out := make([]metric.Observable, 0, len(instruments))
   302  
   303  	for _, inst := range instruments {
   304  		if in, ok := inst.(wrapped); ok {
   305  			out = append(out, in.unwrap())
   306  		} else {
   307  			out = append(out, inst)
   308  		}
   309  	}
   310  
   311  	return out
   312  }
   313  
// registration is the metric.Registration handed out by meter.RegisterCallback
// while no delegate is installed. It keeps the callback and its instruments so
// that setDelegate can register them with the delegate Meter.
type registration struct {
	embedded.Registration

	// instruments and function are the arguments RegisterCallback was
	// called with.
	instruments []metric.Observable
	function    metric.Callback

	// unreg performs the actual unregistration. It is set to nil once
	// Unregister has run.
	unreg   func() error
	// unregMu is held by setDelegate and Unregister while they read or
	// replace unreg.
	unregMu sync.Mutex
}
   323  
   324  func (c *registration) setDelegate(m metric.Meter) {
   325  	insts := unwrapInstruments(c.instruments)
   326  
   327  	c.unregMu.Lock()
   328  	defer c.unregMu.Unlock()
   329  
   330  	if c.unreg == nil {
   331  		// Unregister already called.
   332  		return
   333  	}
   334  
   335  	reg, err := m.RegisterCallback(c.function, insts...)
   336  	if err != nil {
   337  		GetErrorHandler().Handle(err)
   338  	}
   339  
   340  	c.unreg = reg.Unregister
   341  }
   342  
   343  func (c *registration) Unregister() error {
   344  	c.unregMu.Lock()
   345  	defer c.unregMu.Unlock()
   346  	if c.unreg == nil {
   347  		// Unregister already called.
   348  		return nil
   349  	}
   350  
   351  	var err error
   352  	err, c.unreg = c.unreg(), nil
   353  	return err
   354  }
   355  

View as plain text