...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/api_docs.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"net"
     9  	"net/http"
    10  	"net/url"
    11  	"path"
    12  	"strings"
    13  	"sync"
    14  	"time"
    15  
    16  	"github.com/getkin/kin-openapi/openapi3"
    17  	"github.com/pkg/errors"
    18  
    19  	amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
    20  	"github.com/datawire/ambassador/v2/pkg/kates"
    21  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    22  	"github.com/datawire/dlib/dlog"
    23  )
    24  
// APIDocsStore is responsible for collecting the API docs from Mapping resources in a k8s cluster.
type APIDocsStore struct {
	// Client is used to scrape all Mappings for API documentation
	Client APIDocsHTTPClient
	// DontProcessSnapshotBeforeTime keeps track of the moment the next received snapshot should be processed.
	// Snapshots received before this instant are skipped by ProcessSnapshot.
	DontProcessSnapshotBeforeTime time.Time

	// store holds the state of the world, with all Mappings and their API docs
	store *inMemoryStore
	// docsDiff helps calculate whether an API doc should be kept or discarded after processing a snapshot
	docsDiff *docsDiffCalculator
	// processingSnapshotMutex holds a lock so that a single snapshot gets processed at a time.
	// ProcessSnapshot only ever takes the exclusive (write) lock.
	processingSnapshotMutex sync.RWMutex
}
    39  
    40  // NewAPIDocsStore is the main APIDocsStore constructor.
    41  func NewAPIDocsStore() *APIDocsStore {
    42  	return &APIDocsStore{
    43  		Client:                        newAPIDocsHTTPClient(),
    44  		DontProcessSnapshotBeforeTime: time.Unix(0, 0),
    45  
    46  		store:    newInMemoryStore(),
    47  		docsDiff: newMappingDocsCalculator([]docMappingRef{}),
    48  	}
    49  }
    50  
    51  // ProcessSnapshot will query the required services to retrieve the API documentation for each
    52  // of the Mappings in the snapshot. It will execute at most once every minute.
    53  func (a *APIDocsStore) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Snapshot) {
    54  	a.processingSnapshotMutex.Lock()
    55  	defer a.processingSnapshotMutex.Unlock()
    56  
    57  	emptyStore := len(a.store.getAll()) == 0
    58  	mappings := getProcessableMappingsFromSnapshot(snapshot)
    59  	if len(mappings) == 0 && emptyStore {
    60  		dlog.Debug(ctx, "Skipping apidocs snapshot processing until a mapping with documentation is found")
    61  		return
    62  	}
    63  
    64  	now := time.Now()
    65  	if now.Before(a.DontProcessSnapshotBeforeTime) {
    66  		dlog.Debugf(ctx, "Skipping apidocs snapshot processing until %v", a.DontProcessSnapshotBeforeTime)
    67  		return
    68  	}
    69  
    70  	dlog.Debug(ctx, "Processing snapshot...")
    71  	a.DontProcessSnapshotBeforeTime = now.Add(1 * time.Minute)
    72  
    73  	if emptyStore {
    74  		// We don't have anything in memory...
    75  		// Retrieve API docs synchronously so it appears snappy to the first-time user,
    76  		// or when the agent starts.
    77  		a.scrape(ctx, mappings)
    78  	} else {
    79  		// This is just an update, it can be processed asynchronously.
    80  		go a.scrape(ctx, mappings)
    81  	}
    82  }
    83  
    84  // StateOfWorld returns the current state of all discovered API docs.
    85  func (a *APIDocsStore) StateOfWorld() []*snapshotTypes.APIDoc {
    86  	return toAPIDocs(a.store.getAll())
    87  }
    88  
    89  func getProcessableMappingsFromSnapshot(snapshot *snapshotTypes.Snapshot) []*amb.Mapping {
    90  	processableMappings := []*amb.Mapping{}
    91  	if snapshot == nil || snapshot.Kubernetes == nil {
    92  		return processableMappings
    93  	}
    94  
    95  	for _, mapping := range snapshot.Kubernetes.Mappings {
    96  		if mapping == nil {
    97  			continue
    98  		}
    99  		mappingDocs := mapping.Spec.Docs
   100  		if mappingDocs == nil || (mappingDocs.Ignored != nil && *mappingDocs.Ignored == true) {
   101  			continue
   102  		}
   103  		processableMappings = append(processableMappings, mapping)
   104  	}
   105  	return processableMappings
   106  }
   107  
   108  // scrape will take care of fetching OpenAPI documentation from each of the
   109  // Mappings resources as we process a snapshot.
   110  //
   111  // Be careful as there is a very similar implementation of this logic in the DevPortal which
   112  // uses the ambassador diag representation to retrieve OpenAPI documentation from
   113  // Mapping resources.
   114  // Since both the DevPortal and the agent make use of this `docs` property, evolutions
   115  // made here should be considered for DevPortal too.
   116  func (a *APIDocsStore) scrape(ctx context.Context, mappings []*amb.Mapping) {
   117  	defer func() {
   118  		// Once we are finished retrieving mapping docs, delete anything we
   119  		// don't need anymore
   120  		a.docsDiff.deleteOld(ctx, a.store)
   121  		dlog.Debug(ctx, "Iteration done")
   122  	}()
   123  
   124  	dlog.Debugf(ctx, "Found %d Mappings", len(mappings))
   125  	for _, mapping := range mappings {
   126  		mappingDocs := mapping.Spec.Docs
   127  		displayName := mappingDocs.DisplayName
   128  		if displayName == "" {
   129  			displayName = fmt.Sprintf("%s.%s", mapping.GetName(), mapping.GetNamespace())
   130  		}
   131  		mappingHeaders := buildMappingRequestHeaders(mapping.Spec.Headers)
   132  		mappingPrefix := mapping.Spec.Prefix
   133  		// Lookup the Hostname first since it is more restrictive, otherwise fallback on the Host attribute
   134  		mappingHostname := mapping.Spec.Hostname
   135  		if mappingHostname == "" || mappingHostname == "*" {
   136  			mappingHostname = mapping.Spec.DeprecatedHost
   137  		}
   138  
   139  		dm := &docMappingRef{
   140  			Ref: &kates.ObjectReference{
   141  				Kind:            mapping.Kind,
   142  				Namespace:       mapping.Namespace,
   143  				Name:            mapping.Name,
   144  				UID:             mapping.UID,
   145  				APIVersion:      mapping.APIVersion,
   146  				ResourceVersion: mapping.ResourceVersion,
   147  			},
   148  			Name: displayName,
   149  		}
   150  		a.docsDiff.add(ctx, dm)
   151  
   152  		var doc *openAPIDoc
   153  		if mappingDocs.URL != "" {
   154  			parsedURL, err := url.Parse(mappingDocs.URL)
   155  			if err != nil {
   156  				dlog.Errorf(ctx, "could not parse URL or path in 'docs' %q", mappingDocs.URL)
   157  				continue
   158  			}
   159  			dlog.Debugf(ctx, "'url' specified: querying %s", parsedURL)
   160  			doc = a.getDoc(ctx, parsedURL, "", mappingHeaders, mappingHostname, "", false)
   161  		} else {
   162  			mappingsDocsURL, err := extractQueryableDocsURL(mapping)
   163  			if err != nil {
   164  				dlog.Errorf(ctx, "could not parse URL or path in 'docs': %v", err)
   165  				continue
   166  			}
   167  			dlog.Debugf(ctx, "'url' specified: querying %s", mappingsDocsURL)
   168  			doc = a.getDoc(ctx, mappingsDocsURL, mappingHostname, mappingHeaders, mappingHostname, mappingPrefix, true)
   169  		}
   170  
   171  		if doc != nil {
   172  			a.store.add(dm, doc)
   173  		}
   174  	}
   175  }
   176  
   177  func extractQueryableDocsURL(mapping *amb.Mapping) (*url.URL, error) {
   178  	mappingDocsPath := mapping.Spec.Docs.Path
   179  	mappingRewrite := "/"
   180  	if mapping.Spec.Rewrite != nil {
   181  		mappingRewrite = *mapping.Spec.Rewrite
   182  	}
   183  	if mappingDocsPath != "" {
   184  		mappingDocsPath = strings.ReplaceAll(mappingRewrite+mappingDocsPath, "//", "/")
   185  	}
   186  
   187  	mappingsDocsURL, err := url.Parse(mapping.Spec.Service + mappingDocsPath)
   188  	if err != nil {
   189  		return nil, err
   190  	}
   191  	if mappingsDocsURL.Host == "" {
   192  		// We did our best to parse the service+path, but failed to actually extract a Host.
   193  		// Now, be more explicit about which is which.
   194  		mappingsDocsURL.Host = mapping.Spec.Service
   195  		mappingsDocsURL.Path = mappingDocsPath
   196  		mappingsDocsURL.Scheme = ""
   197  		mappingsDocsURL.Opaque = ""
   198  		mappingsDocsURL = mappingsDocsURL.ResolveReference(mappingsDocsURL)
   199  	}
   200  	if !strings.Contains(mappingsDocsURL.Hostname(), ".") {
   201  		// The host does not appear to be a TLD, append the namespace
   202  		servicePort := mappingsDocsURL.Port()
   203  		mappingsDocsURL.Host = fmt.Sprintf("%s.%s", mappingsDocsURL.Hostname(), mapping.Namespace)
   204  		if servicePort != "" {
   205  			mappingsDocsURL.Host = fmt.Sprintf("%s:%s", mappingsDocsURL.Hostname(), servicePort)
   206  		}
   207  	}
   208  	if mappingsDocsURL.Scheme == "" {
   209  		// Assume plain-text if the mapping.Spec.Service did not specify https
   210  		mappingsDocsURL.Scheme = "http"
   211  	}
   212  
   213  	return mappingsDocsURL, nil
   214  }
   215  
   216  func (a *APIDocsStore) getDoc(ctx context.Context, queryURL *url.URL, queryHost string, queryHeaders []Header, publicHost string, prefix string, keepExistingPrefix bool) *openAPIDoc {
   217  	b, err := a.Client.Get(ctx, queryURL, queryHost, queryHeaders)
   218  	if err != nil {
   219  		dlog.Errorf(ctx, "get failed %s: %v", queryURL, err)
   220  		return nil
   221  	}
   222  
   223  	if b != nil {
   224  		return newOpenAPI(ctx, b, publicHost, prefix, keepExistingPrefix)
   225  	}
   226  	return nil
   227  }
   228  
