...

Source file src/github.com/prometheus/alertmanager/cluster/connection_pool.go

Documentation: github.com/prometheus/alertmanager/cluster

     1  // Copyright 2020 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package cluster
    15  
    16  import (
    17  	"crypto/tls"
    18  	"fmt"
    19  	"sync"
    20  	"time"
    21  
    22  	lru "github.com/hashicorp/golang-lru"
    23  	"github.com/pkg/errors"
    24  )
    25  
// capacity is the size passed to the LRU cache that backs a connectionPool.
const capacity = 1024
    27  
// connectionPool keeps TLS connections in an LRU cache so that
// borrowConnection can hand out an existing connection instead of dialing a
// new one.
//
// mtx is held by borrowConnection and shutdown for the whole time they access
// cache. cache stores the pooled connections; shutdown sets it to nil, which
// borrowConnection treats as "pool closed". tlsConfig is the TLS configuration
// passed to dialTLSConn whenever a new connection is dialed.
type connectionPool struct {
	mtx       sync.Mutex
	cache     *lru.Cache
	tlsConfig *tls.Config
}
    33  
    34  func newConnectionPool(tlsClientCfg *tls.Config) (*connectionPool, error) {
    35  	cache, err := lru.NewWithEvict(
    36  		capacity, func(_, value interface{}) {
    37  			conn, ok := value.(*tlsConn)
    38  			if ok {
    39  				_ = conn.Close()
    40  			}
    41  		},
    42  	)
    43  	if err != nil {
    44  		return nil, errors.Wrap(err, "failed to create new LRU")
    45  	}
    46  	return &connectionPool{
    47  		cache:     cache,
    48  		tlsConfig: tlsClientCfg,
    49  	}, nil
    50  }
    51  
    52  // borrowConnection returns a *tlsConn from the pool. The connection does not
    53  // need to be returned to the pool because each connection has its own locking.
    54  func (pool *connectionPool) borrowConnection(addr string, timeout time.Duration) (*tlsConn, error) {
    55  	pool.mtx.Lock()
    56  	defer pool.mtx.Unlock()
    57  	if pool.cache == nil {
    58  		return nil, errors.New("connection pool closed")
    59  	}
    60  	key := fmt.Sprintf("%s/%d", addr, int64(timeout))
    61  	value, exists := pool.cache.Get(key)
    62  	if exists {
    63  		conn, ok := value.(*tlsConn)
    64  		if ok && conn.alive() {
    65  			return conn, nil
    66  		}
    67  	}
    68  	conn, err := dialTLSConn(addr, timeout, pool.tlsConfig)
    69  	if err != nil {
    70  		return nil, err
    71  	}
    72  	pool.cache.Add(key, conn)
    73  	return conn, nil
    74  }
    75  
    76  func (pool *connectionPool) shutdown() {
    77  	pool.mtx.Lock()
    78  	defer pool.mtx.Unlock()
    79  	if pool.cache == nil {
    80  		return
    81  	}
    82  	pool.cache.Purge()
    83  	pool.cache = nil
    84  }
    85  

View as plain text