Skip to content

Commit 48cc26f

Browse files
committed
streaming clients; decorators + context managers; more tests
1 parent 338289b commit 48cc26f

File tree

14 files changed

+436
-94
lines changed

14 files changed

+436
-94
lines changed

README.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ New metric constructors use separate `PUSH_REGISTRY` as a default, not to interf
2525

2626
### Default labelvalues
2727

28-
With regular prometheus_client, defaults may be defined for either _none_ or _all_ the labels (with `labelvalues`), but that's not enough.
28+
With regular prometheus_client, defaults may be defined for either _none_ or _all_ the labels (with `labelvalues`), but that's not enough (and sometimes doesn't work?).
2929

3030
We probably want to define _some_ defaults, like `hostname`, or more importantly, **if we use VictoriaMetrics cluster**, we always need to push label `VictoriaMetrics_AccountID=<int>` (usually 1) or else our metrics will be ignored.
3131

@@ -55,13 +55,13 @@ counter1.labels(host="non-default", event_type="login").inc()
5555
counter1.labels("non-default", "login").inc()
5656
```
5757

58-
Metrics with no labels are initialized at creation time. This can have unpleasant side-effect: if we initialize lots of metrics not used in currently running job, background clients will have to push their non-changing values in every synchronization session.
58+
Metrics with no labels are initialized at creation time. This can have an unpleasant side-effect: if we initialize lots of metrics not used in the currently running job, batch clients will have to push their non-changing values in every synchronization session.
5959

6060
To avoid that we'll have to properly isolate each task's metrics, which can be impossible or rather tricky, or we can create metrics with default, non-changing labels (like `hostname`). Such metrics will be initialized on first use (inc), and we'll be pushing only those we actually used.
6161

62-
## Background clients
62+
## Batch clients
6363

64-
Background clients spawn synchronization jobs "in background" (meaning in a thread or asyncio task) to periodically send all metrics from `ppc.PUSH_REGISTRY` to the destination.
64+
Batch clients spawn synchronization jobs "in background" (meaning in a thread or asyncio task) to periodically send all metrics from `ppc.PUSH_REGISTRY` to the destination.
6565

6666
Clients will attempt to stop gracefully, synchronizing the registry "one last time" after the job exits or crashes. Sometimes this _may_ mess up Grafana sampling, but the worst picture I could artificially create looks like this:
6767

@@ -99,3 +99,8 @@ async def main(urls):
9999
req_hist.labels(gethostname(url)).observe(response.elapsed)
100100
# ...
101101
```
102+
103+
104+
## Streaming clients
105+
106+
:warning: `histogram.observe` doesn't work, fix later