// openAPIDoc represents a typed OpenAPI/Swagger document.
type openAPIDoc struct {
	// The actual OpenAPI/Swagger document in JSON
	JSON []byte
	// The document type (newOpenAPI sets this to "OpenAPI")
	Type string
	// The document version (newOpenAPI sets this to "v3")
	Version string
}
   238  
   239  // openAPIDoc constructor from raw bytes.
   240  // The baseURL and prefix are used to edit the original document with server information to query the API publicly
   241  func newOpenAPI(ctx context.Context, docBytes []byte, baseURL string, prefix string, keepExistingPrefix bool) *openAPIDoc {
   242  	dlog.Debugf(ctx, "Trying to create new OpenAPI doc: base_url=%q prefix=%q", baseURL, prefix)
   243  
   244  	loader := openapi3.NewLoader()
   245  	doc, err := loader.LoadFromData(docBytes)
   246  	if err != nil {
   247  		dlog.Errorln(ctx, "failed to load OpenAPI spec:", err)
   248  		return nil
   249  	}
   250  	err = doc.Validate(loader.Context)
   251  	if err != nil {
   252  		dlog.Errorln(ctx, "failed to validate OpenAPI spec:", err)
   253  		return nil
   254  	}
   255  
   256  	// Get prefix out of first server URL. E.g. if it's
   257  	// http://example.com/v1, we want to to add /v1 after the Ambassador
   258  	// prefix.
   259  	existingPrefix := ""
   260  	if doc.Servers != nil && doc.Servers[0] != nil {
   261  		currentServerURL := doc.Servers[0].URL
   262  		dlog.Debugf(ctx, "Checking first server's URL: url=%#v", currentServerURL)
   263  		existingUrl, err := url.Parse(currentServerURL)
   264  		if err == nil {
   265  			existingPrefix = existingUrl.Path
   266  		} else {
   267  			dlog.Errorf(ctx, "failed to parse 'servers' URL: url=%q: %v", currentServerURL, err)
   268  		}
   269  	}
   270  	base, err := url.Parse(baseURL)
   271  	if err != nil {
   272  		dlog.Debugf(ctx, "could not parse URL %q", baseURL)
   273  	} else {
   274  		if prefix != "" {
   275  			if existingPrefix != "" && keepExistingPrefix {
   276  				base.Path = path.Join(base.Path, prefix, existingPrefix)
   277  			} else {
   278  				base.Path = path.Join(base.Path, prefix)
   279  			}
   280  		} else {
   281  			base.Path = existingPrefix
   282  		}
   283  
   284  		doc.Servers = []*openapi3.Server{{
   285  			URL: base.String(),
   286  		}}
   287  	}
   288  
   289  	json, err := doc.MarshalJSON()
   290  	if err != nil {
   291  		dlog.Errorln(ctx, "failed to marshal OpenAPI spec:", err)
   292  		return nil
   293  	}
   294  
   295  	return &openAPIDoc{
   296  		JSON:    json,
   297  		Type:    "OpenAPI",
   298  		Version: "v3",
   299  	}
   300  }
   301  
   302  func buildMappingRequestHeaders(mappingHeaders map[string]string) []Header {
   303  	headers := []Header{}
   304  
   305  	for key, value := range mappingHeaders {
   306  		if key == ":authority" {
   307  			continue
   308  		}
   309  		headers = append(headers, Header{Name: key, Value: value})
   310  	}
   311  
   312  	return headers
   313  }
   314  
   315  type Header struct {
   316  	Name  string
   317  	Value string
   318  }
   319  
