...
1 package util
2
3 import (
4 "errors"
5 "io"
6 "sync"
7
8 destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
9 "github.com/linkerd/linkerd2-proxy-api/go/net"
10 "golang.org/x/net/context"
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/metadata"
13 )
14
15 type mockStream struct {
16 ctx context.Context
17 Cancel context.CancelFunc
18 }
19
20 func newMockStream() mockStream {
21 ctx, cancel := context.WithCancel(context.Background())
22 return mockStream{ctx, cancel}
23 }
24
25 func (ms mockStream) Context() context.Context { return ms.ctx }
26 func (ms mockStream) SendMsg(m interface{}) error { return nil }
27 func (ms mockStream) RecvMsg(m interface{}) error { return nil }
28
29
30 type MockServerStream struct{ mockStream }
31
32
33 func (mss MockServerStream) SetHeader(metadata.MD) error { return nil }
34
35
36 func (mss MockServerStream) SendHeader(metadata.MD) error { return nil }
37
38
39 func (mss MockServerStream) SetTrailer(metadata.MD) {}
40
41
42 func NewMockServerStream() MockServerStream {
43 return MockServerStream{newMockStream()}
44 }
45
46
47 type MockAPIClient struct {
48 ErrorToReturn error
49 DestinationGetClientToReturn destinationPb.Destination_GetClient
50 }
51
52
53 func (c *MockAPIClient) Get(ctx context.Context, in *destinationPb.GetDestination, opts ...grpc.CallOption) (destinationPb.Destination_GetClient, error) {
54 return c.DestinationGetClientToReturn, c.ErrorToReturn
55 }
56
57
58 func (c *MockAPIClient) GetProfile(ctx context.Context, _ *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetProfileClient, error) {
59
60 return nil, errors.New("not implemented")
61 }
62
63
64 type MockDestinationGetClient struct {
65 UpdatesToReturn []destinationPb.Update
66 ErrorsToReturn []error
67 grpc.ClientStream
68 sync.Mutex
69 }
70
71
72 func (a *MockDestinationGetClient) Recv() (*destinationPb.Update, error) {
73 a.Lock()
74 defer a.Unlock()
75 var updatePopped *destinationPb.Update
76 var errorPopped error
77 if len(a.UpdatesToReturn) == 0 && len(a.ErrorsToReturn) == 0 {
78 return nil, io.EOF
79 }
80 if len(a.UpdatesToReturn) != 0 {
81 updatePopped, a.UpdatesToReturn = &a.UpdatesToReturn[0], a.UpdatesToReturn[1:]
82 }
83 if len(a.ErrorsToReturn) != 0 {
84 errorPopped, a.ErrorsToReturn = a.ErrorsToReturn[0], a.ErrorsToReturn[1:]
85 }
86
87 return updatePopped, errorPopped
88 }
89
90
91 type AuthorityEndpoints struct {
92 Namespace string
93 ServiceID string
94 Pods []PodDetails
95 }
96
97
98 type PodDetails struct {
99 Name string
100 IP uint32
101 Port uint32
102 }
103
104
105 func BuildAddrSet(endpoint AuthorityEndpoints) *destinationPb.WeightedAddrSet {
106 addrs := make([]*destinationPb.WeightedAddr, 0)
107 for _, pod := range endpoint.Pods {
108 addr := &net.TcpAddress{
109 Ip: &net.IPAddress{Ip: &net.IPAddress_Ipv4{Ipv4: pod.IP}},
110 Port: pod.Port,
111 }
112 labels := map[string]string{"pod": pod.Name}
113 weightedAddr := &destinationPb.WeightedAddr{Addr: addr, MetricLabels: labels}
114 addrs = append(addrs, weightedAddr)
115 }
116 labels := map[string]string{"namespace": endpoint.Namespace, "service": endpoint.ServiceID}
117 return &destinationPb.WeightedAddrSet{Addrs: addrs, MetricLabels: labels}
118 }
119
View as plain text