import logging

log = logging.getLogger(__name__)


def create_instance(transport, options):
    """Instantiate the backend named by *transport*.

    :param transport: one of ``'graphite'``, ``'ganglia'``,
        ``'ganglia-gmetric'`` or ``'console'``.
    :param options: dict of backend-specific settings, passed straight
        to the backend constructor.
    :returns: a backend instance, or ``None`` for an unknown transport
        (callers must check before use).
    """
    if transport == 'graphite':
        return Graphite(options)
    elif transport == 'ganglia':
        return Ganglia(options)
    elif transport == 'ganglia-gmetric':
        return Gmetric(options)
    elif transport == 'console':
        return Console(options)
    return None


class Console(object):
    """Backend that prints every flushed metric to stdout.

    Mainly useful for debugging a statsd server interactively.
    """

    def __init__(self, options=None):
        # Fix: the original used a mutable default ``options={}``, which
        # is shared across all calls.  The argument is only kept for
        # interface parity with the other backends and is unused here.
        print("Console started")

    def init(self, cfg):
        """Receive server-level settings (called once by Server)."""
        self.debug = cfg.get('debug')
        self.flush_interval = cfg.get('flush_interval')

    def flush(self, timestamp, metrics):
        """Print counters, gauges and timer summaries for one flush."""
        for k, v in metrics['counters'].items():
            print("%s => count=%s" % (k, v))

        for k, v in metrics['gauges'].items():
            print("%s => value=%s" % (k, v))

        for k, v in metrics['timers'].items():
            print("%s => lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s"
                  % (k, v['min'], v['mean'], v['max'], v['pct_threshold'],
                     v['max_threshold'], v['count']))
import logging

log = logging.getLogger(__name__)


class Ganglia(object):
    """Backend that ships flushed metrics to Ganglia through the
    embedded gmetric library (``pystatsd.gmetric``)."""

    def __init__(self, options):
        self.host = options.get('ganglia_host', 'localhost')
        self.port = options.get('ganglia_port', 8649)
        self.protocol = options.get('ganglia_protocol', 'udp')
        self.spoof_host = options.get('ganglia_spoof_host', 'statsd:statsd')

    def init(self, options):
        """Receive server-level settings (called once by Server)."""
        self.debug = options.get('debug')
        self.flush_interval = options.get('flush_interval')
        # DMAX is the flush interval plus 20% so metrics do not expire
        # prematurely when a flush is slightly delayed.
        self.dmax = int(self.flush_interval * 1.2)

    def flush(self, timestamp, metrics):
        """Send one flush worth of metrics to gmond."""
        conn = gmetric.Gmetric(self.host, self.port, self.protocol)

        # Counters go in the "_counters" group; the leading underscore
        # sorts them first in the Ganglia GUI.  Change it if you disagree.
        for name, value in metrics['counters'].items():
            conn.send(name, value, "double", "count", "both", 60,
                      self.dmax, "_counters", self.spoof_host)

        for name, value in metrics['gauges'].items():
            conn.send(name, value, "double", "count", "both", 60,
                      self.dmax, "_gauges", self.spoof_host)

        # Timer values are converted to seconds so rrdtool can attach a
        # sane SI unit (avoids displays like "3521 k ms" for 3.521 s).
        # Each timer key gets its own metric group named after the key.
        for name, stats in metrics['timers'].items():
            group = name
            for suffix, value in (("_min", stats['min'] / 1000),
                                  ("_mean", stats['mean'] / 1000),
                                  ("_max", stats['max'] / 1000)):
                conn.send(name + suffix, value, "double", "seconds",
                          "both", 60, self.dmax, group, self.spoof_host)
            conn.send(name + "_count", stats['count'], "double", "count",
                      "both", 60, self.dmax, group, self.spoof_host)
            pct_name = name + "_" + str(stats['pct_threshold']) + "pct"
            conn.send(pct_name, stats['max_threshold'] / 1000, "double",
                      "seconds", "both", 60, self.dmax, group,
                      self.spoof_host)
import logging

from subprocess import call

log = logging.getLogger(__name__)