// APIDocsHTTPClient abstracts the HTTP fetching used by APIDocsStore to
// retrieve API documentation.
type APIDocsHTTPClient interface {
	// Get retrieves the content located at requestURL and returns it as raw bytes.
	// In the implementation provided by this file, a non-empty requestHost
	// overrides the request's Host and requestHeaders are set on the request.
	Get(ctx context.Context, requestURL *url.URL, requestHost string, requestHeaders []Header) ([]byte, error)
}
   323  
   324  type apiDocsHTTPClient struct {
   325  	*http.Client
   326  }
   327  
   328  func newAPIDocsHTTPClient() *apiDocsHTTPClient {
   329  	dialer := &net.Dialer{
   330  		Timeout: time.Second * 10,
   331  	}
   332  	c := &http.Client{
   333  		Timeout: time.Second * 10,
   334  		Transport: &http.Transport{
   335  			/* #nosec */
   336  			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
   337  			Dial:            dialer.Dial,
   338  		},
   339  	}
   340  	return &apiDocsHTTPClient{c}
   341  }
   342  
   343  func (c *apiDocsHTTPClient) Get(ctx context.Context, requestURL *url.URL, requestHost string, requestHeaders []Header) ([]byte, error) {
   344  	ctx = dlog.WithField(ctx, "url", requestURL)
   345  	ctx = dlog.WithField(ctx, "host", requestHost)
   346  
   347  	req, err := http.NewRequest("GET", requestURL.String(), nil)
   348  	if err != nil {
   349  		dlog.Error(ctx, err)
   350  		return nil, err
   351  	}
   352  	req.Close = true
   353  
   354  	if requestHost != "" {
   355  		dlog.Debugf(ctx, "Using host=%s", requestHost)
   356  		req.Host = requestHost
   357  	}
   358  
   359  	if requestHeaders != nil {
   360  		for _, queryHeader := range requestHeaders {
   361  			dlog.Debugf(ctx, "Adding header %s=%s", queryHeader.Name, queryHeader.Value)
   362  			req.Header.Set(queryHeader.Name, queryHeader.Value)
   363  		}
   364  	}
   365  
   366  	res, err := c.Do(req)
   367  	if err != nil {
   368  		dlog.Error(ctx, err)
   369  		return nil, err
   370  	}
   371  	defer res.Body.Close()
   372  
   373  	if res.StatusCode != 200 {
   374  		dlog.Errorf(ctx, "Bad HTTP request: status_code=%v", res.StatusCode)
   375  		return nil, fmt.Errorf("HTTP error %d from %s", res.StatusCode, requestURL)
   376  	}
   377  
   378  	buf, err := ioutil.ReadAll(res.Body)
   379  	if err != nil {
   380  		return nil, errors.Wrap(err, "failed to read HTTP response body")
   381  	}
   382  
   383  	return buf, nil
   384  }
   385  