prometheus_push_client/__init__.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1-
from .clients.bg import ThreadBGClient, AsyncioBGClient
2-
# TODO: clients.fg
1+
from .registry import PUSH_REGISTRY
2+
3+
from .clients.batch import ThreadBatchClient, AsyncBatchClient
4+
from .clients.streaming import SyncStreamingClient, AsyncStreamingClient
35
from .decorators import (
46
influx_udp_async,
57
statsd_udp_async,
68
influx_udp_thread,
79
statsd_udp_thread,
810
influx_http_async,
911
influx_http_thread,
12+
influx_udp_aiostream,
13+
statsd_udp_aiostream,
14+
influx_udp_stream,
15+
statsd_udp_stream,
1016
)
1117
from .formats.influx import InfluxFormat
1218
from .formats.statsd import StatsdFormat
1319
from .formats.openmetrics import OpenMetricsFormat
1420
from .metrics import Counter, Gauge, Summary, Histogram, Info, Enum
15-
from .registry import PUSH_REGISTRY
1621
from .transports.http import SyncHttpTransport, AioHttpTransport
1722
from .transports.udp import SyncUdpTransport, AioUdpTransport
1823
from .version import __version__
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
For documentation purposes
3+
"""
4+
5+
6+
class BaseSyncClient:  # pragma: no cover
    """Interface of a synchronous push client (for documentation only).

    Concrete clients must override both methods; calling them on the
    base class raises NotImplementedError.
    """

    def start(self):
        """Begin pushing metrics. Must be overridden."""
        raise NotImplementedError()

    def stop(self):
        """Stop pushing metrics and release resources. Must be overridden."""
        raise NotImplementedError()
12+
13+
14+
class BaseAsyncClient:  # pragma: no cover
    """Interface of an asyncio push client (for documentation only).

    Concrete clients must override both coroutine methods; awaiting them
    on the base class raises NotImplementedError.
    """

    async def start(self):
        """Begin pushing metrics. Must be overridden."""
        raise NotImplementedError()

    async def stop(self):
        """Stop pushing metrics and release resources. Must be overridden."""
        raise NotImplementedError()

prometheus_push_client/clients/bg.py renamed to prometheus_push_client/clients/batch.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
from prometheus_push_client import compat
77

88

9-
log = logging.getLogger("prometheus.bg")
9+
log = logging.getLogger("prometheus.batch")
1010

1111

12-
class BackgroundClient:
13-
12+
class BaseBatchClient:
1413
def __init__(self, format, transport, period=15.0, *args, **kwargs):
1514
self.format = format
1615
self.transport = transport
@@ -29,7 +28,7 @@ def sleep_steps_iter(self, period):
2928
yield last_step
3029

3130

32-
class ThreadBGClient(BackgroundClient, threading.Thread):
31+
class ThreadBatchClient(BaseBatchClient, threading.Thread):
3332
def start(self):
3433
self.stop_event = threading.Event()
3534
self.transport.start()
@@ -52,15 +51,15 @@ def run(self):
5251
break
5352

5453
ts_start = time.time()
55-
samples_iter = self.format.iter_samples()
54+
data_gen = self.format.iter_samples()
5655
try:
57-
self.transport.push_all(samples_iter)
56+
self.transport.push_all(data_gen)
5857
except Exception:
5958
log.error("push crashed", exc_info=True)
6059
period = self.period - (time.time() - ts_start)
6160

6261

63-
class AsyncioBGClient(BackgroundClient):
62+
class AsyncBatchClient(BaseBatchClient):
6463
async def start(self):
6564
self.stop_event = asyncio.Event()
6665
await self.transport.start()
@@ -86,9 +85,9 @@ async def run(self):
8685
break
8786

8887
ts_start = time.time()
89-
samples_iter = self.format.iter_samples()
88+
data_gen = self.format.iter_samples()
9089
try:
91-
await self.transport.push_all(samples_iter)
90+
await self.transport.push_all(data_gen)
9291
except Exception:
9392
log.error("push crashed", exc_info=True)
9493
period = self.period - (time.time() - ts_start)
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import logging
from types import MethodType


# Logger for the streaming clients; push failures are reported here
# instead of being raised into the instrumented application.
log = logging.getLogger("prometheus.streaming")

# Mutator method names that get monkey-patched so each call triggers an
# immediate push of the owning metric.
METRIC_METHODS = ("inc", "dec", "set", "observe")
8+
9+
10+
class BaseStreamingClient:
    """Shared logic of the streaming clients.

    Monkey-patches every mutator method in the format's registry so each
    metric update is pushed to the transport immediately, and restores
    the original methods on shutdown.
    """

    def __init__(self, format, transport):
        self.format = format
        self.transport = transport

    def _monkey_patch_all(self):
        """Patch the registry and every collector currently in it."""
        reg = self.format.registry
        with reg._lock:
            patch_registry(reg, self.format, self.transport)
            for metric in reg._collector_to_names:
                patch_collector(metric, self.format, self.transport)

    def _restore_all(self):
        """Undo _monkey_patch_all, restoring the original methods."""
        reg = self.format.registry
        with reg._lock:
            restore_registry(reg)
            for metric in reg._collector_to_names:
                restore_collector(metric)
28+
29+
30+
class SyncStreamingClient(BaseStreamingClient):
    """Streaming client for synchronous code."""

    def start(self):
        # Patch first so no metric update can slip through unpushed
        # once the transport is running.
        self._monkey_patch_all()
        self.transport.start()

    def stop(self):
        self._restore_all()
        self.transport.stop()
38+
39+
40+
class AsyncStreamingClient(BaseStreamingClient):
    """Streaming client for asyncio code."""

    async def start(self):
        # Patch first so no metric update can slip through unpushed
        # once the transport is running.
        self._monkey_patch_all()
        await self.transport.start()

    async def stop(self):
        self._restore_all()
        await self.transport.stop()
48+
49+
50+
def push_all(collector, format, transport):
    """Collect all samples from `collector`, format them, and push them
    synchronously through `transport`.

    Any failure is logged and swallowed: a metrics push must never break
    the instrumented application.
    """
    try:
        collected = collector.collect()
        payload = format.format_metrics(*collected)
        transport.push_all_sync(payload)
    except Exception:
        log.error("push crashed", exc_info=True)
57+
58+
59+
#
60+
## Utils for monkey-patching
61+
#
62+
63+
64+
def patch_registry(registry, format, transport):
    """Wrap `registry.register` so every collector registered after the
    streaming client started is monkey-patched as well."""
    original_register = registry.register

    def streaming_register(self, collector, *args, **kwargs):
        # Patch before registering, then delegate to the bound original.
        patch_collector(collector, format, transport)
        original_register(collector, *args, **kwargs)

    # Shadow the class method with an instance-bound wrapper;
    # restore_registry() removes this shadow later.
    registry.register = MethodType(streaming_register, registry)
72+
73+
74+
def patch_collector(collector, format, transport, parent=None):
    """Patch a collector's mutator methods, and recursively its child
    metrics, so every update triggers an immediate push.

    `parent` is set when patching a labelled child, so pushes report
    through the owning metric.
    """
    # Patch whichever mutators this collector actually exposes.
    for name in METRIC_METHODS:
        if hasattr(collector, name):
            patch_collector_method(collector, format, transport, name, parent)

    # Labelled metrics keep their children in `_metrics`; patch those
    # too, and make the container patch any child created later.
    if hasattr(collector, "_metrics"):
        with collector._lock:
            patch_collector_submetrics(collector, format, transport)
85+
86+
87+
def patch_collector_method(collector, format, transport, method_name, parent=None):
    """Replace one mutator on `collector` with a wrapper that performs
    the original update and then pushes the owning metric."""
    unpatched = getattr(collector, method_name)

    def pushing_method(self, *args, **kwargs):
        unpatched(*args, **kwargs)
        # Child metrics push through their parent so the full labelled
        # family is collected, not just the child sample.
        target = parent if parent is not None else self
        push_all(target, format, transport)

    # Instance-level shadow; restore_collector() rebinds the class
    # method to remove it.
    setattr(collector, method_name, MethodType(pushing_method, collector))
97+
98+
99+
def patch_collector_submetrics(collector, format, transport):
    """Swap a labelled collector's `_metrics` dict for a container that
    monkey-patches each child metric as it is inserted."""
    patching_container = SelfPatchingMetrics(collector, format, transport)

    # Re-insert the existing children through __setitem__ so they get
    # patched exactly like children created in the future.
    for key in list(collector._metrics):
        patching_container[key] = collector._metrics.pop(key)

    collector._metrics = patching_container
107+
108+
109+
class SelfPatchingMetrics(dict):
    """Dict of child metrics that monkey-patches every value stored in
    it, so children created later via `.labels(...)` stream too."""

    def __init__(self, parent, format, transport):
        super().__init__()
        self.parent = parent
        self.format = format
        self.transport = transport

    def __setitem__(self, key, submetric):
        # Patch before insertion: by the time the child is visible to
        # callers it must already be streaming.
        patch_collector(submetric, self.format, self.transport, self.parent)
        super().__setitem__(key, submetric)
119+
120+
121+
def restore_registry(registry):
    """Undo patch_registry: rebind the class-level `register` so the
    instance shadow stops intercepting registrations."""
    registry.register = MethodType(registry.__class__.register, registry)
130+
131+
132+
def restore_collector(collector):
    """Undo patch_collector: rebind the class-level mutators and replace
    the self-patching children container with a plain dict."""
    for name in METRIC_METHODS:
        if not hasattr(collector, name):
            continue
        # Rebinding the class method overwrites the instance-level
        # pushing wrapper installed by patch_collector_method.
        setattr(
            collector,
            name,
            MethodType(getattr(collector.__class__, name), collector),
        )

    if hasattr(collector, "_metrics"):
        with collector._lock:
            # Shallow-copy back into a plain dict, dropping the
            # SelfPatchingMetrics behavior while keeping the children.
            collector._metrics = dict(collector._metrics)

prometheus_push_client/compat.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import contextlib
12
import asyncio
23

4+
35
if hasattr(asyncio, "get_running_loop"):
46
get_running_loop = asyncio.get_running_loop
57
else: # < 3.7, may be unsafe outside coroutines (running loop)
@@ -10,4 +12,4 @@
1012
create_task = asyncio.create_task
1113
else: # < 3.7, totally unsafe outside loop
1214
def create_task(coro): # pragma: no cover
13-
return asyncio.get_event_loop().create_task(coro)
15+
return asyncio.get_event_loop().create_task(coro)

0 commit comments

Comments
 (0)