...

Source file src/cloud.google.com/go/pubsub/loadtest/loadtest.go

Documentation: cloud.google.com/go/pubsub/loadtest

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package loadtest implements load testing for pubsub,
    16  // following the interface defined in https://github.com/GoogleCloudPlatform/pubsub/tree/master/load-test-framework/ .
    17  //
    18  // This package is experimental.
    19  package loadtest
    20  
    21  import (
    22  	"bytes"
    23  	"context"
    24  	"errors"
    25  	"fmt"
    26  	"log"
    27  	"runtime"
    28  	"strconv"
    29  	"sync"
    30  	"sync/atomic"
    31  	"time"
    32  
    33  	"cloud.google.com/go/pubsub"
    34  	pb "cloud.google.com/go/pubsub/loadtest/pb"
    35  	"golang.org/x/time/rate"
    36  )
    37  
// pubServerConfig is the immutable snapshot of publisher settings that
// PubServer.init stores and PubServer.publishBatch loads.
type pubServerConfig struct {
	// topic is the topic handle that publishBatch publishes to.
	topic     *pubsub.Topic
	// msgData is the payload attached to every published message.
	msgData   []byte
	// batchSize is the number of messages sent by one publishBatch call.
	batchSize int32
	// ordered reports whether publishBatch sets an ordering key on each
	// message.
	ordered   bool
}
    44  
// PubServer is a load-test worker that publishes batches of messages to a
// Pub/Sub topic.
type PubServer struct {
	// ID is attached to every published message as its "clientId" attribute.
	ID string

	// cfg holds a pubServerConfig value; it is stored by init and loaded by
	// publishBatch.
	cfg    atomic.Value
	// seqNum is the running message sequence counter. publishBatch advances
	// it atomically by the batch size to reserve a range of sequence numbers.
	seqNum int32
	// NOTE(review): presumably supplies default implementations for the
	// remaining LoadtestWorker RPCs — confirm against the generated pb code.
	pb.UnimplementedLoadtestWorkerServer
}
    53  
    54  // Start starts the server.
    55  func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) {
    56  	log.Println("received start")
    57  	c, err := pubsub.NewClient(ctx, req.Project)
    58  	if err != nil {
    59  		return nil, err
    60  	}
    61  	l.init(c, req.Topic, req.GetPublisherOptions().GetMessageSize(), req.GetPublisherOptions().GetBatchSize(), req.GetPublisherOptions().GetBatchDuration().AsDuration(), false)
    62  	log.Println("started")
    63  	return &pb.StartResponse{}, nil
    64  }
    65  
    66  func (l *PubServer) init(c *pubsub.Client, topicName string, msgSize, batchSize int32, batchDur time.Duration, ordered bool) {
    67  	topic := c.Topic(topicName)
    68  	topic.PublishSettings = pubsub.PublishSettings{
    69  		DelayThreshold:    batchDur,
    70  		CountThreshold:    950,
    71  		ByteThreshold:     9500000,
    72  		BufferedByteLimit: 2e9,
    73  	}
    74  	topic.EnableMessageOrdering = ordered
    75  
    76  	l.cfg.Store(pubServerConfig{
    77  		topic:     topic,
    78  		msgData:   bytes.Repeat([]byte{'A'}, int(msgSize)),
    79  		batchSize: batchSize,
    80  		ordered:   ordered,
    81  	})
    82  }
    83  
    84  func (l *PubServer) publishBatch() ([]int64, error) {
    85  	var cfg pubServerConfig
    86  	if c, ok := l.cfg.Load().(pubServerConfig); ok {
    87  		cfg = c
    88  	} else {
    89  		return nil, errors.New("config not loaded")
    90  	}
    91  
    92  	start := time.Now()
    93  	latencies := make([]int64, cfg.batchSize)
    94  	startStr := strconv.FormatInt(start.UnixNano()/1e6, 10)
    95  	seqNum := atomic.AddInt32(&l.seqNum, cfg.batchSize) - cfg.batchSize
    96  
    97  	rs := make([]*pubsub.PublishResult, cfg.batchSize)
    98  	for i := int32(0); i < cfg.batchSize; i++ {
    99  		msg := &pubsub.Message{
   100  			Data: cfg.msgData,
   101  			Attributes: map[string]string{
   102  				"sendTime":       startStr,
   103  				"clientId":       l.ID,
   104  				"sequenceNumber": strconv.Itoa(int(seqNum + i)),
   105  			},
   106  		}
   107  		if cfg.ordered {
   108  			msg.OrderingKey = fmt.Sprintf("key-%d", seqNum+i)
   109  		}
   110  		rs[i] = cfg.topic.Publish(context.TODO(), msg)
   111  	}
   112  	for i, r := range rs {
   113  		_, err := r.Get(context.Background())
   114  		if err != nil {
   115  			return nil, err
   116  		}
   117  		// TODO(jba,pongad): fix latencies
   118  		// Later values will be skewed by earlier ones, since we wait for the
   119  		// results in order. (On the other hand, it may not matter much, since
   120  		// messages are added to bundles in order and bundles get sent more or
   121  		// less in order.) If we want more accurate values, we can either start
   122  		// a goroutine for each result (similar to the original code using a
   123  		// callback), or call reflect.Select with the Ready channels of the
   124  		// results.
   125  		latencies[i] = time.Since(start).Nanoseconds() / 1e6
   126  	}
   127  	return latencies, nil
   128  }
   129  
