1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package loadtest
20
21 import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "log"
27 "runtime"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "cloud.google.com/go/pubsub"
34 pb "cloud.google.com/go/pubsub/loadtest/pb"
35 "golang.org/x/time/rate"
36 )
37
38 type pubServerConfig struct {
39 topic *pubsub.Topic
40 msgData []byte
41 batchSize int32
42 ordered bool
43 }
44
45
46 type PubServer struct {
47 ID string
48
49 cfg atomic.Value
50 seqNum int32
51 pb.UnimplementedLoadtestWorkerServer
52 }
53
54
55 func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) {
56 log.Println("received start")
57 c, err := pubsub.NewClient(ctx, req.Project)
58 if err != nil {
59 return nil, err
60 }
61 l.init(c, req.Topic, req.GetPublisherOptions().GetMessageSize(), req.GetPublisherOptions().GetBatchSize(), req.GetPublisherOptions().GetBatchDuration().AsDuration(), false)
62 log.Println("started")
63 return &pb.StartResponse{}, nil
64 }
65
66 func (l *PubServer) init(c *pubsub.Client, topicName string, msgSize, batchSize int32, batchDur time.Duration, ordered bool) {
67 topic := c.Topic(topicName)
68 topic.PublishSettings = pubsub.PublishSettings{
69 DelayThreshold: batchDur,
70 CountThreshold: 950,
71 ByteThreshold: 9500000,
72 BufferedByteLimit: 2e9,
73 }
74 topic.EnableMessageOrdering = ordered
75
76 l.cfg.Store(pubServerConfig{
77 topic: topic,
78 msgData: bytes.Repeat([]byte{'A'}, int(msgSize)),
79 batchSize: batchSize,
80 ordered: ordered,
81 })
82 }
83
84 func (l *PubServer) publishBatch() ([]int64, error) {
85 var cfg pubServerConfig
86 if c, ok := l.cfg.Load().(pubServerConfig); ok {
87 cfg = c
88 } else {
89 return nil, errors.New("config not loaded")
90 }
91
92 start := time.Now()
93 latencies := make([]int64, cfg.batchSize)
94 startStr := strconv.FormatInt(start.UnixNano()/1e6, 10)
95 seqNum := atomic.AddInt32(&l.seqNum, cfg.batchSize) - cfg.batchSize
96
97 rs := make([]*pubsub.PublishResult, cfg.batchSize)
98 for i := int32(0); i < cfg.batchSize; i++ {
99 msg := &pubsub.Message{
100 Data: cfg.msgData,
101 Attributes: map[string]string{
102 "sendTime": startStr,
103 "clientId": l.ID,
104 "sequenceNumber": strconv.Itoa(int(seqNum + i)),
105 },
106 }
107 if cfg.ordered {
108 msg.OrderingKey = fmt.Sprintf("key-%d", seqNum+i)
109 }
110 rs[i] = cfg.topic.Publish(context.TODO(), msg)
111 }
112 for i, r := range rs {
113 _, err := r.Get(context.Background())
114 if err != nil {
115 return nil, err
116 }
117
118
119
120
121
122
123
124
125 latencies[i] = time.Since(start).Nanoseconds() / 1e6
126 }
127 return latencies, nil
128 }
129
130
131 type SubServer struct {
132
133 lim *rate.Limiter
134
135 mu sync.Mutex
136 idents []*pb.MessageIdentifier
137 latencies []int64
138 pb.UnimplementedLoadtestWorkerServer
139 }
140
141
142 func (s *SubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) {
143 log.Println("received start")
144 s.lim = rate.NewLimiter(rate.Every(time.Second), 1)
145
146 c, err := pubsub.NewClient(ctx, req.Project)
147 if err != nil {
148 return nil, err
149 }
150
151
152 go func() {
153 sub := c.Subscription(req.GetPubsubOptions().Subscription)
154 sub.ReceiveSettings.NumGoroutines = 10 * runtime.GOMAXPROCS(0)
155 err := sub.Receive(context.Background(), s.callback)
156 log.Fatal(err)
157 }()
158
159 log.Println("started")
160 return &pb.StartResponse{}, nil
161 }
162
163 func (s *SubServer) callback(_ context.Context, m *pubsub.Message) {
164 id, err := strconv.ParseInt(m.Attributes["clientId"], 10, 64)
165 if err != nil {
166 log.Println(err)
167 m.Nack()
168 return
169 }
170
171 seqNum, err := strconv.ParseInt(m.Attributes["sequenceNumber"], 10, 32)
172 if err != nil {
173 log.Println(err)
174 m.Nack()
175 return
176 }
177
178 sendTimeMillis, err := strconv.ParseInt(m.Attributes["sendTime"], 10, 64)
179 if err != nil {
180 log.Println(err)
181 m.Nack()
182 return
183 }
184
185 latency := time.Now().UnixNano()/1e6 - sendTimeMillis
186 ident := &pb.MessageIdentifier{
187 PublisherClientId: id,
188 SequenceNumber: int32(seqNum),
189 }
190
191 s.mu.Lock()
192 s.idents = append(s.idents, ident)
193 s.latencies = append(s.latencies, latency)
194 s.mu.Unlock()
195 m.Ack()
196 }
197
View as plain text