1
16
17 package apiserver
18
19 import (
20 "net/http"
21 "net/url"
22 "sync/atomic"
23
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/apimachinery/pkg/util/httpstream"
26 "k8s.io/apimachinery/pkg/util/proxy"
27 "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
28 endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
29 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
30 utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
31 apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
32 "k8s.io/apiserver/pkg/util/x509metrics"
33 "k8s.io/client-go/transport"
34 "k8s.io/klog/v2"
35 apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
36 apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
37 )
38
39 const (
40 aggregatorComponent string = "aggregator"
41 )
42
43 type certKeyFunc func() ([]byte, []byte)
44
45
46
47 type proxyHandler struct {
48
49 localDelegate http.Handler
50
51
52 proxyCurrentCertKeyContent certKeyFunc
53 proxyTransportDial *transport.DialHolder
54
55
56 serviceResolver ServiceResolver
57
58 handlingInfo atomic.Value
59
60
61 rejectForwardingRedirects bool
62 }
63
64 type proxyHandlingInfo struct {
65
66 local bool
67
68
69 name string
70
71 transportConfig *transport.Config
72
73
74 transportBuildingError error
75
76 proxyRoundTripper http.RoundTripper
77
78 serviceName string
79
80 serviceNamespace string
81
82 serviceAvailable bool
83
84 servicePort int32
85 }
86
87 func proxyError(w http.ResponseWriter, req *http.Request, error string, code int) {
88 http.Error(w, error, code)
89
90 ctx := req.Context()
91 info, ok := genericapirequest.RequestInfoFrom(ctx)
92 if !ok {
93 klog.Warning("no RequestInfo found in the context")
94 return
95 }
96
97 endpointmetrics.RecordRequestTermination(req, info, aggregatorComponent, code)
98 }
99
100 func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
101 value := r.handlingInfo.Load()
102 if value == nil {
103 r.localDelegate.ServeHTTP(w, req)
104 return
105 }
106 handlingInfo := value.(proxyHandlingInfo)
107 if handlingInfo.local {
108 if r.localDelegate == nil {
109 http.Error(w, "", http.StatusNotFound)
110 return
111 }
112 r.localDelegate.ServeHTTP(w, req)
113 return
114 }
115
116 if !handlingInfo.serviceAvailable {
117 proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
118 return
119 }
120
121 if handlingInfo.transportBuildingError != nil {
122 proxyError(w, req, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
123 return
124 }
125
126 user, ok := genericapirequest.UserFrom(req.Context())
127 if !ok {
128 proxyError(w, req, "missing user", http.StatusInternalServerError)
129 return
130 }
131
132
133 location := &url.URL{}
134 location.Scheme = "https"
135 rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
136 if err != nil {
137 klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
138 proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
139 return
140 }
141 location.Host = rloc.Host
142 location.Path = req.URL.Path
143 location.RawQuery = req.URL.Query().Encode()
144
145 newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
146 defer cancelFn()
147
148 if handlingInfo.proxyRoundTripper == nil {
149 proxyError(w, req, "", http.StatusNotFound)
150 return
151 }
152
153 proxyRoundTripper := handlingInfo.proxyRoundTripper
154 upgrade := httpstream.IsUpgradeRequest(req)
155
156 proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
157
158
159
160
161 if upgrade {
162 transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
163 }
164
165 handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
166 if r.rejectForwardingRedirects {
167 handler.RejectForwardingRedirects = true
168 }
169 utilflowcontrol.RequestDelegated(req.Context())
170 handler.ServeHTTP(w, newReq)
171 }
172
173
174 type responder struct {
175 w http.ResponseWriter
176 }
177
178
179
180 func (r *responder) Object(statusCode int, obj runtime.Object) {
181 responsewriters.WriteRawJSON(statusCode, obj, r.w)
182 }
183
184 func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
185 http.Error(r.w, err.Error(), http.StatusServiceUnavailable)
186 }
187
188
189
190
191
192 func (r *proxyHandler) setServiceAvailable() {
193 info := r.handlingInfo.Load().(proxyHandlingInfo)
194 info.serviceAvailable = true
195 r.handlingInfo.Store(info)
196 }
197
198 func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
199 if apiService.Spec.Service == nil {
200 r.handlingInfo.Store(proxyHandlingInfo{local: true})
201 return
202 }
203
204 proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()
205
206 transportConfig := &transport.Config{
207 TLS: transport.TLSConfig{
208 Insecure: apiService.Spec.InsecureSkipTLSVerify,
209 ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
210 CertData: proxyClientCert,
211 KeyData: proxyClientKey,
212 CAData: apiService.Spec.CABundle,
213 },
214 DialHolder: r.proxyTransportDial,
215 }
216 transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
217 x509MissingSANCounter,
218 x509InsecureSHA1Counter,
219 ))
220
221 newInfo := proxyHandlingInfo{
222 name: apiService.Name,
223 transportConfig: transportConfig,
224 serviceName: apiService.Spec.Service.Name,
225 serviceNamespace: apiService.Spec.Service.Namespace,
226 servicePort: *apiService.Spec.Service.Port,
227 serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
228 }
229 newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
230 if newInfo.transportBuildingError != nil {
231 klog.Warning(newInfo.transportBuildingError.Error())
232 }
233 r.handlingInfo.Store(newInfo)
234 }
235
View as plain text