// SubServer is a load-test worker that receives messages from a Pub/Sub
// subscription and records an identifier and a latency for each one.
type SubServer struct {
	// TODO(deklerk): what is this actually for?
	// lim is created in Start as a one-event-per-second limiter with a burst
	// of 1; nothing else in this file reads it.
	lim *rate.Limiter

	// mu is held by callback while appending to idents and latencies.
	mu        sync.Mutex
	// idents collects the publisher client ID and sequence number of each
	// acknowledged message.
	idents    []*pb.MessageIdentifier
	// latencies collects, in milliseconds, the receive time minus the
	// message's "sendTime" attribute, in step with idents.
	latencies []int64
	// NOTE(review): presumably supplies default implementations for the
	// remaining LoadtestWorker RPCs — confirm against the generated pb code.
	pb.UnimplementedLoadtestWorkerServer
}
   140  
   141  // Start starts the server.
   142  func (s *SubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) {
   143  	log.Println("received start")
   144  	s.lim = rate.NewLimiter(rate.Every(time.Second), 1)
   145  
   146  	c, err := pubsub.NewClient(ctx, req.Project)
   147  	if err != nil {
   148  		return nil, err
   149  	}
   150  
   151  	// Load test API doesn't define any way to stop right now.
   152  	go func() {
   153  		sub := c.Subscription(req.GetPubsubOptions().Subscription)
   154  		sub.ReceiveSettings.NumGoroutines = 10 * runtime.GOMAXPROCS(0)
   155  		err := sub.Receive(context.Background(), s.callback)
   156  		log.Fatal(err)
   157  	}()
   158  
   159  	log.Println("started")
   160  	return &pb.StartResponse{}, nil
   161  }
   162  
   163  func (s *SubServer) callback(_ context.Context, m *pubsub.Message) {
   164  	id, err := strconv.ParseInt(m.Attributes["clientId"], 10, 64)
   165  	if err != nil {
   166  		log.Println(err)
   167  		m.Nack()
   168  		return
   169  	}
   170  
   171  	seqNum, err := strconv.ParseInt(m.Attributes["sequenceNumber"], 10, 32)
   172  	if err != nil {
   173  		log.Println(err)
   174  		m.Nack()
   175  		return
   176  	}
   177  
   178  	sendTimeMillis, err := strconv.ParseInt(m.Attributes["sendTime"], 10, 64)
   179  	if err != nil {
   180  		log.Println(err)
   181  		m.Nack()
   182  		return
   183  	}
   184  
   185  	latency := time.Now().UnixNano()/1e6 - sendTimeMillis
   186  	ident := &pb.MessageIdentifier{
   187  		PublisherClientId: id,
   188  		SequenceNumber:    int32(seqNum),
   189  	}
   190  
   191  	s.mu.Lock()
   192  	s.idents = append(s.idents, ident)
   193  	s.latencies = append(s.latencies, latency)
   194  	s.mu.Unlock()
   195  	m.Ack()
   196  }
   197  

View as plain text