1
17
18
19 package e2e
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "os"
26 "os/exec"
27
28 "google.golang.org/grpc"
29 channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
30 channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
31 "google.golang.org/grpc/credentials/insecure"
32 testgrpc "google.golang.org/grpc/interop/grpc_testing"
33 testpb "google.golang.org/grpc/interop/grpc_testing"
34 )
35
// cmd builds, without starting, an *exec.Cmd that runs the binary at path
// with args. The child's environment is the current process environment
// followed by the entries in env, and both its stdout and stderr are written
// to logger.
func cmd(path string, logger io.Writer, args []string, env []string) *exec.Cmd {
	environ := os.Environ()
	environ = append(environ, env...)

	c := exec.Command(path, args...)
	c.Stderr = logger
	c.Stdout = logger
	c.Env = environ
	return c
}
43
// clientStatsPort is the local port handed to the client binary through
// --stats_port and dialed by newClient.
const clientStatsPort = 60363
47
48 type client struct {
49 cmd *exec.Cmd
50
51 target string
52 statsCC *grpc.ClientConn
53 }
54
55
56 func newClient(target, binaryPath, bootstrap string, logger io.Writer, flags ...string) (*client, error) {
57 cmd := cmd(
58 binaryPath,
59 logger,
60 append([]string{
61 "--server=" + target,
62 "--print_response=true",
63 "--qps=100",
64 fmt.Sprintf("--stats_port=%d", clientStatsPort),
65 }, flags...),
66 []string{
67 "GRPC_GO_LOG_VERBOSITY_LEVEL=99",
68 "GRPC_GO_LOG_SEVERITY_LEVEL=info",
69 "GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap,
70 },
71 )
72 cmd.Start()
73
74 cc, err := grpc.Dial(fmt.Sprintf("localhost:%d", clientStatsPort), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
75 if err != nil {
76 return nil, err
77 }
78 return &client{
79 cmd: cmd,
80 target: target,
81 statsCC: cc,
82 }, nil
83 }
84
85 func (c *client) clientStats(ctx context.Context) (*testpb.LoadBalancerStatsResponse, error) {
86 ccc := testgrpc.NewLoadBalancerStatsServiceClient(c.statsCC)
87 return ccc.GetClientStats(ctx, &testpb.LoadBalancerStatsRequest{
88 NumRpcs: 100,
89 TimeoutSec: 10,
90 })
91 }
92
93 func (c *client) configRPCs(ctx context.Context, req *testpb.ClientConfigureRequest) error {
94 ccc := testgrpc.NewXdsUpdateClientConfigureServiceClient(c.statsCC)
95 _, err := ccc.Configure(ctx, req)
96 return err
97 }
98
99 func (c *client) channelzSubChannels(ctx context.Context) ([]*channelzpb.Subchannel, error) {
100 ccc := channelzgrpc.NewChannelzClient(c.statsCC)
101 r, err := ccc.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{})
102 if err != nil {
103 return nil, err
104 }
105
106 var ret []*channelzpb.Subchannel
107 for _, cc := range r.Channel {
108 if cc.Data.Target != c.target {
109 continue
110 }
111 for _, sc := range cc.SubchannelRef {
112 rr, err := ccc.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: sc.SubchannelId})
113 if err != nil {
114 return nil, err
115 }
116 ret = append(ret, rr.Subchannel)
117 }
118 }
119 return ret, nil
120 }
121
122 func (c *client) stop() {
123 c.cmd.Process.Kill()
124 c.cmd.Wait()
125 }
126
// serverPort is the port given to the first server started by newServers;
// each further server uses the next port up.
const serverPort = 50051
130
// server is a handle to one test server process.
type server struct {
	// cmd is the command running the server binary.
	cmd *exec.Cmd

	// port is the value passed to the binary via --port.
	port int
}
135
136
137
138
139
140 func newServers(hostnamePrefix, binaryPath, bootstrap string, logger io.Writer, count int) (_ []*server, err error) {
141 var ret []*server
142 defer func() {
143 if err != nil {
144 for _, s := range ret {
145 s.stop()
146 }
147 }
148 }()
149 for i := 0; i < count; i++ {
150 port := serverPort + i
151 cmd := cmd(
152 binaryPath,
153 logger,
154 []string{
155 fmt.Sprintf("--port=%d", port),
156 fmt.Sprintf("--host_name_override=%s-%d", hostnamePrefix, i),
157 },
158 []string{
159 "GRPC_GO_LOG_VERBOSITY_LEVEL=99",
160 "GRPC_GO_LOG_SEVERITY_LEVEL=info",
161 "GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap,
162 },
163 )
164 cmd.Start()
165 ret = append(ret, &server{cmd: cmd, port: port})
166 }
167 return ret, nil
168 }
169
170 func (s *server) stop() {
171 s.cmd.Process.Kill()
172 s.cmd.Wait()
173 }
174
View as plain text