...
1import sys
2
3# Copyright 2018 Datawire. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License
16
17import datetime
18import json
19import logging
20import os
21import socket
22
__version__ = '0.0.14'

# Address and port the UDP listener binds to ("0.0.0.0" = every interface).
UDP_IP = "0.0.0.0"
UDP_PORT = 8125

# Any non-empty STATSD_TEST_DEBUG value turns on debug-level logging.
if os.environ.get('STATSD_TEST_DEBUG'):
    log_level = logging.DEBUG
else:
    log_level = logging.INFO

# The doubled %% survive the version substitution as single % characters, so
# logging still sees its own %(asctime)s-style placeholders.
log_format = "%%(asctime)s stats-test %s %%(levelname)s: %%(message)s" % __version__

logging.basicConfig(
    level=log_level,
    format=log_format,
    datefmt="%Y-%m-%d %H:%M:%S",
)

# IPv4 datagram (UDP) socket, shared by the receive loop for replies as well.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
listen_address = (UDP_IP, UDP_PORT)
sock.bind(listen_address)

logging.info("Listening on %d" % UDP_PORT)
39
# Names of the clusters whose stats we track, taken from the colon-separated
# STATSD_TEST_CLUSTER environment variable. An unset variable is treated as
# "no clusters" instead of crashing on None.split(), and empty names (empty
# value, doubled or trailing colons) are dropped because an empty cluster
# name can never match an incoming stat.
interesting_clusters = {
    cname
    for cname in os.environ.get('STATSD_TEST_CLUSTER', '').split(":")
    if cname
}

if not interesting_clusters:
    logging.warning("STATSD_TEST_CLUSTER is unset or empty; no stats will be collected")

logging.info(f"Interesting clusters: {interesting_clusters}")

# Accumulated stats: cluster name -> {stat key -> running integer total}.
interesting = {}

# Time the periodic summary was last logged (naive local time).
last_summary = datetime.datetime.now()
49
def summary():
    """Return a plain-text report with one line per interesting cluster.

    Reads the module-level ``interesting_clusters`` and ``interesting``
    mappings. Each line has the form::

        <cluster>: <total> req, <2xx> good[, <avg> ms avg]

    A counter that has not been seen yet is reported as -1. The average
    latency suffix is only present when at least one request and a request
    time total have both been recorded.
    """
    lines = []

    for cluster_name in interesting_clusters:
        cluster = interesting.get(cluster_name, {})
        trq = cluster.get('upstream_rq_total', -1)
        grq = cluster.get('upstream_rq_2xx', -1)
        ttm = cluster.get('upstream_rq_time', -1)

        # Both the request count and the time total must be usable: without
        # the check on ttm a missing timer produced a negative average, and
        # without a computed average the raw -1 was glued onto the line.
        if trq > 0 and ttm >= 0:
            avg = ", %.1f ms avg" % (ttm / trq)
        else:
            avg = ""

        lines.append(f'{cluster_name}: {trq} req, {grq} good{avg}\n')

    return "".join(lines)
65
_CLUSTER_PREFIX = 'envoy.cluster.'


