...

Text file src/github.com/datawire/ambassador/v2/demo/services/auth.py

Documentation: github.com/datawire/ambassador/v2/demo/services

     1#!/usr/bin/env python
     2
     3import sys
     4
     5import logging
     6import uuid
     7
     8from flask import Flask, Response, jsonify, request
     9
    10__version__ = '0.0.1'
    11
    12AdminUsers = {
    13    'admin': 'admin',
    14    '<<realm>>': 'Ambassador Diagnostics'
    15}
    16
    17DemoUsers = {
    18    'username': 'password',
    19    '<<realm>>': 'Ambassador'
    20}
    21
    22app = Flask(__name__)
    23
    24@app.before_request
    25def before():
    26    logging.debug("=>> %s" % request)
    27
    28    for header in sorted(request.headers.keys()):
    29        logging.debug("=>>   %s: %s" % (header, request.headers[header]))
    30
    31def check_auth(auth, validusers):
    32    if not auth:
    33        return False
    34
    35    if auth.username not in validusers:
    36        return False
    37
    38    if auth.password != validusers[auth.username]:
    39        return False
    40
    41    return True
    42
    43@app.route('/', defaults={'path': ''}, methods=['GET', 'PUT', 'POST', 'DELETE'])
    44@app.route('/<path:path>', methods=['GET', 'PUT', 'POST', 'DELETE'])
    45def catch_all(path):
    46    # Restore the leading '/' to our path.
    47    path = "/" + path
    48
    49    resp = Response('You want path: %s' % path)
    50
    51    validusers = None
    52    generate_session = False
    53
    54
    55    if not path.startswith("/auth/v0/"):
    56        logging.info("direct access attempted to %s" % path)
    57        resp = Response(
    58            "Direct access not supported",
    59            400,
    60            {
    61                'X-ExtAuth-Required': 'True',
    62            }
    63        )
    64
    65    # Drop the auth prefix, but leave a leading /.
    66    path = path[len("/auth/v0"):]
    67
    68    while path.startswith("//"):
    69        path = path[1:]
    70
    71    logging.debug("Requested path is %s" % path)
    72
    73    if path.startswith("/ambassador/"):
    74        # Require auth as an admin.
    75        validusers = AdminUsers
    76    elif path.startswith("/qotm/quote"):
    77        # Require auth as a user, and generate a session.
    78        validusers = DemoUsers
    79        generate_session = True
    80
    81    if validusers:
    82        auth = request.authorization
    83
    84        # We require Basic-Auth for this.
    85        if not check_auth(auth, validusers):
    86            resp = Response(
    87                "Authentication is required for %s" % path,
    88                401,
    89                {
    90                    'WWW-Authenticate': 'Basic realm="%s"' % validusers['<<realm>>'],
    91                    'X-ExtAuth-Required': 'True'
    92                }
    93            )
    94        elif generate_session:
    95            session = request.headers.get('x-qotm-session', None)
    96
    97            if not session:
    98                session = str(uuid.uuid4()).upper()
    99                logging.debug("Generated new QOTM session ID %s" % session)
   100
   101            resp = Response(
   102                "Authentication succeeded for %s" % path,
   103                200,
   104                {
   105                    'X-Authenticated-As': auth.username,
   106                    'X-ExtAuth-Required': 'True',
   107                    'X-QoTM-Session': session
   108                }
   109            )
   110    else:
   111        resp = Response(
   112            "Authentication not required for %s" % path,
   113            200,
   114            { 'X-ExtAuth-Required': 'False' }
   115        )
   116
   117    resp.headers['X-Test'] = 'Should not be seen.'
   118
   119    logging.debug("=<< %s" % resp)
   120
   121    for header in sorted(resp.headers.keys()):
   122        logging.debug("=<<   %s: %s" % (header, resp.headers[header]))
   123
   124    return resp
   125
   126if __name__ == "__main__":
   127    logging.basicConfig(
   128        # filename=logPath,
   129        level=logging.DEBUG, # if appDebug else logging.INFO,
   130        format="%%(asctime)s demo-auth %s %%(levelname)s: %%(message)s" % __version__,
   131        datefmt="%Y-%m-%d %H:%M:%S"
   132    )
   133
   134    logging.info("initializing")
   135    app.run(host='0.0.0.0', port=5050, debug=True)

View as plain text