class Gmetric(object):
    """Backend that ships metrics to Ganglia by shelling out to the
    ``gmetric`` executable."""

    def __init__(self, options):
        self.gmetric_exec = options.get('gmetric_exec', '/usr/bin/gmetric')
        self.gmetric_options = options.get('gmetric_options', '-d')

    def init(self, options):
        """Receive server-level settings (called once by Server)."""
        self.debug = options.get('debug')
        self.flush_interval = options.get('flush_interval')

    def flush(self, timestamp, metrics):
        """Send one flush worth of metrics via gmetric."""
        for name, value in metrics['counters'].items():
            self.send(name, value, "_counters", "count")

        for name, value in metrics['gauges'].items():
            self.send(name, value, "_gauges", "gauge")

        # Timer values go out in seconds so rrdtool can pick a sane SI
        # unit (avoids "3521 k ms" for what is really 3.521 seconds).
        # Each timer key gets its own metric group named after the key.
        for name, stats in metrics['timers'].items():
            group = name
            self.send(name + "_mean", stats['mean'] / 1000, group, "seconds")
            self.send(name + "_min", stats['min'] / 1000, group, "seconds")
            self.send(name + "_max", stats['max'] / 1000, group, "seconds")
            self.send(name + "_count", stats['count'], group, "count")
            self.send(name + "_" + str(stats['pct_threshold']) + "pct",
                      stats['max_threshold'] / 1000, group, "seconds")

    def send(self, k, v, group, units):
        """Invoke the gmetric binary for a single metric."""
        call([self.gmetric_exec, self.gmetric_options, "-u", units,
              "-g", group, "-t", "double", "-n", k, "-v", str(v)])
import socket
import logging


# One line per timer statistic in the Graphite plaintext protocol:
# "<path> <value> <timestamp>\n".
TIMER_MSG = '''%(prefix)s.%(key)s.lower %(min)s %(ts)s
%(prefix)s.%(key)s.count %(count)s %(ts)s
%(prefix)s.%(key)s.mean %(mean)s %(ts)s
%(prefix)s.%(key)s.upper %(max)s %(ts)s
%(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s
'''

log = logging.getLogger(__name__)


class Graphite(object):
    """Backend that sends flushed metrics to Graphite's plaintext port."""

    def __init__(self, options=None):
        # Fix: the original used a mutable default ``options={}``;
        # ``None`` plus a local fallback is safe and backward compatible.
        options = options if options is not None else {}
        self.host = options.get('graphite_host', 'localhost')
        self.port = options.get('graphite_port', 2003)
        self.counters_prefix = options.get('counters_prefix', 'stats')
        self.timers_prefix = options.get('timers_prefix', 'stats.timers')
        # For services like Hosted Graphite that key stats on a prefix.
        self.global_prefix = options.get('global_prefix', None)

    def init(self, cfg):
        """Receive server-level settings (called once by Server)."""
        self.debug = cfg.get('debug')
        self.flush_interval = cfg.get('flush_interval')

    def flush(self, timestamp, metrics):
        """Render one flush worth of metrics to plaintext and send it."""
        stat_string = ''
        stats = 0

        for k, v in metrics['counters'].items():
            stat_string += '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp)
            stats += 1

        # Note: gauges implicitly share the counters namespace.
        for k, v in metrics['gauges'].items():
            stat_string += '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp)
            stats += 1

        for k, v in metrics['timers'].items():
            # NOTE: this mutates the caller's timer dict (adds prefix/
            # key/ts), preserved from the original implementation.
            v.update({'prefix': self.timers_prefix, 'key': k,
                      'ts': timestamp})
            stat_string += TIMER_MSG % v
            stats += 1

        stat_string += "statsd.numStats %s %d\n" % (stats, timestamp)
        self._send_metrics(stat_string)

    def _send_metrics(self, stat_string):
        # Prepend every line with the global prefix (e.g. a Hosted
        # Graphite API key) if one is configured.
        if self.global_prefix:
            stat_string = '\n'.join([
                '%s.%s' % (self.global_prefix, s)
                for s in stat_string.split('\n')[:-1]
            ])

        graphite = socket.socket()
        try:
            graphite.connect((self.host, self.port))
            # str.encode is the idiomatic equivalent of the original
            # bytes(bytearray(..., "utf-8")) round-trip.
            graphite.sendall(stat_string.encode("utf-8"))
        except socket.error as e:
            # Best-effort delivery: log and carry on, as before.
            log.error("Error communicating with Graphite: %s" % e)
            if self.debug:
                print("Error communicating with Graphite: %s" % e)
        finally:
            # Fix: the original closed the socket only on the success
            # path, leaking it whenever connect()/sendall() raised.
            graphite.close()
