...

Text file src/github.com/datawire/ambassador/v2/scripts/devloop-helpers/fake_configd.py

Documentation: github.com/datawire/ambassador/v2/scripts/devloop-helpers

     1from typing import Optional
     2
     3import sys
     4import yaml
     5
     6from flask import Flask, jsonify, request
     7
     8from ambassador.utils import parse_yaml, yaml_dumper
     9
    10
    11app = Flask(__name__)
    12
    13def merge_dicts(x, y):
    14    z = x.copy()
    15    z.update(y)
    16    return z
    17
    18@app.route('/api/snapshot/<generation_counter>/<kind>')
    19def services(generation_counter, kind):
    20    if kind in app.elements:
    21        return yaml.dump_all(app.elements[kind], Dumper=yaml_dumper), 200
    22    else:
    23        return "no such element", 404
    24
    25
    26def main(services_path: str, endpoint_path: Optional[str]):
    27    k8s_resources = parse_yaml(open(services_path, 'r').read())
    28    total_resources = len(k8s_resources)
    29
    30    services = [ obj for obj in k8s_resources if obj.get('kind', None) == 'Service' ]
    31
    32    app.elements = {
    33        'services': services
    34    }
    35
    36    if endpoint_path:
    37        k8s_resources = parse_yaml(open(endpoint_path, 'r').read())
    38        total_resources += len(k8s_resources)
    39
    40        app.elements['endpoints'] = [ obj for obj in k8s_resources if obj.get('kind', None) == 'Endpoints' ]
    41
    42    print("Total resources: %d" % total_resources)
    43    print("Services:        %d" % len(app.elements['services']))
    44    print("Endpoints:       %d" % len(app.elements.get('endpoints', [])))
    45
    46    app.run(host='0.0.0.0', port=9999, debug=True)
    47
    48
    49if __name__ == '__main__':
    50    services_path = sys.argv[1]
    51    endpoint_path = None
    52
    53    if len(sys.argv) > 2:
    54        endpoint_path = sys.argv[2]
    55
    56    main(services_path, endpoint_path)

View as plain text