...
1 package destination
2
3 import (
4 "errors"
5 "net/http"
6 "testing"
7 "time"
8
9 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
10 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
11 consts "github.com/linkerd/linkerd2/pkg/k8s"
12 logging "github.com/sirupsen/logrus"
13 corev1 "k8s.io/api/core/v1"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 )
16
17 func TestEndpointProfileTranslator(t *testing.T) {
18
19
20
21 addr := &watcher.Address{
22 IP: "10.10.11.11",
23 Port: 8080,
24 }
25 podAddr := &watcher.Address{
26 IP: "10.10.11.11",
27 Port: 8080,
28 Pod: &corev1.Pod{
29 ObjectMeta: metav1.ObjectMeta{
30 Annotations: map[string]string{
31 consts.ProxyOpaquePortsAnnotation: "8080",
32 },
33 },
34 },
35 }
36
37 t.Run("Sends update", func(t *testing.T) {
38 mockGetProfileServer := &mockDestinationGetProfileServer{
39 profilesReceived: make(chan *pb.DestinationProfile),
40 }
41 log := logging.WithField("test", t.Name())
42 translator := newEndpointProfileTranslator(
43 true, "cluster", "identity", make(map[uint32]struct{}), nil,
44 mockGetProfileServer,
45 nil,
46 log,
47 )
48 translator.Start()
49 defer translator.Stop()
50
51 if err := translator.Update(addr); err != nil {
52 t.Fatal("Expected update")
53 }
54 select {
55 case p := <-mockGetProfileServer.profilesReceived:
56 log.Debugf("Received update: %v", p)
57 case <-time.After(1 * time.Second):
58 t.Fatal("No update received")
59 }
60
61 if err := translator.Update(addr); err != nil {
62 t.Fatal("Unexpected update")
63 }
64 select {
65 case p := <-mockGetProfileServer.profilesReceived:
66 t.Fatalf("Duplicate update sent: %v", p)
67 case <-time.After(1 * time.Second):
68 }
69
70 if err := translator.Update(podAddr); err != nil {
71 t.Fatal("Expected update")
72 }
73 select {
74 case p := <-mockGetProfileServer.profilesReceived:
75 log.Debugf("Received update: %v", p)
76 case <-time.After(1 * time.Second):
77 }
78 })
79
80 t.Run("Handles overflow", func(t *testing.T) {
81 mockGetProfileServer := &mockDestinationGetProfileServer{
82 profilesReceived: make(chan *pb.DestinationProfile, 1),
83 }
84 log := logging.WithField("test", t.Name())
85 endStream := make(chan struct{})
86 translator := newEndpointProfileTranslator(
87 true, "cluster", "identity", make(map[uint32]struct{}), nil,
88 mockGetProfileServer,
89 endStream,
90 log,
91 )
92
93
94
95
96 for i := 0; i < updateQueueCapacity/2; i++ {
97 if err := translator.Update(podAddr); err != nil {
98 t.Fatal("Expected update")
99 }
100 select {
101 case <-endStream:
102 t.Fatal("Stream ended prematurely")
103 default:
104 }
105
106 if err := translator.Update(addr); err != nil {
107 t.Fatal("Expected update")
108 }
109 select {
110 case <-endStream:
111 t.Fatal("Stream ended prematurely")
112 default:
113 }
114 }
115
116
117 t.Logf("Queue length=%d capacity=%d", translator.queueLen(), updateQueueCapacity)
118 if err := translator.Update(podAddr); err == nil {
119 if !errors.Is(err, http.ErrServerClosed) {
120 t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), updateQueueCapacity)
121 }
122 }
123
124 select {
125 case <-endStream:
126 default:
127 t.Fatal("Stream should have ended")
128 }
129
130
131
132 })
133 }
134
View as plain text