...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "net/http/httptest"
20 "sync"
21 "testing"
22 "time"
23
24 "go.etcd.io/etcd/client/pkg/v3/types"
25 "go.etcd.io/etcd/raft/v3"
26 "go.etcd.io/etcd/raft/v3/raftpb"
27 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
28
29 "go.uber.org/zap"
30 )
31
32 func BenchmarkSendingMsgApp(b *testing.B) {
33
34 tr := &Transport{
35 ID: types.ID(1),
36 ClusterID: types.ID(1),
37 Raft: &fakeRaft{},
38 ServerStats: newServerStats(),
39 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
40 }
41 tr.Start()
42 srv := httptest.NewServer(tr.Handler())
43 defer srv.Close()
44
45
46 r := &countRaft{}
47 tr2 := &Transport{
48 ID: types.ID(2),
49 ClusterID: types.ID(1),
50 Raft: r,
51 ServerStats: newServerStats(),
52 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
53 }
54 tr2.Start()
55 srv2 := httptest.NewServer(tr2.Handler())
56 defer srv2.Close()
57
58 tr.AddPeer(types.ID(2), []string{srv2.URL})
59 defer tr.Stop()
60 tr2.AddPeer(types.ID(1), []string{srv.URL})
61 defer tr2.Stop()
62 if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
63 b.Fatalf("stream from 1 to 2 is not in work as expected")
64 }
65
66 b.ReportAllocs()
67 b.SetBytes(64)
68
69 b.ResetTimer()
70 data := make([]byte, 64)
71 for i := 0; i < b.N; i++ {
72 tr.Send([]raftpb.Message{
73 {
74 Type: raftpb.MsgApp,
75 From: 1,
76 To: 2,
77 Index: uint64(i),
78 Entries: []raftpb.Entry{
79 {
80 Index: uint64(i + 1),
81 Data: data,
82 },
83 },
84 },
85 })
86 }
87
88 for r.count() != b.N {
89 time.Sleep(time.Millisecond)
90 }
91 b.StopTimer()
92 }
93
94 type countRaft struct {
95 mu sync.Mutex
96 cnt int
97 }
98
99 func (r *countRaft) Process(ctx context.Context, m raftpb.Message) error {
100 r.mu.Lock()
101 defer r.mu.Unlock()
102 r.cnt++
103 return nil
104 }
105
106 func (r *countRaft) IsIDRemoved(id uint64) bool { return false }
107
108 func (r *countRaft) ReportUnreachable(id uint64) {}
109
110 func (r *countRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
111
112 func (r *countRaft) count() int {
113 r.mu.Lock()
114 defer r.mu.Unlock()
115 return r.cnt
116 }
117
View as plain text