1import copy
2import logging
3from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
4
5from ..config import Config
6from ..resource import Resource
7from ..utils import RichStatus
8
9if TYPE_CHECKING:
10 from .ir import IR # pragma: no cover
11
12
13class IRResource(Resource):
14 """
15 A resource within the IR.
16 """
17
18 @staticmethod
19 def helper_sort_keys(res: "IRResource", k: str) -> Tuple[str, List[str]]:
20 return k, list(sorted(res[k].keys()))
21
22 @staticmethod
23 def helper_rkey(res: "IRResource", k: str) -> Tuple[str, str]:
24 return "_rkey", res[k]
25
26 @staticmethod
27 def helper_list(res: "IRResource", k: str) -> Tuple[str, list]:
28 return k, list([x.as_dict() for x in res[k]])
29
30 __as_dict_helpers: Dict[str, Any] = {"apiVersion": "drop", "logger": "drop", "ir": "drop"}
31
32 _active: bool
33 _errored: bool
34 _cache_key: Optional[str]
35
36 def __init__(
37 self,
38 ir: "IR",
39 aconf: Config,
40 rkey: str,
41 kind: str,
42 name: str,
43 namespace: Optional[str] = None,
44 metadata_labels: Optional[Dict[str, str]] = None,
45 location: str = "--internal--",
46 apiVersion: str = "ambassador/ir",
47 **kwargs
48 ) -> None:
49 # print("IRResource __init__ (%s %s)" % (kind, name))
50
51 if not namespace:
52 namespace = ir.ambassador_namespace
53 self.namespace = namespace
54
55 super().__init__(
56 rkey=rkey,
57 location=location,
58 kind=kind,
59 name=name,
60 namespace=namespace,
61 metadata_labels=metadata_labels,
62 apiVersion=apiVersion,
63 **kwargs
64 )
65 self.ir = ir
66 self.logger = ir.logger
67
68 self._errored = False
69
70 self.__as_dict_helpers = IRResource.__as_dict_helpers
71 self.add_dict_helper("_errors", IRResource.helper_list)
72 self.add_dict_helper("_referenced_by", IRResource.helper_sort_keys)
73 self.add_dict_helper("rkey", IRResource.helper_rkey)
74
75 # Make certain that _active has a default...
76 self.set_active(False)
77
78 # ...and start with an empty cache key...
79 self._cache_key = None
80
81 # ...before we override it with the setup results.
82 self.set_active(self.setup(ir, aconf))
83
84 # XXX WTFO, I hear you cry. Why is this "type: ignore here?" So here's the deal:
85 # mypy doesn't like it if you override just the getter of a property that has a
86 # setter, too, and I cannot figure out how else to shut it up.
87 @property # type: ignore
88 def cache_key(self) -> str:
89 # If you ask for the cache key and it's not set, that is an error.
90 assert self._cache_key is not None
91 return self._cache_key
92
93 def lookup_default(
94 self, key: str, default_value: Optional[Any] = None, lookup_class: Optional[str] = None
95 ) -> Any:
96 """
97 Look up a key in the Ambassador module's "defaults" element.
98
99 The "lookup class" is
100 - the lookup_class parameter if one was passed, else
101 - self.default_class if that's set, else
102 - None.
103
104 We can look in two places for key -- the first match wins:
105
106 1. defaults[lookup class][key] if the lookup key is neither None nor "/"
107 2. defaults[key]
108
109 (A lookup class of "/" skips step 1.)
110
111 If we don't find the key in either place, return the given default_value.
112 If we _do_ find the key, _return a copy of the data!_ If we return the data itself
113 and the caller later modifies it... that's a problem.
114
115 :param key: the key to look up
116 :param default_value: the value to return if nothing is found in defaults.
117 :param lookup_class: the lookup class, see above
118 :return: Any
119 """
120
121 defaults = self.ir.ambassador_module.get("defaults", {})
122
123 lclass = lookup_class
124
125 if not lclass:
126 lclass = self.get("default_class", None)
127
128 if lclass and (lclass != "/"):
129 # Case 1.
130 classdict = defaults.get(lclass, None)
131
132 if classdict and (key in classdict):
133 return copy.deepcopy(classdict[key])
134
135 # We didn't find anything in case 1. Try case 2.
136 if defaults and (key in defaults):
137 return copy.deepcopy(defaults[key])
138
139 # We didn't find anything in either case. Return the default value.
140 return default_value
141
142 def lookup(
143 self,
144 key: str,
145 *args,
146 default_class: Optional[str] = None,
147 default_key: Optional[str] = None
148 ) -> Any:
149 """
150 Look up a key in this IRResource, with a fallback to the Ambassador module's "defaults"
151 element.
152
153 Here's the resolution order:
154
155 - if key is present in self, use its value.
156 - if not, use lookup_default above to try to find a value in the Ambassador module
157 - if we don't find anything, but a default value was passed in as *args[0], return that.
158 - if all else fails, return None.
159
160 :param key: the key to look up
161 :param default_class: the default class for the fallback lookup (optional, see above)
162 :param default_key: the key for the fallback lookup (optional, defaults to key)
163 :param args: an all-else-fails default value can go here, see above
164 :return: Any
165 """
166
167 value = self.get(key, None)
168
169 default_value = None
170
171 if len(args) > 0:
172 default_value = args[0]
173
174 if value is None:
175 if not default_key:
176 default_key = key
177
178 value = self.lookup_default(
179 default_key, default_value=default_value, lookup_class=default_class
180 )
181
182 return value
183
184 def add_dict_helper(self, key: str, helper) -> None:
185 self.__as_dict_helpers[key] = helper
186
187 def set_active(self, active: bool) -> None:
188 self._active = active
189
190 def is_active(self) -> bool:
191 return self._active
192
193 def __bool__(self) -> bool:
194 return self._active and not self._errored
195
196 def setup(self, ir: "IR", aconf: Config) -> bool:
197 # If you don't override setup, you end up with an IRResource that's always active.
198 return True
199
200 def add_mappings(self, ir: "IR", aconf: Config) -> None:
201 # If you don't override add_mappings, uh, no mappings will get added.
202 pass
203
204 def post_error(self, error: Union[str, RichStatus], log_level=logging.INFO):
205 self._errored = True
206
207 if not self.ir:
208 raise Exception("post_error cannot be called before __init__")
209
210 self.ir.post_error(error, resource=self, log_level=log_level)
211
212 def skip_key(self, k: str) -> bool:
213 if k.startswith("__") or k.startswith("_IRResource__"):
214 return True
215
216 if self.__as_dict_helpers.get(k, None) == "drop":
217 return True
218
219 return False
220
221 def as_dict(self) -> Dict:
222 od: Dict[str, Any] = {}
223
224 for k in self.keys():
225 if self.skip_key(k):
226 continue
227
228 helper = self.__as_dict_helpers.get(k, None)
229
230 if helper:
231 new_k, v = helper(self, k)
232
233 if new_k and v:
234 od[new_k] = v
235 elif isinstance(self[k], IRResource):
236 od[k] = self[k].as_dict()
237 elif self[k] is not None:
238 od[k] = self[k]
239
240 return od
241
242 @staticmethod
243 def normalize_service(service: str) -> str:
244 normalized_service = service
245
246 if service.lower().startswith("http://"):
247 normalized_service = service[len("http://") :]
248 elif service.lower().startswith("https://"):
249 normalized_service = service[len("https://") :]
250
251 return normalized_service
View as plain text