...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/iripallowdeny.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Tuple
     2
     3from ..config import Config
     4from .irfilter import IRFilter
     5from .irresource import IRResource
     6
     7if TYPE_CHECKING:
     8    from ..envoy.v3.v3cidrrange import CIDRRange
     9    from .ir import IR  # pragma: no cover
    10
    11
    12class IRIPAllowDeny(IRFilter):
    13    """
    14    IRIPAllowDeny is an IRFilter that implements an allow/deny list based
    15    on IP address.
    16    """
    17
    18    parent: IRResource
    19    action: str
    20    principals: List[Tuple[str, "CIDRRange"]]
    21
    22    EnvoyTypeMap: ClassVar[Dict[str, str]] = {"remote": "remote_ip", "peer": "direct_remote_ip"}
    23
    24    def __init__(
    25        self,
    26        ir: "IR",
    27        aconf: Config,
    28        rkey: str = "ir.ipallowdeny",
    29        name: str = "ir.ipallowdeny",
    30        kind: str = "IRIPAllowDeny",
    31        parent: IRResource = None,
    32        action: str = None,
    33        **kwargs,
    34    ) -> None:
    35        """
    36        Initialize an IRIPAllowDeny. In addition to the usual IRFilter parameters,
    37        parent and action are required:
    38
    39        parent is the IRResource in which the IRIPAllowDeny is defined; at present,
    40        this will be the Ambassador module. It's required because it's where errors
    41        should be posted.
    42
    43        action must be either "ALLOW" or "DENY". This action will be normalized to
    44        all-uppercase in setup().
    45        """
    46
    47        assert parent is not None
    48        assert action is not None
    49
    50        super().__init__(
    51            ir=ir,
    52            aconf=aconf,
    53            rkey=rkey,
    54            kind=kind,
    55            name=name,
    56            parent=parent,
    57            action=action,
    58            **kwargs,
    59        )
    60
    61    def setup(self, ir: "IR", aconf: Config) -> bool:
    62        """
    63        Set up an IRIPAllowDeny based on the action and principals passed into
    64        __init__.
    65        """
    66
    67        assert self.parent
    68
    69        # These pops will crash if the action or principals are missing. That's
    70        # OK -- they're required elements.
    71        action: Optional[str] = self.pop("action")
    72        principals: Optional[List[Dict[str, str]]] = self.pop("principals")
    73
    74        assert action is not None
    75        assert principals is not None
    76
    77        action = action.upper()
    78
    79        if (action != "ALLOW") and (action != "DENY"):
    80            raise RuntimeError(f"IRIPAllowDeny action must be ALLOW or DENY, not {action}")
    81
    82        self.action = action
    83        self.principals = []
    84
    85        ir.logger.debug(f"PRINCIPALS: {principals}")
    86
    87        # principals looks like
    88        #
    89        # [
    90        #    { 'peer': '127.0.0.1' },
    91        #    { 'remote': '192.68.0.0/24' },
    92        #    { 'remote': '::1' }
    93        # ]
    94        #
    95        # or the like, where the key in the dict specifies how Envoy will handle the
    96        # IP match, and the value is a CIDRRange spec.
    97
    98        from ..envoy.v3.v3cidrrange import CIDRRange
    99
   100        for pdict in principals:
   101            # If we have more than one thing in the dict, that's an error.
   102
   103            first = True
   104
   105            for kind, spec in pdict.items():
   106                if not first:
   107                    self.parent.post_error(
   108                        f"ip{self.action.lower()} principals must be separate list elements"
   109                    )
   110                    break
   111
   112                first = False
   113
   114                envoy_kind = IRIPAllowDeny.EnvoyTypeMap.get(kind, None)
   115
   116                if not envoy_kind:
   117                    self.parent.post_error(
   118                        f"ip{self.action.lower()} principal type {kind} unknown: must be peer or remote"
   119                    )
   120                    continue
   121
   122                cidrrange = CIDRRange(spec)
   123
   124                if cidrrange:
   125                    self.principals.append((envoy_kind, cidrrange))
   126                else:
   127                    self.parent.post_error(
   128                        f"ip_{self.action.lower()} principal {spec} is not valid: {cidrrange.error}"
   129                    )
   130
   131        if len(self.principals) > 0:
   132            return True
   133        else:
   134            return False
   135
   136    def __str__(self) -> str:
   137        pstrs = [str(x) for x in self.principals]
   138        return f"<IPAllowDeny {self.action}: {', '.join(pstrs)}>"
   139
   140    def as_dict(self) -> dict:
   141        return {
   142            "action": self.action,
   143            "principals": [{kind: block.as_dict()} for kind, block in self.principals],
   144        }

View as plain text