// docMappingRef holds a reference to a Mapping with a 'docs' attribute, for a given display name.
type docMappingRef struct {
	// Ref identifies the Mapping resource; its UID is used as the key in the
	// store and in the diff calculator.
	Ref *kates.ObjectReference
	// Name is the display name of the docs.
	Name string
}
   391  
// mappingDocMap is a set of Mapping UIDs (as strings); a UID is a member when
// its value is true.
type mappingDocMap map[string]bool

// docsDiffCalculator figures out which Mappings and API docs no longer exist and need to be deleted.
type docsDiffCalculator struct {
	// previous holds the UIDs known from the previous round.
	previous mappingDocMap
	// current holds the UIDs added during the ongoing round.
	current mappingDocMap
}
   399  
   400  // newMappingDocsCalculator creates a new diff calculator for mapping docs
   401  func newMappingDocsCalculator(known []docMappingRef) *docsDiffCalculator {
   402  	knownMap := make(mappingDocMap)
   403  	for _, m := range known {
   404  		knownMap[string(m.Ref.UID)] = true
   405  	}
   406  	return &docsDiffCalculator{current: make(mappingDocMap), previous: knownMap}
   407  }
   408  
   409  // After retrieving all known mappings, newRound will return list of mapping docs to delete
   410  func (d *docsDiffCalculator) newRound() []string {
   411  	mappingUIDsToDelete := make([]string, 0)
   412  
   413  	for previousRef := range d.previous {
   414  		if !d.current[previousRef] {
   415  			mappingUIDsToDelete = append(mappingUIDsToDelete, string(previousRef))
   416  		}
   417  	}
   418  	d.previous = d.current
   419  	d.current = make(mappingDocMap)
   420  
   421  	return mappingUIDsToDelete
   422  }
   423  
   424  // add a MappingDoc that was successfully retrieved this round
   425  func (d *docsDiffCalculator) add(ctx context.Context, dm *docMappingRef) {
   426  	if dm != nil && dm.Ref != nil {
   427  		dlog.Debugf(ctx, "Adding Mapping Docs diff reference %s", dm)
   428  		d.current[string(dm.Ref.UID)] = true
   429  	}
   430  }
   431  
   432  // deleteOld deletes old MappingDocs that are no longer present
   433  func (d *docsDiffCalculator) deleteOld(ctx context.Context, store *inMemoryStore) {
   434  	for _, mappingUID := range d.newRound() {
   435  		dlog.Debugf(ctx, "Deleting old Mapping Docs %s", mappingUID)
   436  		store.deleteRefUID(mappingUID)
   437  	}
   438  }
   439  
