Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions packages/faststream-stomp/faststream_stomp/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import stompman
from faststream.broker.message import StreamMessage
from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
from faststream.opentelemetry.middleware import TelemetryMiddleware
from faststream.types import AnyDict
from opentelemetry.metrics import Meter, MeterProvider
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import TracerProvider

from faststream_stomp.publisher import StompProducerPublishKwargs


class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame]):
messaging_system = "stomp"

def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> "AnyDict":
return {
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.headers["destination"],
}

def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame]) -> str: # noqa: PLR6301
return msg.raw_message.headers["destination"]

def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override]
publish_attrs = {
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"],
}
if kwargs["correlation_id"]:
publish_attrs[SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID] = kwargs["correlation_id"]
return publish_attrs

def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301
return kwargs["destination"]


class StompTelemetryMiddleware(TelemetryMiddleware):
def __init__(
self,
*,
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
meter: Meter | None = None,
) -> None:
super().__init__(
settings_provider_factory=lambda _: StompTelemetrySettingsProvider(),
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
include_messages_counters=False,
)
2 changes: 1 addition & 1 deletion packages/faststream-stomp/faststream_stomp/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async def parse_message(message: stompman.MessageFrame) -> StreamMessage[stompma
body=message.body,
headers=cast("dict[str, str]", message.headers),
content_type=message.headers.get("content-type"),
message_id=message.headers.get("message-id", gen_cor_id()),
message_id=message.headers["message-id"],
correlation_id=cast("str", message.headers.get("correlation-id", gen_cor_id())),
)

Expand Down
50 changes: 50 additions & 0 deletions packages/faststream-stomp/faststream_stomp/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import stompman
from faststream.prometheus import ConsumeAttrs, MetricsSettingsProvider
from faststream.prometheus.middleware import BasePrometheusMiddleware
from faststream.types import EMPTY

if TYPE_CHECKING:
from collections.abc import Sequence

from faststream.broker.message import StreamMessage
from prometheus_client import CollectorRegistry

from faststream_stomp.publisher import StompProducerPublishKwargs

__all__ = ["StompMetricsSettingsProvider", "StompPrometheusMiddleware"]


class StompMetricsSettingsProvider(MetricsSettingsProvider[stompman.MessageFrame]):
messaging_system = "stomp"

def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> ConsumeAttrs: # noqa: PLR6301
return {
"destination_name": msg.raw_message.headers["destination"],
"message_size": len(msg.body),
"messages_count": 1,
}

def get_publish_destination_name_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301
return kwargs["destination"]


class StompPrometheusMiddleware(BasePrometheusMiddleware):
def __init__(
self,
*,
registry: CollectorRegistry,
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Sequence[float] | None = None,
) -> None:
super().__init__(
settings_provider_factory=lambda _: StompMetricsSettingsProvider(),
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
)
25 changes: 12 additions & 13 deletions packages/faststream-stomp/faststream_stomp/publisher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections.abc import Sequence
from functools import partial
from itertools import chain
from typing import Any
from typing import Any, TypedDict, Unpack

import stompman
from faststream.asyncapi.schema import Channel, CorrelationId, Message, Operation
Expand All @@ -14,26 +14,25 @@
from faststream.types import AsyncFunc, SendableMessage


class StompProducerPublishKwargs(TypedDict):
destination: str
correlation_id: str | None
headers: dict[str, str] | None


class StompProducer(ProducerProto):
_parser: AsyncCallable
_decoder: AsyncCallable

def __init__(self, client: stompman.Client) -> None:
self.client = client

async def publish( # type: ignore[override]
self,
message: SendableMessage,
*,
destination: str,
correlation_id: str | None,
headers: dict[str, str] | None,
) -> None:
async def publish(self, message: SendableMessage, **kwargs: Unpack[StompProducerPublishKwargs]) -> None: # type: ignore[override]
body, content_type = encode_message(message)
all_headers = headers.copy() if headers else {}
if correlation_id:
all_headers["correlation-id"] = correlation_id
await self.client.send(body, destination, content_type=content_type, headers=all_headers)
all_headers = kwargs["headers"].copy() if kwargs["headers"] else {}
if kwargs["correlation_id"]:
all_headers["correlation-id"] = kwargs["correlation_id"]
await self.client.send(body, kwargs["destination"], content_type=content_type, headers=all_headers)

async def request( # type: ignore[override]
self, message: SendableMessage, *, correlation_id: str | None, headers: dict[str, str] | None
Expand Down
5 changes: 4 additions & 1 deletion packages/faststream-stomp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "faststream-stomp"
description = "FastStream STOMP broker"
authors = [{ name = "Lev Vereshchagin", email = "mail@vrslev.com" }]
dependencies = ["faststream>=0.5", "stompman"]
dependencies = ["faststream~=0.5", "stompman"]
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
Expand All @@ -23,6 +23,9 @@ repository = "https://github.com/vrslev/stompman"
requires = ["hatchling", "hatch-vcs"]
build-backend = "hatchling.build"

[dependency-groups]
dev = ["faststream[otel,prometheus]~=0.5"]

[tool.hatch.version]
source = "vcs"
raw-options.root = "../.."
Expand Down
27 changes: 27 additions & 0 deletions packages/faststream-stomp/test_faststream_stomp/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from faststream import FastStream
from faststream.asyncapi import get_app_schema
from faststream.broker.message import gen_cor_id
from faststream_stomp.opentelemetry import StompTelemetryMiddleware
from faststream_stomp.prometheus import StompPrometheusMiddleware
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from prometheus_client import CollectorRegistry
from test_stompman.conftest import build_dataclass

pytestmark = pytest.mark.anyio
Expand Down Expand Up @@ -81,3 +86,25 @@ def test_asyncapi_schema(faker: faker.Faker, broker: faststream_stomp.StompBroke
)
)
get_app_schema(FastStream(broker))


async def test_opentelemetry_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
broker.add_middleware(StompTelemetryMiddleware(tracer_provider=TracerProvider(), meter_provider=MeterProvider()))

@broker.subscriber(destination := faker.pystr())
def _() -> None: ...

async with faststream_stomp.TestStompBroker(broker):
await broker.start()
await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())


async def test_prometheus_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
broker.add_middleware(StompPrometheusMiddleware(registry=CollectorRegistry()))

@broker.subscriber(destination := faker.pystr())
def _() -> None: ...

async with faststream_stomp.TestStompBroker(broker):
await broker.start()
await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())