deleteGauges=False, + backends=[]): self.buf = 8192 self.flush_interval = flush_interval self.pct_threshold = pct_threshold - self.transport = transport - # Embedded Ganglia library options specific settings - self.ganglia_host = ganglia_host - self.ganglia_port = ganglia_port - self.ganglia_protocol = "udp" - # Use gmetric - self.gmetric_exec = gmetric_exec - self.gmetric_options = gmetric_options - # Set DMAX to flush interval plus 20%. That should avoid metrics to prematurely expire if there is - # some type of a delay when flushing - self.dmax = int(self.flush_interval * 1.2) - # What hostname should these metrics be attached to. - self.ganglia_spoof_host = ganglia_spoof_host - - # Graphite specific settings - self.graphite_host = graphite_host - self.graphite_port = graphite_port + self.no_aggregate_counters = no_aggregate_counters - self.counters_prefix = counters_prefix - self.timers_prefix = timers_prefix self.debug = debug self.expire = expire - # For services like Hosted Graphite, etc. 
- self.global_prefix = global_prefix - + self.backends = backends + self.deleteGauges = deleteGauges + + options = { + 'debug': debug, + 'flush_interval': flush_interval, + 'pct_threshold': pct_threshold, + 'expire': expire + } + + # initialize each backend + for backend in backends: + backend.init(options) + self.counters = {} self.timers = {} self.gauges = {} self.flusher = 0 - def send_to_ganglia_using_gmetric(self,k,v,group, units): - call([self.gmetric_exec, self.gmetric_options, "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v) ]) - - def process(self, data): # the data is a sequence of newline-delimited metrics # a metric is in the form "name:value|rest" (rest may have more pipes) @@ -107,7 +82,7 @@ def process(self, data): value = match.group(2) rest = match.group(3).split('|') mtype = rest.pop(0) - + if (mtype == 'ms'): self.__record_timer(key, value, rest) elif (mtype == 'g' ): self.__record_gauge(key, value, rest) elif (mtype == 'c' ): self.__record_counter(key, value, rest) @@ -150,13 +125,11 @@ def on_timer(self): def flush(self): ts = int(time.time()) stats = 0 - - if self.transport == 'graphite': - stat_string = '' - elif self.transport == 'ganglia': - g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) - - for k, (v, t) in self.counters.items(): + metrics = {'counters': {}, 'gauges': {}, 'timers': {}} + + #for k, (v, t) in self.counters.items(): + for k in list(self.counters.keys()): + v, t = self.counters[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring counter %s (age: %s)" % (k, ts -t)) @@ -164,47 +137,28 @@ def flush(self): continue v = float(v) v = v if self.no_aggregate_counters else v / (self.flush_interval / 1000) - - if self.debug: - print("Sending %s => count=%s" % (k, v)) - - if self.transport == 'graphite': - msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) - stat_string += msg - elif self.transport == 'ganglia': - # We put counters in _counters group. 
Underscore is to make sure counters show up - # first in the GUI. Change below if you disagree - g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - self.send_to_ganglia_using_gmetric(k,v, "_counters", "count") + metrics['counters'][k] = v # Clear the counter once the data is sent del(self.counters[k]) stats += 1 - - for k, (v, t) in self.gauges.items(): + + #for k, (v, t) in self.gauges.items(): + for k in list(self.gauges.keys()): + v, t = self.gauges[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring gauge %s (age: %s)" % (k, ts - t)) del(self.gauges[k]) continue - v = float(v) - - if self.debug: - print("Sending %s => value=%s" % (k, v)) - - if self.transport == 'graphite': - # note: counters and gauges implicitly end up in the same namespace - msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) - stat_string += msg - elif self.transport == 'ganglia': - g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - self.send_to_ganglia_using_gmetric(k,v, "_gauges", "gauge") - + metrics['gauges'][k] = float(v) + if self.deleteGauges: + del(self.gauges[k]) stats += 1 - - for k, (v, t) in self.timers.items(): + + #for k, (v, t) in self.timers.items(): + for k in list(self.timers.keys()): + v, t = self.timers[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring timer %s (age: %s)" % (k, ts - t)) @@ -226,68 +180,16 @@ def flush(self): total = sum(v) mean = total / count + metrics['timers'][k] = { + 'mean': mean, 'max': max, 'min': min, 'count': count, + 'max_threshold': max_threshold, 'pct_threshold': self.pct_threshold + } del(self.timers[k]) - - if self.debug: - print("Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" \ - % (k, min, mean, max, self.pct_threshold, max_threshold, count)) - - if self.transport == 'graphite': - - 
stat_string += TIMER_MSG % { - 'prefix': self.timers_prefix, - 'key': k, - 'mean': mean, - 'max': max, - 'min': min, - 'count': count, - 'max_threshold': max_threshold, - 'pct_threshold': self.pct_threshold, - 'ts': ts, - } - - elif self.transport == 'ganglia': - # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like - # 3521 k ms which is 3.521 seconds - # What group should these metrics be in. For the time being we'll set it to the name of the key - group = k - g.send(k + "_min", min / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_mean", mean / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. 
This avoids things like - # 3521 k ms which is 3.521 seconds - group = k - self.send_to_ganglia_using_gmetric(k + "_mean", mean / 1000, group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_min", min / 1000 , group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_max", max / 1000, group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_count", count , group, "count") - self.send_to_ganglia_using_gmetric(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, group, "seconds") - stats += 1 - - if self.transport == 'graphite': - - stat_string += "statsd.numStats %s %d\n" % (stats, ts) - - # Prepend stats with Hosted Graphite API key if necessary - if self.global_prefix: - stat_string = '\n'.join([ - '%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1] - ]) - - graphite = socket.socket() - try: - graphite.connect((self.graphite_host, self.graphite_port)) - graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) - graphite.close() - except socket.error as e: - log.error("Error communicating with Graphite: %s" % e) - if self.debug: - print("Error communicating with Graphite: %s" % e) - + + for backend in self.backends: + backend.flush(ts, metrics) + if self.debug: print("\n================== Flush completed. Waiting until next flush. 
Sent out %d metrics =======" \ % (stats)) @@ -327,23 +229,14 @@ class ServerDaemon(Daemon): def run(self, options): if setproctitle: setproctitle('pystatsd') - server = Server(pct_threshold=options.pct, - debug=options.debug, - transport=options.transport, - graphite_host=options.graphite_host, - graphite_port=options.graphite_port, - global_prefix=options.global_prefix, - ganglia_host=options.ganglia_host, - ganglia_spoof_host=options.ganglia_spoof_host, - ganglia_port=options.ganglia_port, - gmetric_exec=options.gmetric_exec, - gmetric_options=options.gmetric_options, - flush_interval=options.flush_interval, - no_aggregate_counters=options.no_aggregate_counters, - counters_prefix=options.counters_prefix, - timers_prefix=options.timers_prefix, - expire=options.expire) - + + backend = create_instance(options.transport, vars(options)) + + + server = Server(options.pct, options.debug, options.flush_interval, + options.expire, options.no_aggregate_counters, options.delete_gauges, + backends=[backend]) + server.serve(options.name, options.port) @@ -354,28 +247,37 @@ def run_server(): parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False) parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='') parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125) - parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite") + parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library), ganglia-gmetric (uses gmetric) or console', type=str, default="graphite") + + # Graphite parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003) parser.add_argument('--graphite-host', 
dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost') + parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) + parser.add_argument('--counters-prefix', dest='counters_prefix', help='prefix to append before sending counter data to graphite (default: stats)', type=str, default='stats') + parser.add_argument('--timers-prefix', dest='timers_prefix', help='prefix to append before sending timing data to graphite (default: stats.timers)', type=str, default='stats.timers') + # Uses embedded Ganglia Library parser.add_argument('--ganglia-port', dest='ganglia_port', help='Unicast port to connect to ganglia on', type=int, default=8649) parser.add_argument('--ganglia-host', dest='ganglia_host', help='Unicast host to connect to ganglia on', type=str, default='localhost') parser.add_argument('--ganglia-spoof-host', dest='ganglia_spoof_host', help='host to report metrics as to ganglia', type=str, default='statsd:statsd') + # Use gmetric parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric") parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60") - # + + # Server options parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000) parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true') - parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. 
Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) - parser.add_argument('--counters-prefix', dest='counters_prefix', help='prefix to append before sending counter data to graphite (default: stats)', type=str, default='stats') - parser.add_argument('--timers-prefix', dest='timers_prefix', help='prefix to append before sending timing data to graphite (default: stats.timers)', type=str, default='stats.timers') parser.add_argument('-t', '--pct', dest='pct', help='stats pct threshold (default: 90)', type=int, default=90) + parser.add_argument('--delete-gauges', dest='delete_gauges', action='store_true', help='Delete gauges once they have been sent to the backends', default=False) + + # Process options parser.add_argument('-D', '--daemon', dest='daemonize', action='store_true', help='daemonize', default=False) parser.add_argument('--pidfile', dest='pidfile', action='store', help='pid file', default='/var/run/pystatsd.pid') parser.add_argument('--restart', dest='restart', action='store_true', help='restart a running daemon', default=False) parser.add_argument('--stop', dest='stop', action='store_true', help='stop a running daemon', default=False) parser.add_argument('--expire', dest='expire', help='time-to-live for old stats (in secs)', type=int, default=0) + options = parser.parse_args(sys.argv[1:]) log_level = logging.DEBUG if options.debug else logging.INFO diff --git a/statsd_test.py b/statsd_test.py index 12bb37c..d9355e7 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from pystatsd import Client, Server +from pystatsd.backends import Console sc = Client('localhost', 8125) @@ -9,5 +10,5 @@ sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) -srvr = Server(debug=True) +srvr = Server(debug=True, flush_interval=2000, deleteGauges=True, backends=[Console()]) srvr.serve() diff --git a/tests/__init__.py b/tests/__init__.py index 84ed79a..9446d13 100644 
--- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1,5 @@ from .client import * from .server import * +from .ganglia_backend import * +from .gmetric_backend import * +from. graphite_backend import * diff --git a/tests/ganglia_backend.py b/tests/ganglia_backend.py new file mode 100644 index 0000000..845070a --- /dev/null +++ b/tests/ganglia_backend.py @@ -0,0 +1,71 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Ganglia + +class GangliaBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Ganglia backend + """ + def setUp(self): + self.patchers = [] + + gmetric_patcher = mock.patch('pystatsd.backends.ganglia.gmetric.Gmetric') + self.mock_gmetric = gmetric_patcher.start() + self.patchers.append(gmetric_patcher) + + self.options = { + 'ganglia_host': 'localhost', + 'ganglia_port': 8649, + 'ganglia_protocol': 'udp', + 'ganglia_spoof_host': 'statsd:statsd' + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.ganglia = Ganglia(self.options) + self.ganglia.init(self.config) + + def test_ganglia_create(self): + self.assertEqual(self.ganglia.host, self.options['ganglia_host']) + self.assertEqual(self.ganglia.port, self.options['ganglia_port']) + self.assertEqual(self.ganglia.protocol, self.options['ganglia_protocol']) + self.assertEqual(self.ganglia.spoof_host, self.options['ganglia_spoof_host']) + self.assertEqual(self.ganglia.dmax, int(self.config['flush_interval']*1.2)) + + def test_ganglia_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + self.ganglia.flush(ts, metrics) + + self.mock_gmetric.assert_called_with( + self.options['ganglia_host'], self.options['ganglia_port'], self.options['ganglia_protocol']) + + send_fn = 
self.mock_gmetric.return_value.send + + send_fn.assert_any_call('gorets', 1.1, "double", "count", "both", 60, + self.ganglia.dmax, "_counters", self.ganglia.spoof_host) + send_fn.assert_any_call('gaugor', 333.0, "double", "count", "both", 60, + self.ganglia.dmax, "_gauges", self.ganglia.spoof_host) + + for m in ['min', 'max', 'mean', '90pct']: + send_fn.assert_any_call('glork_'+m, 0.32, 'double', 'seconds', 'both', + 60, self.ganglia.dmax, 'glork', self.ganglia.spoof_host) + + send_fn.assert_any_call('glork_count', 1, 'double', 'count', 'both', + 60, self.ganglia.dmax, 'glork', self.ganglia.spoof_host) + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() diff --git a/tests/gmetric_backend.py b/tests/gmetric_backend.py new file mode 100644 index 0000000..3c04681 --- /dev/null +++ b/tests/gmetric_backend.py @@ -0,0 +1,61 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Gmetric + +class GmetricBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Ganglia backend + """ + def setUp(self): + self.patchers = [] + + gmetric_patcher = mock.patch('pystatsd.backends.gmetric.call') + self.mock_gmetric = gmetric_patcher.start() + self.patchers.append(gmetric_patcher) + + self.options = { + 'gmetric_exec': '/usr/bin/gmetric', + 'gmetric_options': '-d' + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.gmetric = Gmetric(self.options) + self.gmetric.init(self.config) + + def test_ganglia_create(self): + self.assertEqual(self.gmetric.gmetric_exec, self.options['gmetric_exec']) + self.assertEqual(self.gmetric.gmetric_options, self.options['gmetric_options']) + + def test_ganglia_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + 
self.gmetric.flush(ts, metrics) + + self.mock_gmetric.assert_any_call(self._send_args('gorets', 1.1, "_counters", "count")) + self.mock_gmetric.assert_any_call(self._send_args('gaugor', 333.0, "_gauges", "gauge")) + + for m in ['min', 'max', 'mean', '90pct']: + self.mock_gmetric.assert_any_call(self._send_args('glork_'+m, 0.32, "glork", "seconds")) + + self.mock_gmetric.assert_any_call(self._send_args('glork_count', 1, 'glork', 'count')) + + def _send_args(self, k, v, group, units): + return [self.options['gmetric_exec'], self.options['gmetric_options'], + "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v)] + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() diff --git a/tests/graphite_backend.py b/tests/graphite_backend.py new file mode 100644 index 0000000..b2a16da --- /dev/null +++ b/tests/graphite_backend.py @@ -0,0 +1,91 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Graphite + +class GraphiteBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Graphite backend + """ + def setUp(self): + self.patchers = [] + + socket_patcher = mock.patch('pystatsd.backends.graphite.socket.socket') + self.mock_socket = socket_patcher.start() + self.patchers.append(socket_patcher) + + self.options = { + 'graphite_host': 'localhost', + 'graphite_port': 2003, + 'counters_prefix': 'stats', + 'timers_prefix': 'stats.timers', + 'global_prefix': None + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.graphite = Graphite(self.options) + self.graphite.init(self.config) + + def test_graphite_create(self): + self.assertEqual(self.graphite.host, self.options['graphite_host']) + self.assertEqual(self.graphite.port, self.options['graphite_port']) + self.assertEqual(self.graphite.counters_prefix, self.options['counters_prefix']) + self.assertEqual(self.graphite.timers_prefix, self.options['timers_prefix']) + 
self.assertEqual(self.graphite.global_prefix, self.options['global_prefix']) + + def test_graphite_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + self.graphite.flush(ts, metrics) + + # check connection information is correct + self.mock_socket.return_value.connect.assert_called_with(( + self.options['graphite_host'], self.options['graphite_port'])) + + # get sendall call argument + sendall = self.mock_socket.return_value.mock_calls[1] + self.assertEqual(sendall[0], 'sendall') + self.assertEqual(len(sendall[1]), 1) + + data = sendall[1][0].decode("utf-8").strip().split("\n") + + # check each metric + for metric in data: + fields = metric.split() + id = fields[0].split('.') + + if id[0] == 'statsd' and id[1] == 'numStats': + self.assertEqual(int(fields[1]), len(metrics)) + else: + self.assertEqual(id[0], self.options['counters_prefix']) + + # counters and gauges + if len(id) == 2: + self.assertEqual(metrics.get('gauges').get(id[1]) + or metrics.get('counters').get(id[1]), float(fields[1])) + # timers + elif len(id) == 4: + self.assertTrue(metrics.get('timers').get(id[2]) != None) + + # check timestamp + self.assertEqual(int(fields[2]), ts) + + # check connection has been closed + self.mock_socket.return_value.close.assert_called_with() + + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() diff --git a/tests/server.py b/tests/server.py index 566f3e0..522c6c4 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,25 +1,137 @@ import unittest import mock +import socket +import threading -# from pystatsd.statsd import Client from pystatsd.server import Server +from pystatsd.backends import Console +from mock import Mock, ANY +from time import sleep class ServerBasicsTestCase(unittest.TestCase): """ - Tests the basic operations of the client + 
Tests the basic operations of the server """ def setUp(self): - self.patchers = [] - - socket_patcher = mock.patch('pystatsd.statsd.socket.socket') - self.mock_socket = socket_patcher.start() - self.patchers.append(socket_patcher) + self.addr = (socket.gethostbyname(''), 8125) + self.backend = self.__create_backend() def test_server_create(self): - server = Server() + """Create a new server and checks if initialization is correct""" + server = Server(backends=[self.backend]) if getattr(self, "assertIsNotNone", False): self.assertIsNotNone(server) else: assert server is not None + + self.assertEqual(server.expire, 0) + self.assertEqual(server.buf, 8192) + self.assertEqual(len(server.backends), 1) + self.assertEqual(len(server.counters), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.gauges), 0) + + self.backend.init.assert_called_with({'debug': False, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90}) + + def test_server_process(self): + """ + Checks if the server is properly processing the different types of metrics. + """ + server = Server() + server.process('gorets:1|c\nglork:320|ms\ngaugor:333|g') + + self.assertEqual(len(server.counters), 1) + self.assertEqual(server.counters.get('gorets')[0], 1.0) + self.assertEqual(len(server.timers), 1) + self.assertEqual(server.timers.get('glork')[0], [320.0]) + self.assertEqual(len(server.gauges), 1) + self.assertEqual(server.gauges.get('gaugor')[0], 333.0) + + server.process('gorets:1|c|@0.1') + self.assertEqual(len(server.counters), 1) + self.assertEqual(server.counters.get('gorets')[0], 11.0) + + def test_server_flush(self): + """ + Checks if the backend is receiving the processed metrics properly after + a flush. 
+ """ + server = Server(backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + + server.flush() + self.backend.flush.assert_called_with(ANY, { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + }) + + server.flush() + self.backend.flush.assert_called_with(ANY, { + 'timers': {}, 'gauges': {'gaugor': 333.0}, 'counters': {}}) + + self.assertEqual(len(server.gauges), 1) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) + + def test_server_flush_del_gauges(self): + """ + Checks if all metrics are removed after a flush with a server set to delete + gauges. + """ + server = Server(deleteGauges=True, backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + server.flush() + + self.backend.flush.assert_called_with(ANY, { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + }) + + self.assertEqual(len(server.gauges), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) + + def test_server_flush_backends(self): + """ + Test a server with multiple backends and ensures that all of them are + called after a flush. + """ + backend_a = self.__create_backend() + backend_b = self.__create_backend() + + server = Server(backends=[backend_a, backend_b]) + server.flush() + + metrics = {'timers': {}, 'gauges': {}, 'counters': {}} + backend_a.flush.assert_called_with(ANY, metrics) + backend_b.flush.assert_called_with(ANY, metrics) + + def test_server_flush_expire(self): + """ + Test if metrics are removed by forcing them to expire. 
+ """ + server = Server(expire=1, backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + + sleep(2) + server.flush() + self.assertEqual(len(server.gauges), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) + + + def __create_backend(self): + backend = Console() + backend.init = Mock() + backend.flush = Mock() + return backend