1import logging
2from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
3
4from ..config import Config
5from .irresource import IRResource
6
7if TYPE_CHECKING:
8 from .ir import IR # pragma: no cover
9
10
class IRHealthChecks(IRResource):
    """IR resource that turns a Mapping's ``health_checks`` field into mappers.

    Each entry of the input list is validated independently. Invalid entries
    are reported through ``post_error`` and skipped, so one bad health check
    does not eliminate the others. The surviving entries are exposed through
    :meth:`config`.
    """

    # The list of mappers that will make up the final health checking config
    _mappers: Optional[List[Dict[str, Union[str, int, Dict]]]]

    # The IR config, used as input from a `health_checks` field on a Mapping
    _ir_config: List[Dict[str, Union[str, int, Dict]]]

    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        health_checks_config: List[Dict[str, Union[str, int, Dict]]],
        rkey: str = "ir.health_checks",
        kind: str = "IRHealthChecks",
        name: str = "health_checks",
        **kwargs,
    ) -> None:
        """Store the raw health-check config and initialise the base resource.

        Args:
            ir: The IR this resource belongs to.
            aconf: The configuration object, forwarded to the base class.
            health_checks_config: The list taken from a Mapping's
                ``health_checks`` field.
            rkey: Resource key forwarded to the base class.
            kind: Resource kind forwarded to the base class.
            name: Resource name forwarded to the base class.
            **kwargs: Extra keyword arguments forwarded to the base class.
        """
        # These are assigned before calling the base initialiser.
        # NOTE(review): presumably IRResource.__init__ invokes setup(), which
        # reads both attributes -- confirm before reordering.
        self._ir_config = health_checks_config
        self._mappers = None
        super().__init__(ir=ir, aconf=aconf, rkey=rkey, kind=kind, name=name, **kwargs)

    def config(self) -> Optional[List[Dict[str, Union[str, int, Dict]]]]:
        """Return the final config, or None if there isn't any.

        There is no final config either because there was no input config, or
        because none of the input config was valid.
        """
        return self._mappers or None

    def setup(self, ir: "IR", aconf: Config) -> bool:
        """Load the health-check config; always reports success."""
        self._setup(ir, aconf)
        return True

    def _setup(self, ir: "IR", aconf: Config) -> None:
        """Generate the mappers from the input config, if there is any."""
        # Do nothing (and post no errors) if the config is missing or empty.
        if not self._ir_config:
            return

        # If we have some configuration to deal with, try to load it, and post
        # any errors that we find along the way. Internally, _generate_mappers
        # will skip any health checks that are invalid, preserving the others.
        # This prevents one bad health check from eliminating the others.
        self._mappers = self._generate_mappers()
        if self._mappers is not None:
            # Lazy %-args: the repr is only built when debug logging is enabled.
            ir.logger.debug("IRHealthChecks: loaded mappers %r", self._mappers)

    def _generate_mappers(self) -> Optional[List[Dict[str, Union[str, int, Dict]]]]:
        """Build one mapper per valid health check in the input config.

        Returns:
            The list of mappers, or None (after posting an error) when no
            health check could be parsed successfully.
        """
        all_mappers: List[Dict[str, Union[str, int, Dict]]] = []

        # Make sure each health check in the list has config for either a grpc
        # health check or an http health check.
        for hc in self._ir_config:
            # .get() so that a missing field is reported below instead of
            # raising KeyError and aborting every other health check.
            health_check_config = hc.get("health_check")

            # This should never happen but is necessary for the linting type checks
            if not isinstance(health_check_config, dict):
                self.post_error(
                    f"IRHealthChecks: health_check: field must be an object, found {health_check_config}. Ignoring health-check {hc}",
                    log_level=logging.ERROR,
                )
                continue

            grpc_health_check = health_check_config.get("grpc", None)
            http_health_check = health_check_config.get("http", None)

            # A health check without any checker carries nothing to run.
            # NOTE(review): the case where both `http` and `grpc` are set is
            # passed through unchanged; confirm whether it should be rejected.
            if grpc_health_check is None and http_health_check is None:
                self.post_error(
                    f"IRHealthChecks: health_check: one of http or grpc must be configured. Ignoring health-check {hc}",
                    log_level=logging.ERROR,
                )
                continue

            mapper: Dict[str, Union[str, int, Dict]] = {
                "timeout": hc.get("timeout", "3s"),  # default 3.0s timeout
                "interval": hc.get("interval", "5s"),  # default 5.0s interval
                "healthy_threshold": hc.get("healthy_threshold", 1),
                "unhealthy_threshold": hc.get("unhealthy_threshold", 2),
            }

            # Process a http health check
            if http_health_check is not None:
                http_mapper = self._build_http_mapper(hc, http_health_check)
                if http_mapper is None:
                    continue
                mapper["http_health_check"] = http_mapper

            # Process a gRPC health check
            if grpc_health_check is not None:
                grpc_mapper = self._build_grpc_mapper(hc, grpc_health_check)
                if grpc_mapper is None:
                    continue
                mapper["grpc_health_check"] = grpc_mapper

            all_mappers.append(mapper)

        # If nothing could be parsed successfully, post an error.
        if not all_mappers:
            self.post_error(
                f"IRHealthChecks: no valid health check could be parsed for config: {self._ir_config}",
                log_level=logging.ERROR,
            )
            return None
        return all_mappers

    def _build_http_mapper(
        self, hc: Dict[str, Union[str, int, Dict]], http_health_check: Any
    ) -> Optional[Dict[str, Any]]:
        """Translate the ``http`` section of one health check.

        Returns None (after posting an error) when the whole health check
        must be ignored. Invalid optional fields are reported and dropped
        without invalidating the health check.
        """
        if not isinstance(http_health_check, dict):
            self.post_error(
                f"IRHealthChecks: health_check.http: field must be an object, found {http_health_check}. Ignoring health-check {hc}",
                log_level=logging.ERROR,
            )
            return None

        path = http_health_check.get("path", None)
        if path is None:
            self.post_error(
                f"IRHealthChecks: health_check.http: path is required. Ignoring health-check {hc}",
                log_level=logging.ERROR,
            )
            return None

        http_mapper: Dict[str, Any] = {"path": path}

        # Process header add/remove operations
        request_headers_to_add = http_health_check.get("add_request_headers", None)
        if request_headers_to_add is not None:
            if not isinstance(request_headers_to_add, dict):
                self.post_error(
                    f"IRHealthChecks: add_request_headers must be a dict of header:value pairs. Ignoring field for health-check: {hc}",
                    log_level=logging.ERROR,
                )
            else:
                add_headers = self.generate_headers_to_add(request_headers_to_add)
                if add_headers:
                    http_mapper["request_headers_to_add"] = add_headers

        request_headers_to_remove = http_health_check.get("remove_request_headers", None)
        if request_headers_to_remove is not None:
            if not isinstance(request_headers_to_remove, list):
                self.post_error(
                    f"IRHealthChecks: remove_request_headers must be a list. Ignoring field for health-check: {hc}",
                    log_level=logging.ERROR,
                )
            else:
                http_mapper["request_headers_to_remove"] = request_headers_to_remove

        host = http_health_check.get("hostname", None)
        if host is not None:
            http_mapper["host"] = host

        # Process the expected statuses
        expected_statuses = http_health_check.get("expected_statuses", None)
        if expected_statuses is not None:
            valid_statuses = self._build_expected_statuses(hc, expected_statuses)
            if valid_statuses:
                http_mapper["expected_statuses"] = valid_statuses

        return http_mapper

    def _build_expected_statuses(
        self, hc: Dict[str, Union[str, int, Dict]], expected_statuses: Any
    ) -> List[Dict[str, int]]:
        """Convert inclusive ``min``/``max`` status ranges to ``start``/``end``.

        Ranges that are malformed or whose ``min`` exceeds ``max`` are
        reported and skipped.
        """
        valid_statuses: List[Dict[str, int]] = []
        for status_range in expected_statuses:
            try:
                min_code = int(status_range["min"])
                max_code = int(status_range["max"])
            except (KeyError, TypeError, ValueError):
                self.post_error(
                    f"IRHealthChecks: expected_statuses: status range {status_range} must have integer min and max values. Ignoring expected status for health-check {hc}",
                    log_level=logging.ERROR,
                )
                continue

            # Validate the values as the user wrote them, before the end of
            # the range is adjusted below.
            if min_code > max_code:
                self.post_error(
                    f"IRHealthChecks: expected_statuses: status range start value {min_code} cannot be higher than the end {max_code} for range. Ignoring expected status for health-check {hc}",
                    log_level=logging.ERROR,
                )
                continue

            # We add one to the end code because by default Envoy expects the
            # start of the range to be inclusive, but the end of the range to
            # be exclusive. Let's just make both inclusive for simplicity.
            valid_statuses.append({"start": min_code, "end": max_code + 1})
        return valid_statuses

    def _build_grpc_mapper(
        self, hc: Dict[str, Union[str, int, Dict]], grpc_health_check: Any
    ) -> Optional[Dict[str, str]]:
        """Translate the ``grpc`` section of one health check.

        Returns None (after posting an error) when the health check must be
        ignored.
        """
        if not isinstance(grpc_health_check, dict):
            self.post_error(
                f"IRHealthChecks: health_check.grpc: field must be an object, found {grpc_health_check}, Ignoring...",
                log_level=logging.ERROR,
            )
            return None

        upstream_name = grpc_health_check.get("upstream_name", None)
        if upstream_name is None:
            self.post_error(
                f"IRHealthChecks: health_check.grpc: upstream_name is required. Ignoring health-check {hc}",
                log_level=logging.ERROR,
            )
            return None

        grpc_mapper: Dict[str, str] = {"service_name": upstream_name}

        authority = grpc_health_check.get("authority", None)
        if authority is not None:
            grpc_mapper["authority"] = authority

        return grpc_mapper

    @staticmethod
    def generate_headers_to_add(header_dict: dict) -> List[dict]:
        """Convert a ``header: value`` dict into a list of header entries.

        A value may be a plain value, or a dict with a ``value`` key and an
        optional ``append`` key.

        Args:
            header_dict: Mapping of header name to value or to a
                ``{"value": ..., "append": ...}`` dict.

        Returns:
            One ``{"header": {"key": ..., "value": ...}, "append": bool}``
            entry per header, in the iteration order of ``header_dict``.

        Raises:
            KeyError: If a dict-style value has no ``value`` key.
        """
        headers = []
        for key, value in header_dict.items():
            if isinstance(value, dict):
                append = bool(value.get("append", True))
                header_value = value["value"]
            else:
                # Default append True, for backward compatibility
                append = True
                header_value = value
            headers.append({"header": {"key": key, "value": header_value}, "append": append})
        return headers
View as plain text