From d0465e1d8192b009d7c0f47a2c6d778e16ecb03c Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 16:20:16 +0300 Subject: [PATCH 1/6] Add dep --- packages/faststream-stomp/pyproject.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/faststream-stomp/pyproject.toml b/packages/faststream-stomp/pyproject.toml index d16b6ee0..11582d08 100644 --- a/packages/faststream-stomp/pyproject.toml +++ b/packages/faststream-stomp/pyproject.toml @@ -2,7 +2,7 @@ name = "faststream-stomp" description = "FastStream STOMP broker" authors = [{ name = "Lev Vereshchagin", email = "mail@vrslev.com" }] -dependencies = ["faststream>=0.5", "stompman"] +dependencies = ["faststream~=0.5", "stompman"] requires-python = ">=3.11" readme = "README.md" license = { text = "MIT" } @@ -23,6 +23,9 @@ repository = "https://github.com/vrslev/stompman" requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" +[dependency-groups] +dev = ["faststream[otel,prometheus]~=0.5"] + [tool.hatch.version] source = "vcs" raw-options.root = "../.." From 8bf7a93d2cc8f2f8506d98be6baf0190ff0741bd Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 16:37:27 +0300 Subject: [PATCH 2/6] Add prometheus middleware --- .../faststream_stomp/prometheus.py | 50 +++++++++++++++++++ .../faststream_stomp/publisher.py | 25 +++++----- 2 files changed, 62 insertions(+), 13 deletions(-) create mode 100644 packages/faststream-stomp/faststream_stomp/prometheus.py diff --git a/packages/faststream-stomp/faststream_stomp/prometheus.py b/packages/faststream-stomp/faststream_stomp/prometheus.py new file mode 100644 index 00000000..36606c54 --- /dev/null +++ b/packages/faststream-stomp/faststream_stomp/prometheus.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import stompman +from faststream.prometheus import ConsumeAttrs, MetricsSettingsProvider +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.types import EMPTY + +if TYPE_CHECKING: + from collections.abc import Sequence + + from faststream.broker.message import StreamMessage + from prometheus_client import CollectorRegistry + + from faststream_stomp.publisher import StompProducerPublishKwargs + +__all__ = ["StompMetricsSettingsProvider", "StompPrometheusMiddleware"] + + +class StompMetricsSettingsProvider(MetricsSettingsProvider[stompman.MessageFrame]): + messaging_system = "stomp" + + def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> ConsumeAttrs: # noqa: PLR6301 + return { + "destination_name": msg.raw_message.headers["destination"], + "message_size": len(msg.body), + "messages_count": 1, + } + + def get_publish_destination_name_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301 + return kwargs["destination"] + + +class StompPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: CollectorRegistry, + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Sequence[float] | None = None, + ) -> None: + super().__init__( + settings_provider_factory=lambda _: StompMetricsSettingsProvider(), + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/packages/faststream-stomp/faststream_stomp/publisher.py b/packages/faststream-stomp/faststream_stomp/publisher.py index 2f2ac592..3c53be5c 100644 --- a/packages/faststream-stomp/faststream_stomp/publisher.py +++ b/packages/faststream-stomp/faststream_stomp/publisher.py @@ -1,7 +1,7 @@ from collections.abc import Sequence from functools import partial from itertools import chain -from typing import Any +from typing import Any, TypedDict, Unpack import stompman from faststream.asyncapi.schema import Channel, CorrelationId, Message, Operation @@ -14,6 +14,12 @@ from faststream.types import AsyncFunc, SendableMessage +class StompProducerPublishKwargs(TypedDict): + destination: str + correlation_id: str | None + headers: dict[str, str] | None + + class StompProducer(ProducerProto): _parser: AsyncCallable _decoder: AsyncCallable @@ -21,19 +27,12 @@ class StompProducer(ProducerProto): def __init__(self, client: stompman.Client) -> None: self.client = client - async def publish( # type: ignore[override] - self, - message: SendableMessage, - *, - destination: str, - correlation_id: str | None, - headers: dict[str, str] | None, - ) -> None: + async def publish(self, message: SendableMessage, **kwargs: Unpack[StompProducerPublishKwargs]) -> None: # type: ignore[override] body, content_type = encode_message(message) - all_headers = headers.copy() if headers else {} - if correlation_id: - all_headers["correlation-id"] = correlation_id - await self.client.send(body, destination, content_type=content_type, headers=all_headers) + all_headers = kwargs["headers"].copy() if kwargs["headers"] else {} + if kwargs["correlation_id"]: + all_headers["correlation-id"] = kwargs["correlation_id"] + await self.client.send(body, kwargs["destination"], content_type=content_type, headers=all_headers) async def request( # type: ignore[override] self, message: SendableMessage, *, correlation_id: str | None, headers: dict[str, str] | None From 55ab4d0127eb19697cf102215601a0461b728608 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 16:54:32 +0300 Subject: [PATCH 3/6] Add opentelemetry middleware --- .../faststream_stomp/opentelemetry.py | 54 +++++++++++++++++++ .../faststream_stomp/parser.py | 2 +- 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 packages/faststream-stomp/faststream_stomp/opentelemetry.py diff --git a/packages/faststream-stomp/faststream_stomp/opentelemetry.py b/packages/faststream-stomp/faststream_stomp/opentelemetry.py new file mode 100644 index 00000000..a632c93e --- /dev/null +++ b/packages/faststream-stomp/faststream_stomp/opentelemetry.py @@ -0,0 +1,54 @@ +import stompman +from faststream.broker.message import StreamMessage +from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME +from faststream.opentelemetry.middleware import TelemetryMiddleware +from faststream.types import AnyDict +from opentelemetry.metrics import Meter, MeterProvider +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import TracerProvider + +from faststream_stomp.publisher import StompProducerPublishKwargs + + +class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame]): + messaging_system = "stomp" + + def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> "AnyDict": + return { + SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, + SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, + SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, + SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.headers["destination"], + } + + def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame]) -> str: # noqa: PLR6301 + return msg.raw_message.headers["destination"] + + def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override] + return { + SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, + SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"], + SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: kwargs["correlation_id"], + } + + def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301 + return kwargs["destination"] + + +class StompTelemetryMiddleware(TelemetryMiddleware): + def __init__( + self, + *, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + meter: Meter | None = None, + ) -> None: + super().__init__( + settings_provider_factory=lambda _: StompTelemetrySettingsProvider(), + tracer_provider=tracer_provider, + meter_provider=meter_provider, + meter=meter, + include_messages_counters=False, + ) diff --git a/packages/faststream-stomp/faststream_stomp/parser.py b/packages/faststream-stomp/faststream_stomp/parser.py index f777934b..56acc23d 100644 --- a/packages/faststream-stomp/faststream_stomp/parser.py +++ b/packages/faststream-stomp/faststream_stomp/parser.py @@ -12,7 +12,7 @@ async def parse_message(message: stompman.MessageFrame) -> StreamMessage[stompma body=message.body, headers=cast("dict[str, str]", message.headers), content_type=message.headers.get("content-type"), - message_id=message.headers.get("message-id", gen_cor_id()), + message_id=message.headers["message-id"], correlation_id=cast("str", message.headers.get("correlation-id", gen_cor_id())), ) From 97200e9ffb06b0fab1c180e3c42f4474fba659cf Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 17:20:42 +0300 Subject: [PATCH 4/6] Add OpenTelemetry tests --- .../faststream_stomp/opentelemetry.py | 6 ++- .../test_faststream_stomp/test_main.py | 46 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/packages/faststream-stomp/faststream_stomp/opentelemetry.py b/packages/faststream-stomp/faststream_stomp/opentelemetry.py index a632c93e..fcdf690e 100644 --- a/packages/faststream-stomp/faststream_stomp/opentelemetry.py +++ b/packages/faststream-stomp/faststream_stomp/opentelemetry.py @@ -27,11 +27,13 @@ def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame] return msg.raw_message.headers["destination"] def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override] - return { + publish_attrs = { SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"], - SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: kwargs["correlation_id"], } + if kwargs["correlation_id"]: + publish_attrs[SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID] = kwargs["correlation_id"] + return publish_attrs def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301 return kwargs["destination"] diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index 87e8cbbb..e9e2f727 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -7,6 +7,13 @@ from faststream import FastStream from faststream.asyncapi import get_app_schema from faststream.broker.message import gen_cor_id +from faststream_stomp.opentelemetry import StompTelemetryMiddleware, StompTelemetrySettingsProvider +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from test_stompman.conftest import build_dataclass pytestmark = pytest.mark.anyio @@ -81,3 +88,42 @@ def test_asyncapi_schema(faker: faker.Faker, broker: faststream_stomp.StompBroke ) ) get_app_schema(FastStream(broker)) + + +async def test_opentelemetry_spans(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + resource = Resource.create(attributes={"service.name": "faststream.test"}) + tracer_provider = TracerProvider(resource=resource) + span_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + broker.add_middleware(StompTelemetryMiddleware(tracer_provider=tracer_provider)) + message, destination = faker.pystr(), faker.pystr() + + async with faststream_stomp.TestStompBroker(broker): + await broker.start() + await broker.publish(message, destination) + + assert [tuple((one_span.attributes or {}).values()) for one_span in span_exporter.get_finished_spans()] == [ + (StompTelemetrySettingsProvider.messaging_system, destination), + (StompTelemetrySettingsProvider.messaging_system, destination, "publish"), + ] + + +async def test_opentelemetry_metrics(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + resource = Resource.create(attributes={"service.name": "faststream.test"}) + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=(metric_reader,), resource=resource) + broker.add_middleware(StompTelemetryMiddleware(meter_provider=meter_provider)) + message, destination = faker.pystr(), faker.pystr() + + async with faststream_stomp.TestStompBroker(broker): + await broker.start() + await broker.publish(message, destination) + + metrics_data = metric_reader.get_metrics_data() + assert metrics_data + assert tuple( + metrics_data.resource_metrics[0].scope_metrics[0].metrics[0].data.data_points[0].attributes.values() + ) == ( + StompTelemetrySettingsProvider.messaging_system, + destination, + ) From 263de5856cd828cbfe42bcc97e6df08fa603d254 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 17:24:30 +0300 Subject: [PATCH 5/6] Simplify test --- .../test_faststream_stomp/test_main.py | 44 +++---------------- 1 file changed, 6 insertions(+), 38 deletions(-) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index e9e2f727..34f29053 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -7,13 +7,9 @@ from faststream import FastStream from faststream.asyncapi import get_app_schema from faststream.broker.message import gen_cor_id -from faststream_stomp.opentelemetry import StompTelemetryMiddleware, StompTelemetrySettingsProvider +from faststream_stomp.opentelemetry import StompTelemetryMiddleware from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from test_stompman.conftest import build_dataclass pytestmark = pytest.mark.anyio @@ -90,40 +86,12 @@ def test_asyncapi_schema(faker: faker.Faker, broker: faststream_stomp.StompBroke get_app_schema(FastStream(broker)) -async def test_opentelemetry_spans(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: - resource = Resource.create(attributes={"service.name": "faststream.test"}) - tracer_provider = TracerProvider(resource=resource) - span_exporter = InMemorySpanExporter() - tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) - broker.add_middleware(StompTelemetryMiddleware(tracer_provider=tracer_provider)) - message, destination = faker.pystr(), faker.pystr() +async def test_opentelemetry_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + broker.add_middleware(StompTelemetryMiddleware(tracer_provider=TracerProvider(), meter_provider=MeterProvider())) - async with faststream_stomp.TestStompBroker(broker): - await broker.start() - await broker.publish(message, destination) - - assert [tuple((one_span.attributes or {}).values()) for one_span in span_exporter.get_finished_spans()] == [ - (StompTelemetrySettingsProvider.messaging_system, destination), - (StompTelemetrySettingsProvider.messaging_system, destination, "publish"), - ] - - -async def test_opentelemetry_metrics(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: - resource = Resource.create(attributes={"service.name": "faststream.test"}) - metric_reader = InMemoryMetricReader() - meter_provider = MeterProvider(metric_readers=(metric_reader,), resource=resource) - broker.add_middleware(StompTelemetryMiddleware(meter_provider=meter_provider)) - message, destination = faker.pystr(), faker.pystr() + @broker.subscriber(destination := faker.pystr()) + def _() -> None: ... async with faststream_stomp.TestStompBroker(broker): await broker.start() - await broker.publish(message, destination) - - metrics_data = metric_reader.get_metrics_data() - assert metrics_data - assert tuple( - metrics_data.resource_metrics[0].scope_metrics[0].metrics[0].data.data_points[0].attributes.values() - ) == ( - StompTelemetrySettingsProvider.messaging_system, - destination, - ) + await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id()) From 6b13e081308b3aad65fbfe59c704a2ef1a52b0af Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 31 Jan 2025 17:26:00 +0300 Subject: [PATCH 6/6] Add prometheus test --- .../test_faststream_stomp/test_main.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index 34f29053..44172252 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -8,8 +8,10 @@ from faststream.asyncapi import get_app_schema from faststream.broker.message import gen_cor_id from faststream_stomp.opentelemetry import StompTelemetryMiddleware +from faststream_stomp.prometheus import StompPrometheusMiddleware from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.trace import TracerProvider +from prometheus_client import CollectorRegistry from test_stompman.conftest import build_dataclass pytestmark = pytest.mark.anyio @@ -95,3 +97,14 @@ def _() -> None: ... async with faststream_stomp.TestStompBroker(broker): await broker.start() await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id()) + + +async def test_prometheus_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + broker.add_middleware(StompPrometheusMiddleware(registry=CollectorRegistry())) + + @broker.subscriber(destination := faker.pystr()) + def _() -> None: ... + + async with faststream_stomp.TestStompBroker(broker): + await broker.start() + await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())