def _parse_stat(data):
    """Parse one 'envoy.cluster.' stat line into (cluster_name, key, value).

    ``data`` must already start with 'envoy.cluster.'. Two layouts are
    understood. Plain statsd carries the cluster name as the first dotted
    component of the stat name:

        envoy.cluster.cluster_http___statsdtest_http.upstream_rq_200:310|c
        envoy.cluster.cluster_http___statsdtest_http.upstream_rq_2xx:310|c
        envoy.cluster.cluster_http___statsdtest_http.upstream_rq_time:3|ms

    while dogstatsd carries it in the 'envoy.cluster_name' tag:

        envoy.cluster.upstream_rq:363|c|#envoy.response_code:200,envoy.cluster_name:cluster_http___dogstatsdtest_http
        envoy.cluster.upstream_rq_xx:363|c|#envoy.response_code_class:2,envoy.cluster_name:cluster_http___dogstatsdtest_http
        envoy.cluster.upstream_rq_time:2|ms|#envoy.cluster_name:cluster_http___dogstatsdtest_http

    Dogstatsd keys are rewritten to their plain statsd spelling
    (upstream_rq -> upstream_rq_200, upstream_rq_xx -> upstream_rq_2xx).

    Returns None, after logging the reason at debug level, when the line
    cannot be used. Malformed input never raises, so one bad datagram cannot
    take the listener down.
    """
    # Strip the leading 'envoy.cluster.', then split the '|' separated fields:
    # "<key>:<value>", the metric type, and optionally the dogstatsd tags.
    fields = data[len(_CLUSTER_PREFIX):].split('|')

    if (len(fields) < 2) or (len(fields) > 3):
        logging.debug(f'bad fields {fields}')
        return None

    key_and_value = fields[0]
    dog_elements = {}

    if len(fields) > 2:
        dog_stuff = fields[2]

        if not dog_stuff.startswith('#'):
            logging.debug(f'bad dog_stuff {dog_stuff}')
            return None

        for dog_element in dog_stuff[1:].split(','):
            # partition() never raises: a tag without ':' gets an empty value.
            dog_key, _, dog_value = dog_element.partition(':')
            dog_elements[dog_key] = dog_value

    key, sep, value = key_and_value.partition(':')

    if not sep:
        logging.debug(f'bad key_and_value {key_and_value}')
        return None

    if not dog_elements:
        # No datadog stuff, so we should be able to grab the cluster name
        # from the key.
        cluster_name, sep, key = key.partition('.')

        if not sep:
            logging.debug(f'no stat name after cluster name {cluster_name}')
            return None
    else:
        cluster_name = dog_elements.get('envoy.cluster_name')

    if not cluster_name:
        logging.debug('no cluster_name')
        return None

    # Finally, fix up the dogstatsd stat keys.
    if dog_elements:
        if key.endswith('_rq') and ('envoy.response_code' in dog_elements):
            key = f'{key}_{dog_elements["envoy.response_code"]}'
        elif key.endswith('_xx') and ('envoy.response_code_class' in dog_elements):
            rclass = dog_elements["envoy.response_code_class"]
            key = key.replace('_xx', f'_{rclass}xx')

    try:
        value = int(value)
    except ValueError:
        logging.debug(f'bad value {value}')
        return None

    return cluster_name, key, value


while True:
    now = datetime.datetime.now()

    # NOTE: recvfrom() below blocks, so this check is only re-evaluated after
    # a datagram has been handled.
    if (now - last_summary) > datetime.timedelta(seconds=30):
        logging.info(f"30sec\n{summary()}")
        last_summary = datetime.datetime.now()

    data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes

    try:
        data = data.decode('utf-8').strip()
    except UnicodeDecodeError:
        logging.debug(f'undecodable datagram from {addr}')
        continue

    logging.debug(f"data: {data}")

    if data == 'RESET':
        # Control command: forget everything collected so far and acknowledge.
        logging.info('RESETTING')

        interesting = {}

        sock.sendto(bytes('RESET', 'utf-8'), addr)
    elif data == 'DUMP':
        # Control command: reply with the collected stats as JSON.
        logging.info('DUMP')

        contents = json.dumps(interesting).encode("utf-8")
        logging.info(f"SEND {contents}")

        sock.sendto(contents, addr)
    elif data == 'SUMMARY':
        # Control command: reply with the human-readable summary. Log the
        # text itself (the original logged a literal '{contents}').
        text = summary()
        logging.info(f'SUMMARY:\n{text}')

        sock.sendto(text.encode("utf-8"), addr)
    else:
        # Anything else should be a stat line; only 'envoy.cluster.' stats
        # are of interest.
        if not data.startswith(_CLUSTER_PREFIX):
            continue

        logging.info(f"CLUSTER: {data}")

        parsed = _parse_stat(data)

        if parsed is None:
            continue

        cluster_name, key, value = parsed

        # Is this an interesting cluster?
        if cluster_name not in interesting_clusters:
            logging.debug(f'{cluster_name} is uninteresting')
            continue

        cluster_stats = interesting.setdefault(cluster_name, {})
        cluster_stats[key] = cluster_stats.get(key, 0) + value
View as plain text