...

Source file src/google.golang.org/api/transport/grpc/pool.go

Documentation: google.golang.org/api/transport/grpc

     1  // Copyright 2020 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package grpc
     6  
     7  import (
     8  	"context"
     9  	"fmt"
    10  	"sync/atomic"
    11  
    12  	"cloud.google.com/go/auth/grpctransport"
    13  	"google.golang.org/api/internal"
    14  	"google.golang.org/grpc"
    15  )
    16  
    17  // ConnPool is a pool of grpc.ClientConns.
    18  type ConnPool = internal.ConnPool // NOTE(cbro): type alias to export the type. It must live in internal to avoid a circular dependency.
    19  
    20  var _ ConnPool = &roundRobinConnPool{}
    21  var _ ConnPool = &singleConnPool{}
    22  
    23  // singleConnPool is a special case for a single connection.
    24  type singleConnPool struct {
    25  	*grpc.ClientConn
    26  }
    27  
    28  func (p *singleConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
    29  func (p *singleConnPool) Num() int               { return 1 }
    30  
    31  type roundRobinConnPool struct {
    32  	conns []*grpc.ClientConn
    33  
    34  	idx uint32 // access via sync/atomic
    35  }
    36  
    37  func (p *roundRobinConnPool) Num() int {
    38  	return len(p.conns)
    39  }
    40  
    41  func (p *roundRobinConnPool) Conn() *grpc.ClientConn {
    42  	i := atomic.AddUint32(&p.idx, 1)
    43  	return p.conns[i%uint32(len(p.conns))]
    44  }
    45  
    46  func (p *roundRobinConnPool) Close() error {
    47  	var errs multiError
    48  	for _, conn := range p.conns {
    49  		if err := conn.Close(); err != nil {
    50  			errs = append(errs, err)
    51  		}
    52  	}
    53  	if len(errs) == 0 {
    54  		return nil
    55  	}
    56  	return errs
    57  }
    58  
    59  func (p *roundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
    60  	return p.Conn().Invoke(ctx, method, args, reply, opts...)
    61  }
    62  
    63  func (p *roundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    64  	return p.Conn().NewStream(ctx, desc, method, opts...)
    65  }
    66  
    67  // multiError represents errors from multiple conns in the group.
    68  //
    69  // TODO: figure out how and whether this is useful to export. End users should
    70  // not be depending on the transport/grpc package directly, so there might need
    71  // to be some service-specific multi-error type.
    72  type multiError []error
    73  
    74  func (m multiError) Error() string {
    75  	s, n := "", 0
    76  	for _, e := range m {
    77  		if e != nil {
    78  			if n == 0 {
    79  				s = e.Error()
    80  			}
    81  			n++
    82  		}
    83  	}
    84  	switch n {
    85  	case 0:
    86  		return "(0 errors)"
    87  	case 1:
    88  		return s
    89  	case 2:
    90  		return s + " (and 1 other error)"
    91  	}
    92  	return fmt.Sprintf("%s (and %d other errors)", s, n-1)
    93  }
    94  
    95  type poolAdapter struct {
    96  	pool grpctransport.GRPCClientConnPool
    97  }
    98  
    99  func (p *poolAdapter) Conn() *grpc.ClientConn {
   100  	return p.pool.Connection()
   101  }
   102  
   103  func (p *poolAdapter) Num() int {
   104  	return p.pool.Len()
   105  }
   106  
   107  func (p *poolAdapter) Close() error {
   108  	return p.pool.Close()
   109  }
   110  
   111  func (p *poolAdapter) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
   112  	return p.pool.Invoke(ctx, method, args, reply, opts...)
   113  }
   114  
   115  func (p *poolAdapter) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   116  	return p.pool.NewStream(ctx, desc, method, opts...)
   117  }
   118  

View as plain text