...
1import os
2import sys
3
4import requests
5
6from ambassador.utils import parse_bool
7
8
def usage(program):
    """Write the command-line synopsis to stderr, then exit with status 1.

    Args:
        program: Name of the running script, interpolated into the
            "Usage:" line.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    help_lines = (
        f"Usage: {program} [--watt|--k8s|--fs] UPDATE_URL\n",
        "Notify `diagd` (and `amb-sidecar`, if AES) that a new WATT snapshot is available at UPDATE_URL.\n",
    )

    for line in help_lines:
        sys.stderr.write(line)

    # Equivalent to sys.exit(1): the caller never gets control back.
    raise SystemExit(1)
15
16
# Endpoints to notify. Each DEV_* environment variable overrides the
# corresponding default, so the script can be pointed at a non-default
# host or path.
base_host = os.environ.get("DEV_AMBASSADOR_EVENT_HOST", "http://localhost:8877")
base_path = os.environ.get("DEV_AMBASSADOR_EVENT_PATH", "_internal/v0")

sidecar_host = os.environ.get("DEV_AMBASSADOR_SIDECAR_HOST", "http://localhost:8500")
sidecar_path = os.environ.get("DEV_AMBASSADOR_SIDECAR_PATH", "_internal/v0")

# Defaults correspond to --k8s: post to ".../update" with a "url" query
# parameter.
url_type = "update"
arg_key = "url"

program = os.path.basename(sys.argv[0])
args = sys.argv[1:]

# Consume leading "--" flags; whatever remains must be the single
# UPDATE_URL positional argument.
while args and args[0].startswith("--"):
    arg = args.pop(0)

    if arg == "--k8s":
        # Already set up.
        pass
    elif arg == "--watt":
        url_type = "watt"
    elif arg == "--fs":
        # Filesystem updates are passed as a "path" parameter, not "url".
        url_type = "fs"
        arg_key = "path"
    else:
        # Unknown flag: usage() prints help and exits.
        usage(program)

if len(args) != 1:
    usage(program)

# The primary endpoint is always notified.
urls = [f"{base_host}/{base_path}/{url_type}"]

# The sidecar endpoint is notified too when EDGE_STACK is set to a true
# value or the /ambassador/.edge_stack marker file exists.
if parse_bool(os.environ.get("EDGE_STACK", "false")) or os.path.exists("/ambassador/.edge_stack"):
    urls.append(f"{sidecar_host}/{sidecar_path}/{url_type}")

exitcode = 0

for url in urls:
    # NOTE(review): no timeout is passed, so an unresponsive endpoint blocks
    # this script indefinitely. Confirm how long these endpoints may
    # legitimately take to answer before adding one.
    try:
        r = requests.post(url, params={arg_key: args[0]})
    except requests.RequestException as exc:
        # A transport-level failure (connection refused, DNS error, ...) on
        # one endpoint must not prevent the remaining endpoints from being
        # notified; record the failure and move on.
        sys.stderr.write(f"failed to update {url}: {exc}\n")
        exitcode = 1
        continue

    if r.status_code != 200:
        # Trailing newline keeps consecutive failure reports on separate lines.
        sys.stderr.write(f"failed to update {r.url}: {r.status_code}: {r.text}\n")
        exitcode = 1

# Exit non-zero if any endpoint could not be updated.
sys.exit(exitcode)
View as plain text