...
1from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Tuple
2
3from ..config import Config
4from .irfilter import IRFilter
5from .irresource import IRResource
6
7if TYPE_CHECKING:
8 from ..envoy.v3.v3cidrrange import CIDRRange
9 from .ir import IR # pragma: no cover
10
11
class IRIPAllowDeny(IRFilter):
    """
    IRIPAllowDeny is an IRFilter that implements an allow/deny list based
    on IP address.

    After a successful setup(), `action` is either "ALLOW" or "DENY", and
    `principals` is a list of (envoy_kind, CIDRRange) tuples, where envoy_kind
    is one of the values in EnvoyTypeMap.
    """

    parent: IRResource
    action: str
    principals: List[Tuple[str, "CIDRRange"]]

    # Maps the principal kind used in the configuration ("remote" or "peer")
    # to the kind string stored in `principals` and emitted by as_dict().
    EnvoyTypeMap: ClassVar[Dict[str, str]] = {"remote": "remote_ip", "peer": "direct_remote_ip"}

    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        rkey: str = "ir.ipallowdeny",
        name: str = "ir.ipallowdeny",
        kind: str = "IRIPAllowDeny",
        parent: Optional[IRResource] = None,
        action: Optional[str] = None,
        **kwargs,
    ) -> None:
        """
        Initialize an IRIPAllowDeny. In addition to the usual IRFilter parameters,
        parent and action are required:

        parent is the IRResource in which the IRIPAllowDeny is defined; at present,
        this will be the Ambassador module. It's required because it's where errors
        should be posted.

        action must be either "ALLOW" or "DENY". This action will be normalized to
        all-uppercase in setup().

        parent and action default to None only so that they can follow the other
        defaulted parameters in the signature; passing None for either one trips
        an assertion.

        NOTE(review): setup() also pops a "principals" element, so callers are
        presumably expected to pass principals=[...] through kwargs -- confirm
        against the caller.
        """

        assert parent is not None
        assert action is not None

        super().__init__(
            ir=ir,
            aconf=aconf,
            rkey=rkey,
            kind=kind,
            name=name,
            parent=parent,
            action=action,
            **kwargs,
        )

    def setup(self, ir: "IR", aconf: Config) -> bool:
        """
        Set up an IRIPAllowDeny based on the action and principals passed into
        __init__.

        The action is normalized to uppercase and validated, then every entry
        in the principals list is translated into an (envoy_kind, CIDRRange)
        tuple. Invalid entries are reported with self.parent.post_error() and
        skipped; they do not abort setup.

        Returns True if at least one valid principal was collected, False
        otherwise.

        Raises RuntimeError if the action is neither ALLOW nor DENY.
        """

        assert self.parent

        # These pops will crash if the action or principals are missing. That's
        # OK -- they're required elements.
        action: Optional[str] = self.pop("action")
        principals: Optional[List[Dict[str, str]]] = self.pop("principals")

        assert action is not None
        assert principals is not None

        action = action.upper()

        if action not in ("ALLOW", "DENY"):
            raise RuntimeError(f"IRIPAllowDeny action must be ALLOW or DENY, not {action}")

        self.action = action
        self.principals = []

        ir.logger.debug(f"PRINCIPALS: {principals}")

        # Every error message names the offending configuration element the
        # same way (ip_allow or ip_deny), so build that label once.
        label = f"ip_{action.lower()}"

        # principals looks like
        #
        # [
        #    { 'peer': '127.0.0.1' },
        #    { 'remote': '192.68.0.0/24' },
        #    { 'remote': '::1' }
        # ]
        #
        # or the like, where the key in the dict specifies how Envoy will handle the
        # IP match, and the value is a CIDRRange spec.

        # Imported here rather than at module level: the top of the file only
        # imports CIDRRange under TYPE_CHECKING.
        from ..envoy.v3.v3cidrrange import CIDRRange

        for pdict in principals:
            for index, (kind, spec) in enumerate(pdict.items()):
                # If we have more than one thing in the dict, that's an error:
                # only the first element is considered, the rest are rejected.
                if index > 0:
                    self.parent.post_error(f"{label} principals must be separate list elements")
                    break

                envoy_kind = IRIPAllowDeny.EnvoyTypeMap.get(kind)

                if not envoy_kind:
                    self.parent.post_error(
                        f"{label} principal type {kind} unknown: must be peer or remote"
                    )
                    continue

                cidrrange = CIDRRange(spec)

                # The CIDRRange is tested for truthiness to decide whether the
                # spec was usable; when it is not, its `error` attribute is
                # included in the posted message.
                if cidrrange:
                    self.principals.append((envoy_kind, cidrrange))
                else:
                    self.parent.post_error(
                        f"{label} principal {spec} is not valid: {cidrrange.error}"
                    )

        return bool(self.principals)

    def __str__(self) -> str:
        """Return a short description listing the action and every principal."""
        pstrs = [str(x) for x in self.principals]
        return f"<IPAllowDeny {self.action}: {', '.join(pstrs)}>"

    def as_dict(self) -> dict:
        """
        Return a dictionary with the action and, for each principal, a
        single-element dict mapping its Envoy kind to the CIDRRange's own
        as_dict() representation.
        """
        return {
            "action": self.action,
            "principals": [{kind: block.as_dict()} for kind, block in self.principals],
        }
View as plain text