// docsRef pairs a Mapping reference with the OpenAPI document retrieved for it.
type docsRef struct {
	docMappingRef *docMappingRef
	openAPIDoc    *openAPIDoc
}

// docsRefMap indexes docsRef entries by Mapping UID (as a string).
type docsRefMap map[string]*docsRef
   445  
// inMemoryStore keeps the retrieved API docs in memory, keyed by Mapping UID.
type inMemoryStore struct {
	// entriesMutex guards entries.
	entriesMutex sync.RWMutex
	entries      docsRefMap
}
   450  
   451  func newInMemoryStore() *inMemoryStore {
   452  	res := &inMemoryStore{
   453  		entries: make(docsRefMap),
   454  	}
   455  
   456  	return res
   457  }
   458  
   459  func (s *inMemoryStore) add(dm *docMappingRef, openAPIDoc *openAPIDoc) {
   460  	s.entriesMutex.Lock()
   461  	defer s.entriesMutex.Unlock()
   462  
   463  	s.entries[string(dm.Ref.UID)] = &docsRef{docMappingRef: dm, openAPIDoc: openAPIDoc}
   464  }
   465  
   466  func (s *inMemoryStore) deleteRefUID(mappingRefUID string) {
   467  	s.entriesMutex.Lock()
   468  	defer s.entriesMutex.Unlock()
   469  
   470  	for entryUID := range s.entries {
   471  		if mappingRefUID == entryUID {
   472  			delete(s.entries, entryUID)
   473  		}
   474  	}
   475  }
   476  
   477  func (s *inMemoryStore) getAll() []*docsRef {
   478  	s.entriesMutex.RLock()
   479  	defer s.entriesMutex.RUnlock()
   480  
   481  	var dr []*docsRef
   482  	for _, e := range s.entries {
   483  		dr = append(dr, e)
   484  	}
   485  	return dr
   486  }
   487  
   488  func toAPIDocs(docsRefs []*docsRef) []*snapshotTypes.APIDoc {
   489  	results := make([]*snapshotTypes.APIDoc, 0)
   490  	for _, doc := range docsRefs {
   491  		if doc != nil && doc.docMappingRef != nil && doc.openAPIDoc != nil {
   492  			apiDoc := &snapshotTypes.APIDoc{
   493  				Data: doc.openAPIDoc.JSON,
   494  				TypeMeta: &kates.TypeMeta{
   495  					Kind:       doc.openAPIDoc.Type,
   496  					APIVersion: doc.openAPIDoc.Version,
   497  				},
   498  				Metadata: &kates.ObjectMeta{
   499  					Name: doc.docMappingRef.Name,
   500  				},
   501  				TargetRef: doc.docMappingRef.Ref,
   502  			}
   503  			results = append(results, apiDoc)
   504  		}
   505  	}
   506  	return results
   507  }
   508  

View as plain text