...
1
16
17 package discovery
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net/http"
24 "net/http/httptest"
25 "net/url"
26 "strconv"
27 "sync"
28 "time"
29
30 "k8s.io/apimachinery/pkg/util/wait"
31
32 "k8s.io/client-go/kubernetes"
33
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
36 corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
37 )
38
// FakeService describes a stand-in service that tests can start and then
// query for its identity and current port.
type FakeService interface {
	// Name returns the name the service is known by.
	Name() string

	// Port returns the port the service is currently reachable on, or nil
	// when no port is being advertised.
	Port() *int32

	// Run drives the service using ctx and reports any error encountered.
	Run(ctx context.Context) error
}
44
45
46
47 type fakeService struct {
48 name string
49 client kubernetes.Interface
50 handler http.Handler
51
52 lock sync.RWMutex
53 activePort *int32
54 }
55
56 func NewFakeService(name string, client kubernetes.Interface, handler http.Handler) *fakeService {
57 return &fakeService{
58 name: name,
59 client: client,
60 handler: handler,
61 }
62 }
63
64 func (f *fakeService) Run(ctx context.Context) error {
65 aggregatedServer := httptest.NewUnstartedServer(f.handler)
66 aggregatedServer.StartTLS()
67 defer aggregatedServer.Close()
68
69 serverURL, err := url.Parse(aggregatedServer.URL)
70 if err != nil {
71
72 panic(err)
73 }
74
75 serverPort, err := strconv.Atoi(serverURL.Port())
76 if err != nil {
77
78 panic(err)
79 }
80
81 port := int32(serverPort)
82
83
84 service, err := f.client.CoreV1().Services("default").Apply(
85 ctx,
86 corev1apply.Service(f.name, "default").
87 WithSpec(corev1apply.ServiceSpec().
88 WithPorts(
89 corev1apply.ServicePort().
90 WithPort(port)).
91 WithType("ExternalName").
92 WithExternalName("localhost")),
93 metav1.ApplyOptions{
94 FieldManager: "test-manager",
95 },
96 )
97 if err != nil {
98 return err
99 }
100
101 f.lock.Lock()
102 f.activePort = &port
103 f.lock.Unlock()
104
105 <-ctx.Done()
106
107 f.lock.Lock()
108 f.activePort = nil
109 f.lock.Unlock()
110
111
112 err = f.client.CoreV1().Services("default").Delete(ctx, service.Name, metav1.DeleteOptions{})
113 if errors.Is(err, context.Canceled) {
114 err = nil
115 }
116 return err
117 }
118
119 func (f *fakeService) WaitForReady(ctx context.Context) error {
120 err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, time.Second, false, func(ctx context.Context) (done bool, err error) {
121 return f.Port() != nil, nil
122 })
123
124 if errors.Is(err, context.Canceled) {
125 err = nil
126 } else if err != nil {
127 err = fmt.Errorf("service should have come alive in a reasonable amount of time: %w", err)
128 }
129
130 return err
131 }
132
133 func (f *fakeService) Port() *int32 {
134
135
136 f.lock.RLock()
137 defer f.lock.RUnlock()
138 return f.activePort
139 }
140
141 func (f *fakeService) Name() string {
142 return f.name
143 }
144
View as plain text