...
1
2
3
4
5 package grpc
6
7 import (
8 "context"
9 "fmt"
10 "sync/atomic"
11
12 "cloud.google.com/go/auth/grpctransport"
13 "google.golang.org/api/internal"
14 "google.golang.org/grpc"
15 )
16
17
18 type ConnPool = internal.ConnPool
19
20 var _ ConnPool = &roundRobinConnPool{}
21 var _ ConnPool = &singleConnPool{}
22
23
24 type singleConnPool struct {
25 *grpc.ClientConn
26 }
27
28 func (p *singleConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
29 func (p *singleConnPool) Num() int { return 1 }
30
31 type roundRobinConnPool struct {
32 conns []*grpc.ClientConn
33
34 idx uint32
35 }
36
37 func (p *roundRobinConnPool) Num() int {
38 return len(p.conns)
39 }
40
41 func (p *roundRobinConnPool) Conn() *grpc.ClientConn {
42 i := atomic.AddUint32(&p.idx, 1)
43 return p.conns[i%uint32(len(p.conns))]
44 }
45
46 func (p *roundRobinConnPool) Close() error {
47 var errs multiError
48 for _, conn := range p.conns {
49 if err := conn.Close(); err != nil {
50 errs = append(errs, err)
51 }
52 }
53 if len(errs) == 0 {
54 return nil
55 }
56 return errs
57 }
58
59 func (p *roundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
60 return p.Conn().Invoke(ctx, method, args, reply, opts...)
61 }
62
63 func (p *roundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
64 return p.Conn().NewStream(ctx, desc, method, opts...)
65 }
66
67
68
69
70
71
// multiError bundles several errors into one error value. In this file it is
// what roundRobinConnPool.Close returns when one or more connections fail to
// close.
type multiError []error

// Error returns the message of the first non-nil error, followed by a count
// of the remaining non-nil errors when there is more than one. Nil entries
// are skipped. With no non-nil entries it returns "(0 errors)".
func (m multiError) Error() string {
	first, count := "", 0
	for _, e := range m {
		if e == nil {
			continue
		}
		if count == 0 {
			// Only the first error's text is reported verbatim.
			first = e.Error()
		}
		count++
	}
	switch count {
	case 0:
		return "(0 errors)"
	case 1:
		return first
	case 2:
		return first + " (and 1 other error)"
	}
	return fmt.Sprintf("%s (and %d other errors)", first, count-1)
}

// Unwrap exposes the bundled errors so that errors.Is and errors.As can match
// any one of them (multi-error unwrapping is supported since Go 1.20; on older
// toolchains this method is simply unused). The returned slice shares storage
// with m and must not be modified by the caller.
func (m multiError) Unwrap() []error {
	return m
}
94
95 type poolAdapter struct {
96 pool grpctransport.GRPCClientConnPool
97 }
98
99 func (p *poolAdapter) Conn() *grpc.ClientConn {
100 return p.pool.Connection()
101 }
102
103 func (p *poolAdapter) Num() int {
104 return p.pool.Len()
105 }
106
107 func (p *poolAdapter) Close() error {
108 return p.pool.Close()
109 }
110
111 func (p *poolAdapter) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
112 return p.pool.Invoke(ctx, method, args, reply, opts...)
113 }
114
115 func (p *poolAdapter) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
116 return p.pool.NewStream(ctx, desc, method, opts...)
117 }
118
View as plain text