...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package main
18
19 import (
20 "context"
21 "flag"
22 "fmt"
23 "log"
24 "net"
25 "os"
26 "strings"
27
28 "cloud.google.com/go/pubsub"
29 pb "cloud.google.com/go/pubsub/internal/benchwrapper/proto"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/status"
32 )
33
// Command-line flags.
var (
	// port is the TCP port the benchmark wrapper server listens on.
	// It defaults to the empty string, which main rejects with a usage message.
	port = flag.String("port", "", "specify a port to run on")
)
35
36 func main() {
37 flag.Parse()
38 if *port == "" {
39 log.Fatalf("usage: %s --port=8081", os.Args[0])
40 }
41
42 if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
43 log.Fatal("This benchmarking server only works when connected to an emulator. Please set PUBSUB_EMULATOR_HOST.")
44 }
45
46 ctx := context.Background()
47 c, err := pubsub.NewClient(ctx, "someproject")
48 if err != nil {
49 log.Fatal(err)
50 }
51
52 lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *port))
53 if err != nil {
54 log.Fatal(err)
55 }
56 s := grpc.NewServer()
57 pb.RegisterPubsubBenchWrapperServer(s, &server{
58 c: c,
59 })
60 log.Printf("Running on localhost:%s\n", *port)
61 log.Fatal(s.Serve(lis))
62 }
63
64 type server struct {
65 c *pubsub.Client
66 pb.PubsubBenchWrapperServer
67 }
68
69 func (s *server) Recv(ctx context.Context, req *pb.PubsubRecv) (*pb.EmptyResponse, error) {
70 sub := s.c.Subscription(req.SubName)
71 sub.ReceiveSettings.NumGoroutines = 1
72 err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
73 msg.Ack()
74 })
75
76 if err != nil {
77 s, _ := status.FromError(err)
78
79 if strings.Contains(s.Message(), "EOF") {
80 return &pb.EmptyResponse{}, nil
81 }
82 return nil, err
83 }
84 return &pb.EmptyResponse{}, nil
85 }
86
View as plain text