1 package agent
2
3 import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "io/ioutil"
8 "net"
9 "net/http"
10 "net/url"
11 "path"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/getkin/kin-openapi/openapi3"
17 "github.com/pkg/errors"
18
19 amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
20 "github.com/datawire/ambassador/v2/pkg/kates"
21 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
22 "github.com/datawire/dlib/dlog"
23 )
24
25
// APIDocsStore scrapes the API documentation referenced by the Mappings of a
// snapshot and keeps the resulting documents in memory.
type APIDocsStore struct {
	// Client is the HTTP client used by getDoc to fetch documentation.
	Client APIDocsHTTPClient

	// DontProcessSnapshotBeforeTime is the earliest time at which
	// ProcessSnapshot will start another scrape. ProcessSnapshot moves it one
	// minute into the future every time it starts a scrape.
	DontProcessSnapshotBeforeTime time.Time

	// store holds the scraped documents, keyed by Mapping UID.
	store *inMemoryStore

	// docsDiff records the Mapping UIDs seen in the previous and the current
	// scrape rounds so that documents of vanished Mappings can be deleted.
	docsDiff *docsDiffCalculator

	// processingSnapshotMutex is held for the whole duration of
	// ProcessSnapshot.
	processingSnapshotMutex sync.RWMutex
}
39
40
41 func NewAPIDocsStore() *APIDocsStore {
42 return &APIDocsStore{
43 Client: newAPIDocsHTTPClient(),
44 DontProcessSnapshotBeforeTime: time.Unix(0, 0),
45
46 store: newInMemoryStore(),
47 docsDiff: newMappingDocsCalculator([]docMappingRef{}),
48 }
49 }
50
51
52
53 func (a *APIDocsStore) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Snapshot) {
54 a.processingSnapshotMutex.Lock()
55 defer a.processingSnapshotMutex.Unlock()
56
57 emptyStore := len(a.store.getAll()) == 0
58 mappings := getProcessableMappingsFromSnapshot(snapshot)
59 if len(mappings) == 0 && emptyStore {
60 dlog.Debug(ctx, "Skipping apidocs snapshot processing until a mapping with documentation is found")
61 return
62 }
63
64 now := time.Now()
65 if now.Before(a.DontProcessSnapshotBeforeTime) {
66 dlog.Debugf(ctx, "Skipping apidocs snapshot processing until %v", a.DontProcessSnapshotBeforeTime)
67 return
68 }
69
70 dlog.Debug(ctx, "Processing snapshot...")
71 a.DontProcessSnapshotBeforeTime = now.Add(1 * time.Minute)
72
73 if emptyStore {
74
75
76
77 a.scrape(ctx, mappings)
78 } else {
79
80 go a.scrape(ctx, mappings)
81 }
82 }
83
84
85 func (a *APIDocsStore) StateOfWorld() []*snapshotTypes.APIDoc {
86 return toAPIDocs(a.store.getAll())
87 }
88
89 func getProcessableMappingsFromSnapshot(snapshot *snapshotTypes.Snapshot) []*amb.Mapping {
90 processableMappings := []*amb.Mapping{}
91 if snapshot == nil || snapshot.Kubernetes == nil {
92 return processableMappings
93 }
94
95 for _, mapping := range snapshot.Kubernetes.Mappings {
96 if mapping == nil {
97 continue
98 }
99 mappingDocs := mapping.Spec.Docs
100 if mappingDocs == nil || (mappingDocs.Ignored != nil && *mappingDocs.Ignored == true) {
101 continue
102 }
103 processableMappings = append(processableMappings, mapping)
104 }
105 return processableMappings
106 }
107
108
109
110
111
112
113
114
115
116 func (a *APIDocsStore) scrape(ctx context.Context, mappings []*amb.Mapping) {
117 defer func() {
118
119
120 a.docsDiff.deleteOld(ctx, a.store)
121 dlog.Debug(ctx, "Iteration done")
122 }()
123
124 dlog.Debugf(ctx, "Found %d Mappings", len(mappings))
125 for _, mapping := range mappings {
126 mappingDocs := mapping.Spec.Docs
127 displayName := mappingDocs.DisplayName
128 if displayName == "" {
129 displayName = fmt.Sprintf("%s.%s", mapping.GetName(), mapping.GetNamespace())
130 }
131 mappingHeaders := buildMappingRequestHeaders(mapping.Spec.Headers)
132 mappingPrefix := mapping.Spec.Prefix
133
134 mappingHostname := mapping.Spec.Hostname
135 if mappingHostname == "" || mappingHostname == "*" {
136 mappingHostname = mapping.Spec.DeprecatedHost
137 }
138
139 dm := &docMappingRef{
140 Ref: &kates.ObjectReference{
141 Kind: mapping.Kind,
142 Namespace: mapping.Namespace,
143 Name: mapping.Name,
144 UID: mapping.UID,
145 APIVersion: mapping.APIVersion,
146 ResourceVersion: mapping.ResourceVersion,
147 },
148 Name: displayName,
149 }
150 a.docsDiff.add(ctx, dm)
151
152 var doc *openAPIDoc
153 if mappingDocs.URL != "" {
154 parsedURL, err := url.Parse(mappingDocs.URL)
155 if err != nil {
156 dlog.Errorf(ctx, "could not parse URL or path in 'docs' %q", mappingDocs.URL)
157 continue
158 }
159 dlog.Debugf(ctx, "'url' specified: querying %s", parsedURL)
160 doc = a.getDoc(ctx, parsedURL, "", mappingHeaders, mappingHostname, "", false)
161 } else {
162 mappingsDocsURL, err := extractQueryableDocsURL(mapping)
163 if err != nil {
164 dlog.Errorf(ctx, "could not parse URL or path in 'docs': %v", err)
165 continue
166 }
167 dlog.Debugf(ctx, "'url' specified: querying %s", mappingsDocsURL)
168 doc = a.getDoc(ctx, mappingsDocsURL, mappingHostname, mappingHeaders, mappingHostname, mappingPrefix, true)
169 }
170
171 if doc != nil {
172 a.store.add(dm, doc)
173 }
174 }
175 }
176
177 func extractQueryableDocsURL(mapping *amb.Mapping) (*url.URL, error) {
178 mappingDocsPath := mapping.Spec.Docs.Path
179 mappingRewrite := "/"
180 if mapping.Spec.Rewrite != nil {
181 mappingRewrite = *mapping.Spec.Rewrite
182 }
183 if mappingDocsPath != "" {
184 mappingDocsPath = strings.ReplaceAll(mappingRewrite+mappingDocsPath, "//", "/")
185 }
186
187 mappingsDocsURL, err := url.Parse(mapping.Spec.Service + mappingDocsPath)
188 if err != nil {
189 return nil, err
190 }
191 if mappingsDocsURL.Host == "" {
192
193
194 mappingsDocsURL.Host = mapping.Spec.Service
195 mappingsDocsURL.Path = mappingDocsPath
196 mappingsDocsURL.Scheme = ""
197 mappingsDocsURL.Opaque = ""
198 mappingsDocsURL = mappingsDocsURL.ResolveReference(mappingsDocsURL)
199 }
200 if !strings.Contains(mappingsDocsURL.Hostname(), ".") {
201
202 servicePort := mappingsDocsURL.Port()
203 mappingsDocsURL.Host = fmt.Sprintf("%s.%s", mappingsDocsURL.Hostname(), mapping.Namespace)
204 if servicePort != "" {
205 mappingsDocsURL.Host = fmt.Sprintf("%s:%s", mappingsDocsURL.Hostname(), servicePort)
206 }
207 }
208 if mappingsDocsURL.Scheme == "" {
209
210 mappingsDocsURL.Scheme = "http"
211 }
212
213 return mappingsDocsURL, nil
214 }
215
216 func (a *APIDocsStore) getDoc(ctx context.Context, queryURL *url.URL, queryHost string, queryHeaders []Header, publicHost string, prefix string, keepExistingPrefix bool) *openAPIDoc {
217 b, err := a.Client.Get(ctx, queryURL, queryHost, queryHeaders)
218 if err != nil {
219 dlog.Errorf(ctx, "get failed %s: %v", queryURL, err)
220 return nil
221 }
222
223 if b != nil {
224 return newOpenAPI(ctx, b, publicHost, prefix, keepExistingPrefix)
225 }
226 return nil
227 }
228
229
// openAPIDoc is a scraped API document as produced by newOpenAPI.
type openAPIDoc struct {
	// JSON is the document serialized as JSON.
	JSON []byte

	// Type is the kind of document; newOpenAPI sets it to "OpenAPI".
	Type string

	// Version is the version of the document type; newOpenAPI sets it to "v3".
	Version string
}
238
239
240
241 func newOpenAPI(ctx context.Context, docBytes []byte, baseURL string, prefix string, keepExistingPrefix bool) *openAPIDoc {
242 dlog.Debugf(ctx, "Trying to create new OpenAPI doc: base_url=%q prefix=%q", baseURL, prefix)
243
244 loader := openapi3.NewLoader()
245 doc, err := loader.LoadFromData(docBytes)
246 if err != nil {
247 dlog.Errorln(ctx, "failed to load OpenAPI spec:", err)
248 return nil
249 }
250 err = doc.Validate(loader.Context)
251 if err != nil {
252 dlog.Errorln(ctx, "failed to validate OpenAPI spec:", err)
253 return nil
254 }
255
256
257
258
259 existingPrefix := ""
260 if doc.Servers != nil && doc.Servers[0] != nil {
261 currentServerURL := doc.Servers[0].URL
262 dlog.Debugf(ctx, "Checking first server's URL: url=%#v", currentServerURL)
263 existingUrl, err := url.Parse(currentServerURL)
264 if err == nil {
265 existingPrefix = existingUrl.Path
266 } else {
267 dlog.Errorf(ctx, "failed to parse 'servers' URL: url=%q: %v", currentServerURL, err)
268 }
269 }
270 base, err := url.Parse(baseURL)
271 if err != nil {
272 dlog.Debugf(ctx, "could not parse URL %q", baseURL)
273 } else {
274 if prefix != "" {
275 if existingPrefix != "" && keepExistingPrefix {
276 base.Path = path.Join(base.Path, prefix, existingPrefix)
277 } else {
278 base.Path = path.Join(base.Path, prefix)
279 }
280 } else {
281 base.Path = existingPrefix
282 }
283
284 doc.Servers = []*openapi3.Server{{
285 URL: base.String(),
286 }}
287 }
288
289 json, err := doc.MarshalJSON()
290 if err != nil {
291 dlog.Errorln(ctx, "failed to marshal OpenAPI spec:", err)
292 return nil
293 }
294
295 return &openAPIDoc{
296 JSON: json,
297 Type: "OpenAPI",
298 Version: "v3",
299 }
300 }
301
302 func buildMappingRequestHeaders(mappingHeaders map[string]string) []Header {
303 headers := []Header{}
304
305 for key, value := range mappingHeaders {
306 if key == ":authority" {
307 continue
308 }
309 headers = append(headers, Header{Name: key, Value: value})
310 }
311
312 return headers
313 }
314
// Header is a single HTTP request header sent when fetching documentation.
type Header struct {
	// Name is the header name.
	Name string
	// Value is the header value.
	Value string
}
319
// APIDocsHTTPClient is the client APIDocsStore uses to retrieve documentation.
// It is an interface so that the store's Client field can be replaced.
type APIDocsHTTPClient interface {
	// Get fetches requestURL and returns the response body. requestHost and
	// requestHeaders carry the host and the additional headers to use for the
	// request. getDoc treats a nil body with a nil error as "no document".
	Get(ctx context.Context, requestURL *url.URL, requestHost string, requestHeaders []Header) ([]byte, error)
}
323
// apiDocsHTTPClient is the default APIDocsHTTPClient; it embeds the
// *http.Client that performs the requests.
type apiDocsHTTPClient struct {
	*http.Client
}
327
328 func newAPIDocsHTTPClient() *apiDocsHTTPClient {
329 dialer := &net.Dialer{
330 Timeout: time.Second * 10,
331 }
332 c := &http.Client{
333 Timeout: time.Second * 10,
334 Transport: &http.Transport{
335
336 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
337 Dial: dialer.Dial,
338 },
339 }
340 return &apiDocsHTTPClient{c}
341 }
342
343 func (c *apiDocsHTTPClient) Get(ctx context.Context, requestURL *url.URL, requestHost string, requestHeaders []Header) ([]byte, error) {
344 ctx = dlog.WithField(ctx, "url", requestURL)
345 ctx = dlog.WithField(ctx, "host", requestHost)
346
347 req, err := http.NewRequest("GET", requestURL.String(), nil)
348 if err != nil {
349 dlog.Error(ctx, err)
350 return nil, err
351 }
352 req.Close = true
353
354 if requestHost != "" {
355 dlog.Debugf(ctx, "Using host=%s", requestHost)
356 req.Host = requestHost
357 }
358
359 if requestHeaders != nil {
360 for _, queryHeader := range requestHeaders {
361 dlog.Debugf(ctx, "Adding header %s=%s", queryHeader.Name, queryHeader.Value)
362 req.Header.Set(queryHeader.Name, queryHeader.Value)
363 }
364 }
365
366 res, err := c.Do(req)
367 if err != nil {
368 dlog.Error(ctx, err)
369 return nil, err
370 }
371 defer res.Body.Close()
372
373 if res.StatusCode != 200 {
374 dlog.Errorf(ctx, "Bad HTTP request: status_code=%v", res.StatusCode)
375 return nil, fmt.Errorf("HTTP error %d from %s", res.StatusCode, requestURL)
376 }
377
378 buf, err := ioutil.ReadAll(res.Body)
379 if err != nil {
380 return nil, errors.Wrap(err, "failed to read HTTP response body")
381 }
382
383 return buf, nil
384 }
385
386
// docMappingRef identifies the Mapping a document was scraped from.
type docMappingRef struct {
	// Ref is the object reference of the Mapping; its UID is the key used by
	// the store and by the diff calculator.
	Ref *kates.ObjectReference
	// Name is the display name of the documentation.
	Name string
}
391
// mappingDocMap is a set of Mapping UIDs; a UID is in the set when it maps to
// true.
type mappingDocMap map[string]bool
393
394
// docsDiffCalculator tracks which Mapping UIDs were seen in the previous and
// in the current scrape round, so that the UIDs that disappeared can be
// computed when a round ends.
type docsDiffCalculator struct {
	// previous holds the UIDs seen in the last completed round.
	previous mappingDocMap
	// current holds the UIDs added so far in the ongoing round.
	current mappingDocMap
}
399
400
401 func newMappingDocsCalculator(known []docMappingRef) *docsDiffCalculator {
402 knownMap := make(mappingDocMap)
403 for _, m := range known {
404 knownMap[string(m.Ref.UID)] = true
405 }
406 return &docsDiffCalculator{current: make(mappingDocMap), previous: knownMap}
407 }
408
409
410 func (d *docsDiffCalculator) newRound() []string {
411 mappingUIDsToDelete := make([]string, 0)
412
413 for previousRef := range d.previous {
414 if !d.current[previousRef] {
415 mappingUIDsToDelete = append(mappingUIDsToDelete, string(previousRef))
416 }
417 }
418 d.previous = d.current
419 d.current = make(mappingDocMap)
420
421 return mappingUIDsToDelete
422 }
423
424
425 func (d *docsDiffCalculator) add(ctx context.Context, dm *docMappingRef) {
426 if dm != nil && dm.Ref != nil {
427 dlog.Debugf(ctx, "Adding Mapping Docs diff reference %s", dm)
428 d.current[string(dm.Ref.UID)] = true
429 }
430 }
431
432
433 func (d *docsDiffCalculator) deleteOld(ctx context.Context, store *inMemoryStore) {
434 for _, mappingUID := range d.newRound() {
435 dlog.Debugf(ctx, "Deleting old Mapping Docs %s", mappingUID)
436 store.deleteRefUID(mappingUID)
437 }
438 }
439
// docsRef is a store entry: a scraped document together with the reference to
// the Mapping it belongs to.
type docsRef struct {
	docMappingRef *docMappingRef
	openAPIDoc    *openAPIDoc
}
// docsRefMap maps a Mapping UID to its store entry.
type docsRefMap map[string]*docsRef
445
// inMemoryStore keeps the scraped documents in a map guarded by a mutex.
type inMemoryStore struct {
	// entriesMutex guards entries.
	entriesMutex sync.RWMutex
	// entries holds the stored documents keyed by Mapping UID.
	entries docsRefMap
}
450
451 func newInMemoryStore() *inMemoryStore {
452 res := &inMemoryStore{
453 entries: make(docsRefMap),
454 }
455
456 return res
457 }
458
459 func (s *inMemoryStore) add(dm *docMappingRef, openAPIDoc *openAPIDoc) {
460 s.entriesMutex.Lock()
461 defer s.entriesMutex.Unlock()
462
463 s.entries[string(dm.Ref.UID)] = &docsRef{docMappingRef: dm, openAPIDoc: openAPIDoc}
464 }
465
466 func (s *inMemoryStore) deleteRefUID(mappingRefUID string) {
467 s.entriesMutex.Lock()
468 defer s.entriesMutex.Unlock()
469
470 for entryUID := range s.entries {
471 if mappingRefUID == entryUID {
472 delete(s.entries, entryUID)
473 }
474 }
475 }
476
477 func (s *inMemoryStore) getAll() []*docsRef {
478 s.entriesMutex.RLock()
479 defer s.entriesMutex.RUnlock()
480
481 var dr []*docsRef
482 for _, e := range s.entries {
483 dr = append(dr, e)
484 }
485 return dr
486 }
487
488 func toAPIDocs(docsRefs []*docsRef) []*snapshotTypes.APIDoc {
489 results := make([]*snapshotTypes.APIDoc, 0)
490 for _, doc := range docsRefs {
491 if doc != nil && doc.docMappingRef != nil && doc.openAPIDoc != nil {
492 apiDoc := &snapshotTypes.APIDoc{
493 Data: doc.openAPIDoc.JSON,
494 TypeMeta: &kates.TypeMeta{
495 Kind: doc.openAPIDoc.Type,
496 APIVersion: doc.openAPIDoc.Version,
497 },
498 Metadata: &kates.ObjectMeta{
499 Name: doc.docMappingRef.Name,
500 },
501 TargetRef: doc.docMappingRef.Ref,
502 }
503 results = append(results, apiDoc)
504 }
505 }
506 return results
507 }
508
View as plain text