...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/notify.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"net/http"
     9  	"net/url"
    10  	"syscall"
    11  	"time"
    12  
    13  	"github.com/datawire/ambassador/v2/pkg/acp"
    14  	"github.com/datawire/ambassador/v2/pkg/debug"
    15  
    16  	"github.com/datawire/dlib/dlog"
    17  )
    18  
    19  type notable interface {
    20  	NoteSnapshotSent()
    21  	NoteSnapshotProcessed()
    22  }
    23  
    24  type noopNotable struct{}
    25  
    26  func (_ *noopNotable) NoteSnapshotSent()      {}
    27  func (_ *noopNotable) NoteSnapshotProcessed() {}
    28  
    29  func notifyReconfigWebhooks(ctx context.Context, ambwatch notable) error {
    30  	isEdgeStack, err := IsEdgeStack()
    31  	if err != nil {
    32  		return err
    33  	}
    34  	return notifyReconfigWebhooksFunc(ctx, ambwatch, isEdgeStack)
    35  }
    36  
    37  func notifyReconfigWebhooksFunc(ctx context.Context, ambwatch notable, edgestack bool) error {
    38  	// XXX: last N snapshots?
    39  	snapshotUrl := url.QueryEscape("http://localhost:9696/snapshot")
    40  
    41  	needDiagdNotify := true
    42  	needSidecarNotify := true
    43  
    44  	// We're about to send a new snapshot to diagd. The webhook we're using for this
    45  	// won't return, by design, until the snapshot has been processed, so first note
    46  	// that we're sending the snapshot...
    47  	ambwatch.NoteSnapshotSent()
    48  
    49  	for {
    50  		// ...then send it and wait for the webhook to return...
    51  		finished, err := notifyWebhookUrl(ctx, "diagd", fmt.Sprintf("%s?url=%s", GetEventUrl(), snapshotUrl))
    52  		if err != nil {
    53  			return err
    54  		}
    55  		if finished {
    56  			needDiagdNotify = false
    57  			// ...then note that it's been processed. This DOES NOT imply that the processing
    58  			// was successful: it's just about whether or not diagd is making progress instead
    59  			// of getting stuck.
    60  			ambwatch.NoteSnapshotProcessed()
    61  		}
    62  
    63  		// Then go deal with the Edge Stack sidecar.
    64  		if edgestack {
    65  			finished, err := notifyWebhookUrl(ctx, "edgestack sidecar", fmt.Sprintf("%s?url=%s", GetSidecarUrl(), snapshotUrl))
    66  			if err != nil {
    67  				return err
    68  			}
    69  			if finished {
    70  				needSidecarNotify = false
    71  			}
    72  		} else {
    73  			needSidecarNotify = false
    74  		}
    75  
    76  		select {
    77  		case <-ctx.Done():
    78  			return nil
    79  		default:
    80  			// XXX: find a better way to wait for diagd and/or the sidecar to spin up
    81  			if needDiagdNotify || needSidecarNotify {
    82  				time.Sleep(1 * time.Second)
    83  			} else {
    84  				return nil
    85  			}
    86  		}
    87  	}
    88  }
    89  
    90  // posts to a webhook style url, logging any errors, and returning false if a retry is needed
    91  func notifyWebhookUrl(ctx context.Context, name, xurl string) (bool, error) {
    92  	defer debug.FromContext(ctx).Timer(fmt.Sprintf("notifyWebhook:%s", name)).Start()()
    93  
    94  	req, err := http.NewRequestWithContext(ctx, http.MethodPost, xurl, nil)
    95  	if err != nil {
    96  		return false, err
    97  	}
    98  
    99  	// As long as this notification is going to localhost, set the X-Ambassador-Diag-IP
   100  	// header for it. This is only used by diagd right now, but this is the easy way
   101  	// to deal with it.
   102  	parsedURL, err := url.Parse(xurl)
   103  	if err != nil {
   104  		// This is "impossible" in that it's a blatant programming error, not
   105  		// an error caused by the user. Panic.
   106  		panic(fmt.Errorf("BUG: bad URL passed to notifyWebhookUrl: '%s', %v", xurl, err))
   107  	}
   108  
   109  	// OK, the URL parsed clean (as it *!&@*#& well should have!) so we can find
   110  	// out if it's going to localhost. We'll do this the strict way, since these
   111  	// URLs should be hardcoded.
   112  
   113  	if acp.HostPortIsLocal(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port())) {
   114  		// If we're speaking to localhost, we're speaking from localhost. Hit it.
   115  		req.Header.Set("X-Ambassador-Diag-IP", "127.0.0.1")
   116  	}
   117  
   118  	req.Header.Set("content-type", "application/json")
   119  	resp, err := http.DefaultClient.Do(req)
   120  	if err != nil {
   121  		if errors.Is(err, syscall.ECONNREFUSED) {
   122  			// We couldn't succesfully connect to the sidecar, probably because it hasn't
   123  			// started up yet, so we log the error and return false to signal retry.
   124  			dlog.Error(ctx, err.Error())
   125  			return false, nil
   126  		} else {
   127  			// If either of the sidecars cannot successfully handle a webhook request, we
   128  			// deliberately consider it a fatal error so that we can ensure shared fate between all
   129  			// ambassador processes. The only known case where this occurs so far is when the diagd
   130  			// gunicorn worker gets OOMKilled. This results in an EOF and we end up here.
   131  			return false, err
   132  		}
   133  	}
   134  	defer resp.Body.Close()
   135  
   136  	if resp.StatusCode != 200 {
   137  		body, err := ioutil.ReadAll(resp.Body)
   138  		if err != nil {
   139  			dlog.Printf(ctx, "error reading body from %s: %v", name, err)
   140  		} else {
   141  			dlog.Printf(ctx, "error notifying %s: %s, %s", name, resp.Status, string(body))
   142  		}
   143  	}
   144  
   145  	// We assume the sidecars are idempotent. That means we don't want to retry even if we get
   146  	// back a non 200 response since we would get an error the next time also and just be stuck
   147  	// retrying forever.
   148  	return true, nil
   149  }
   150  

View as plain text