...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/endpoint_profile_translator_test.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"errors"
     5  	"net/http"
     6  	"testing"
     7  	"time"
     8  
     9  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    10  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    11  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    12  	logging "github.com/sirupsen/logrus"
    13  	corev1 "k8s.io/api/core/v1"
    14  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    15  )
    16  
    17  func TestEndpointProfileTranslator(t *testing.T) {
    18  	// logging.SetLevel(logging.TraceLevel)
    19  	// defer logging.SetLevel(logging.PanicLevel)
    20  
    21  	addr := &watcher.Address{
    22  		IP:   "10.10.11.11",
    23  		Port: 8080,
    24  	}
    25  	podAddr := &watcher.Address{
    26  		IP:   "10.10.11.11",
    27  		Port: 8080,
    28  		Pod: &corev1.Pod{
    29  			ObjectMeta: metav1.ObjectMeta{
    30  				Annotations: map[string]string{
    31  					consts.ProxyOpaquePortsAnnotation: "8080",
    32  				},
    33  			},
    34  		},
    35  	}
    36  
    37  	t.Run("Sends update", func(t *testing.T) {
    38  		mockGetProfileServer := &mockDestinationGetProfileServer{
    39  			profilesReceived: make(chan *pb.DestinationProfile), // UNBUFFERED
    40  		}
    41  		log := logging.WithField("test", t.Name())
    42  		translator := newEndpointProfileTranslator(
    43  			true, "cluster", "identity", make(map[uint32]struct{}), nil,
    44  			mockGetProfileServer,
    45  			nil,
    46  			log,
    47  		)
    48  		translator.Start()
    49  		defer translator.Stop()
    50  
    51  		if err := translator.Update(addr); err != nil {
    52  			t.Fatal("Expected update")
    53  		}
    54  		select {
    55  		case p := <-mockGetProfileServer.profilesReceived:
    56  			log.Debugf("Received update: %v", p)
    57  		case <-time.After(1 * time.Second):
    58  			t.Fatal("No update received")
    59  		}
    60  
    61  		if err := translator.Update(addr); err != nil {
    62  			t.Fatal("Unexpected update")
    63  		}
    64  		select {
    65  		case p := <-mockGetProfileServer.profilesReceived:
    66  			t.Fatalf("Duplicate update sent: %v", p)
    67  		case <-time.After(1 * time.Second):
    68  		}
    69  
    70  		if err := translator.Update(podAddr); err != nil {
    71  			t.Fatal("Expected update")
    72  		}
    73  		select {
    74  		case p := <-mockGetProfileServer.profilesReceived:
    75  			log.Debugf("Received update: %v", p)
    76  		case <-time.After(1 * time.Second):
    77  		}
    78  	})
    79  
    80  	t.Run("Handles overflow", func(t *testing.T) {
    81  		mockGetProfileServer := &mockDestinationGetProfileServer{
    82  			profilesReceived: make(chan *pb.DestinationProfile, 1),
    83  		}
    84  		log := logging.WithField("test", t.Name())
    85  		endStream := make(chan struct{})
    86  		translator := newEndpointProfileTranslator(
    87  			true, "cluster", "identity", make(map[uint32]struct{}), nil,
    88  			mockGetProfileServer,
    89  			endStream,
    90  			log,
    91  		)
    92  
    93  		// We avoid starting the translator so that it doesn't drain its update
    94  		// queue and we can test the overflow behavior.
    95  
    96  		for i := 0; i < updateQueueCapacity/2; i++ {
    97  			if err := translator.Update(podAddr); err != nil {
    98  				t.Fatal("Expected update")
    99  			}
   100  			select {
   101  			case <-endStream:
   102  				t.Fatal("Stream ended prematurely")
   103  			default:
   104  			}
   105  
   106  			if err := translator.Update(addr); err != nil {
   107  				t.Fatal("Expected update")
   108  			}
   109  			select {
   110  			case <-endStream:
   111  				t.Fatal("Stream ended prematurely")
   112  			default:
   113  			}
   114  		}
   115  
   116  		// The queue should be full and the next update should fail.
   117  		t.Logf("Queue length=%d capacity=%d", translator.queueLen(), updateQueueCapacity)
   118  		if err := translator.Update(podAddr); err == nil {
   119  			if !errors.Is(err, http.ErrServerClosed) {
   120  				t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), updateQueueCapacity)
   121  			}
   122  		}
   123  
   124  		select {
   125  		case <-endStream:
   126  		default:
   127  			t.Fatal("Stream should have ended")
   128  		}
   129  
   130  		// XXX We should assert that endpointProfileUpdatesQueueOverflowCounter
   131  		// == 1 but we can't read counter values.
   132  	})
   133  }
   134  

View as plain text