diff --git a/packages/faststream-stomp/faststream_stomp/opentelemetry.py b/packages/faststream-stomp/faststream_stomp/opentelemetry.py new file mode 100644 index 00000000..fcdf690e --- /dev/null +++ b/packages/faststream-stomp/faststream_stomp/opentelemetry.py @@ -0,0 +1,56 @@ +import stompman +from faststream.broker.message import StreamMessage +from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME +from faststream.opentelemetry.middleware import TelemetryMiddleware +from faststream.types import AnyDict +from opentelemetry.metrics import Meter, MeterProvider +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import TracerProvider + +from faststream_stomp.publisher import StompProducerPublishKwargs + + +class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame]): + messaging_system = "stomp" + + def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> "AnyDict": + return { + SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, + SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, + SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, + SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.headers["destination"], + } + + def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame]) -> str: # noqa: PLR6301 + return msg.raw_message.headers["destination"] + + def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override] + publish_attrs = { + SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, + SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"], + } + if kwargs["correlation_id"]: + publish_attrs[SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID] = kwargs["correlation_id"] + return publish_attrs + + def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301 + return kwargs["destination"] + + +class StompTelemetryMiddleware(TelemetryMiddleware): + def __init__( + self, + *, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + meter: Meter | None = None, + ) -> None: + super().__init__( + settings_provider_factory=lambda _: StompTelemetrySettingsProvider(), + tracer_provider=tracer_provider, + meter_provider=meter_provider, + meter=meter, + include_messages_counters=False, + ) diff --git a/packages/faststream-stomp/faststream_stomp/parser.py b/packages/faststream-stomp/faststream_stomp/parser.py index f777934b..56acc23d 100644 --- a/packages/faststream-stomp/faststream_stomp/parser.py +++ b/packages/faststream-stomp/faststream_stomp/parser.py @@ -12,7 +12,7 @@ async def parse_message(message: stompman.MessageFrame) -> StreamMessage[stompma body=message.body, headers=cast("dict[str, str]", message.headers), content_type=message.headers.get("content-type"), - message_id=message.headers.get("message-id", gen_cor_id()), + message_id=message.headers["message-id"], correlation_id=cast("str", message.headers.get("correlation-id", gen_cor_id())), ) diff --git a/packages/faststream-stomp/faststream_stomp/prometheus.py b/packages/faststream-stomp/faststream_stomp/prometheus.py new file mode 100644 index 00000000..36606c54 --- /dev/null +++ b/packages/faststream-stomp/faststream_stomp/prometheus.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import stompman +from faststream.prometheus import ConsumeAttrs, MetricsSettingsProvider +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.types import EMPTY + +if TYPE_CHECKING: + from collections.abc import Sequence + + from faststream.broker.message import StreamMessage + from prometheus_client import CollectorRegistry + + from faststream_stomp.publisher import StompProducerPublishKwargs + +__all__ = ["StompMetricsSettingsProvider", "StompPrometheusMiddleware"] + + +class StompMetricsSettingsProvider(MetricsSettingsProvider[stompman.MessageFrame]): + messaging_system = "stomp" + + def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> ConsumeAttrs: # noqa: PLR6301 + return { + "destination_name": msg.raw_message.headers["destination"], + "message_size": len(msg.body), + "messages_count": 1, + } + + def get_publish_destination_name_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301 + return kwargs["destination"] + + +class StompPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: CollectorRegistry, + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Sequence[float] | None = None, + ) -> None: + super().__init__( + settings_provider_factory=lambda _: StompMetricsSettingsProvider(), + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/packages/faststream-stomp/faststream_stomp/publisher.py b/packages/faststream-stomp/faststream_stomp/publisher.py index 2f2ac592..3c53be5c 100644 --- a/packages/faststream-stomp/faststream_stomp/publisher.py +++ b/packages/faststream-stomp/faststream_stomp/publisher.py @@ -1,7 +1,7 @@ from collections.abc import Sequence from functools import partial from itertools import chain -from typing import Any +from typing import Any, TypedDict, Unpack import stompman from faststream.asyncapi.schema import Channel, CorrelationId, Message, Operation @@ -14,6 +14,12 @@ from faststream.types import AsyncFunc, SendableMessage +class StompProducerPublishKwargs(TypedDict): + destination: str + correlation_id: str | None + headers: dict[str, str] | None + + class StompProducer(ProducerProto): _parser: AsyncCallable _decoder: AsyncCallable @@ -21,19 +27,12 @@ class StompProducer(ProducerProto): def __init__(self, client: stompman.Client) -> None: self.client = client - async def publish( # type: ignore[override] - self, - message: SendableMessage, - *, - destination: str, - correlation_id: str | None, - headers: dict[str, str] | None, - ) -> None: + async def publish(self, message: SendableMessage, **kwargs: Unpack[StompProducerPublishKwargs]) -> None: # type: ignore[override] body, content_type = encode_message(message) - all_headers = headers.copy() if headers else {} - if correlation_id: - all_headers["correlation-id"] = correlation_id - await self.client.send(body, destination, content_type=content_type, headers=all_headers) + all_headers = kwargs["headers"].copy() if kwargs["headers"] else {} + if kwargs["correlation_id"]: + all_headers["correlation-id"] = kwargs["correlation_id"] + await self.client.send(body, kwargs["destination"], content_type=content_type, headers=all_headers) async def request( # type: ignore[override] self, message: SendableMessage, *, correlation_id: str | None, headers: dict[str, str] | None diff --git a/packages/faststream-stomp/pyproject.toml b/packages/faststream-stomp/pyproject.toml index d16b6ee0..11582d08 100644 --- a/packages/faststream-stomp/pyproject.toml +++ b/packages/faststream-stomp/pyproject.toml @@ -2,7 +2,7 @@ name = "faststream-stomp" description = "FastStream STOMP broker" authors = [{ name = "Lev Vereshchagin", email = "mail@vrslev.com" }] -dependencies = ["faststream>=0.5", "stompman"] +dependencies = ["faststream~=0.5", "stompman"] requires-python = ">=3.11" readme = "README.md" license = { text = "MIT" } @@ -23,6 +23,9 @@ repository = "https://github.com/vrslev/stompman" requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" +[dependency-groups] +dev = ["faststream[otel,prometheus]~=0.5"] + [tool.hatch.version] source = "vcs" raw-options.root = "../.." diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index 87e8cbbb..44172252 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -7,6 +7,11 @@ from faststream import FastStream from faststream.asyncapi import get_app_schema from faststream.broker.message import gen_cor_id +from faststream_stomp.opentelemetry import StompTelemetryMiddleware +from faststream_stomp.prometheus import StompPrometheusMiddleware +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.trace import TracerProvider +from prometheus_client import CollectorRegistry from test_stompman.conftest import build_dataclass pytestmark = pytest.mark.anyio @@ -81,3 +86,25 @@ def test_asyncapi_schema(faker: faker.Faker, broker: faststream_stomp.StompBroke ) ) get_app_schema(FastStream(broker)) + + +async def test_opentelemetry_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + broker.add_middleware(StompTelemetryMiddleware(tracer_provider=TracerProvider(), meter_provider=MeterProvider())) + + @broker.subscriber(destination := faker.pystr()) + def _() -> None: ... + + async with faststream_stomp.TestStompBroker(broker): + await broker.start() + await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id()) + + +async def test_prometheus_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + broker.add_middleware(StompPrometheusMiddleware(registry=CollectorRegistry())) + + @broker.subscriber(destination := faker.pystr()) + def _() -> None: ... + + async with faststream_stomp.TestStompBroker(broker): + await broker.start() + await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())