...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/transport_bench_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"net/http/httptest"
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  
    24  	"go.etcd.io/etcd/client/pkg/v3/types"
    25  	"go.etcd.io/etcd/raft/v3"
    26  	"go.etcd.io/etcd/raft/v3/raftpb"
    27  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    28  
    29  	"go.uber.org/zap"
    30  )
    31  
    32  func BenchmarkSendingMsgApp(b *testing.B) {
    33  	// member 1
    34  	tr := &Transport{
    35  		ID:          types.ID(1),
    36  		ClusterID:   types.ID(1),
    37  		Raft:        &fakeRaft{},
    38  		ServerStats: newServerStats(),
    39  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
    40  	}
    41  	tr.Start()
    42  	srv := httptest.NewServer(tr.Handler())
    43  	defer srv.Close()
    44  
    45  	// member 2
    46  	r := &countRaft{}
    47  	tr2 := &Transport{
    48  		ID:          types.ID(2),
    49  		ClusterID:   types.ID(1),
    50  		Raft:        r,
    51  		ServerStats: newServerStats(),
    52  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
    53  	}
    54  	tr2.Start()
    55  	srv2 := httptest.NewServer(tr2.Handler())
    56  	defer srv2.Close()
    57  
    58  	tr.AddPeer(types.ID(2), []string{srv2.URL})
    59  	defer tr.Stop()
    60  	tr2.AddPeer(types.ID(1), []string{srv.URL})
    61  	defer tr2.Stop()
    62  	if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
    63  		b.Fatalf("stream from 1 to 2 is not in work as expected")
    64  	}
    65  
    66  	b.ReportAllocs()
    67  	b.SetBytes(64)
    68  
    69  	b.ResetTimer()
    70  	data := make([]byte, 64)
    71  	for i := 0; i < b.N; i++ {
    72  		tr.Send([]raftpb.Message{
    73  			{
    74  				Type:  raftpb.MsgApp,
    75  				From:  1,
    76  				To:    2,
    77  				Index: uint64(i),
    78  				Entries: []raftpb.Entry{
    79  					{
    80  						Index: uint64(i + 1),
    81  						Data:  data,
    82  					},
    83  				},
    84  			},
    85  		})
    86  	}
    87  	// wait until all messages are received by the target raft
    88  	for r.count() != b.N {
    89  		time.Sleep(time.Millisecond)
    90  	}
    91  	b.StopTimer()
    92  }
    93  
    94  type countRaft struct {
    95  	mu  sync.Mutex
    96  	cnt int
    97  }
    98  
    99  func (r *countRaft) Process(ctx context.Context, m raftpb.Message) error {
   100  	r.mu.Lock()
   101  	defer r.mu.Unlock()
   102  	r.cnt++
   103  	return nil
   104  }
   105  
   106  func (r *countRaft) IsIDRemoved(id uint64) bool { return false }
   107  
   108  func (r *countRaft) ReportUnreachable(id uint64) {}
   109  
   110  func (r *countRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
   111  
   112  func (r *countRaft) count() int {
   113  	r.mu.Lock()
   114  	defer r.mu.Unlock()
   115  	return r.cnt
   116  }
   117  

View as plain text