From 42f59e4fd7af0c80b83f7d774ee0441375ee8726 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 30 Oct 2025 13:49:09 -0400 Subject: [PATCH 1/2] rabbit_prometheus_handler: Stream identity-encoded HTTP replies `prometheus_text_format:format/1` produces a binary of the format for the entire registry. For clusters with many resources, this can lead to large replies from `/metrics/[:registry]` especially for large registries like `per-object`. Instead of formatting the response and then sending it, we can stream the response by taking advantage of the new `format_into/3` callback (which needs to be added upstream to the `prometheus` dep). This uses `cowboy_req:stream_body/3` to stream the iodata as `prometheus` works through the registry. This should hopefully be a nice memory improvement. The other benefit is that results are sent eagerly. For a stress-testing example, 1. `make run-broker` 2. `rabbitmqctl import_definitions path/to/100k-classic-queues.json` 3. `curl -s localhost:15692/metrics/per-object` Before this change `curl` would wait for around 8 seconds and then the entire response would arrive. With this change the results start streaming in immediately. --- .../src/rabbit_prometheus_handler.erl | 85 +++++++++---------- rabbitmq-components.mk | 2 +- 2 files changed, 42 insertions(+), 45 deletions(-) diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl index 5f1c253cdffb..e26f6e46c30d 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl @@ -89,47 +89,58 @@ gen_response(_, Request) -> Request. gen_metrics_response(Registry, Request) -> - {Code, RespHeaders, Body} = reply(Registry, Request), - - Headers = to_cowboy_headers(RespHeaders), - cowboy_req:reply(Code, maps:from_list(Headers), Body, Request). - -to_cowboy_headers(RespHeaders) -> - lists:map(fun to_cowboy_headers_/1, RespHeaders). - -to_cowboy_headers_({Name, Value}) -> - {to_cowboy_name(Name), Value}. - -to_cowboy_name(Name) -> - binary:replace(atom_to_binary(Name, utf8), <<"_">>, <<"-">>). - -reply(Registry, Request) -> case validate_registry(Registry, registry()) of {true, RealRegistry} -> format_metrics(Request, RealRegistry); {registry_conflict, _ReqR, _ConfR} -> - {409, [], <<>>} + cowboy_req:reply(409, #{}, <<>>, Request) end. format_metrics(Request, Registry) -> + %% Formatting registries produces large binaries. Fullsweep eagerly to + %% evict the large binaries faster and make GC cheaper. + process_flag(fullsweep_after, 0), AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Request, undefined), ContentType = prometheus_text_format:content_type(), - Scrape = render_format(ContentType, Registry), Encoding = accept_encoding_header:negotiate(AcceptEncoding, [<<"identity">>, <<"gzip">>]), - encode_format(ContentType, binary_to_list(Encoding), Scrape, Registry). - -render_format(ContentType, Registry) -> - Scrape = prometheus_summary:observe_duration( - Registry, - ?SCRAPE_DURATION, - [Registry, ContentType], - fun () -> prometheus_text_format:format(Registry) end), - prometheus_summary:observe(Registry, - ?SCRAPE_SIZE, - [Registry, ContentType], - iolist_size(Scrape)), - Scrape. + Headers = #{<<"content-type">> => ContentType, + <<"content-encoding">> => Encoding}, + case Encoding of + <<"gzip">> -> + Scrape = prometheus_summary:observe_duration( + Registry, + ?SCRAPE_DURATION, + [Registry, ContentType], + fun () -> prometheus_text_format:format(Registry) end), + prometheus_summary:observe(Registry, + ?SCRAPE_SIZE, + [Registry, ContentType], + iolist_size(Scrape)), + Encoded = zlib:gzip(Scrape), + prometheus_summary:observe(telemetry_registry(), + ?SCRAPE_ENCODED_SIZE, + [Registry, ContentType, Encoding], + iolist_size(Encoded)), + cowboy_req:reply(200, Headers, Encoded, Request); + <<"identity">> -> + Req = cowboy_req:stream_reply(200, Headers, Request), + Fmt = fun(Data, Size) -> + cowboy_req:stream_body(Data, nofin, Req), + Size + iolist_size(Data) + end, + Size = prometheus_summary:observe_duration( + Registry, + ?SCRAPE_DURATION, + [Registry, ContentType], + fun () -> prometheus_text_format:format_into(Registry, 0, Fmt) end), + cowboy_req:stream_body(<<>>, fin, Req), + prometheus_summary:observe(Registry, + ?SCRAPE_SIZE, + [Registry, ContentType], + Size), + Req + end. validate_registry(undefined, auto) -> {true, default}; @@ -146,20 +157,6 @@ telemetry_registry() -> registry() -> application:get_env(rabbitmq_prometheus, registry, auto). -encode_format(ContentType, Encoding, Scrape, Registry) -> - Encoded = encode_format_(Encoding, Scrape), - prometheus_summary:observe(telemetry_registry(), - ?SCRAPE_ENCODED_SIZE, - [Registry, ContentType, Encoding], - iolist_size(Encoded)), - {200, [{content_type, binary_to_list(ContentType)}, - {content_encoding, Encoding}], Encoded}. - -encode_format_("gzip", Scrape) -> - zlib:gzip(Scrape); -encode_format_("identity", Scrape) -> - Scrape. - %% It's not easy to pass this information in a pure way (it'll require changing prometheus.erl) put_filtering_options_into_process_dictionary(Request) -> #{vhost := VHosts, family := Families} = cowboy_req:match_qs([{vhost, [], undefined}, {family, [], undefined}], Request), diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 7fc8aae4fbd5..46882f90f0a2 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -50,7 +50,7 @@ dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 dep_osiris = git https://github.com/rabbitmq/osiris v1.10.2 -dep_prometheus = hex 5.1.1 +dep_prometheus = git https://github.com/the-mikedavis/prometheus.erl ce32cfef7eee60f577c1f200304428509152a7ff dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0 dep_recon = hex 2.5.6 From 051c2c021e8f0bb135cb87a6f89884646acf01b0 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 13 Nov 2025 22:49:46 -0500 Subject: [PATCH 2/2] rabbit_prometheus_handler: Delete manual gzip handling The Cowboy stream handler `cowboy_compress_h` is already enabled for this endpoint so there's no need to perform gzipping manually. This change deletes the custom handling of accept-encoding. We previously returned a content-encoding of "identity" for non-gzipped requests, but according to , the content-encoding response header should not be set to identity. The test case is accurate anyways: the response is text, not something compressed. --- deps/rabbitmq_prometheus/Makefile | 2 +- .../src/rabbit_prometheus_handler.erl | 64 +++++-------------- .../test/rabbit_prometheus_http_SUITE.erl | 2 +- rabbitmq-components.mk | 1 - 4 files changed, 19 insertions(+), 50 deletions(-) diff --git a/deps/rabbitmq_prometheus/Makefile b/deps/rabbitmq_prometheus/Makefile index 75976e7cea8d..984c9f55c1c4 100644 --- a/deps/rabbitmq_prometheus/Makefile +++ b/deps/rabbitmq_prometheus/Makefile @@ -9,7 +9,7 @@ endef PROJECT := rabbitmq_prometheus PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ PROJECT_MOD := rabbit_prometheus_app -DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch +DEPS = cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch BUILD_DEPS = amqp_client rabbit_common rabbitmq_management TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_stream diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl index e26f6e46c30d..fe382826094b 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl @@ -14,7 +14,6 @@ -define(SCRAPE_DURATION, telemetry_scrape_duration_seconds). -define(SCRAPE_SIZE, telemetry_scrape_size_bytes). --define(SCRAPE_ENCODED_SIZE, telemetry_scrape_encoded_size_bytes). -define(AUTH_REALM, "Basic realm=\"RabbitMQ Prometheus\""). @@ -58,14 +57,9 @@ setup_metrics(Registry) -> {help, "Scrape size, not encoded"}, {labels, ["registry", "content_type"]}, {registry, Registry}], - ScrapeEncodedSize = [{name, ?SCRAPE_ENCODED_SIZE}, - {help, "Scrape size, encoded"}, - {labels, ["registry", "content_type", "encoding"]}, - {registry, Registry}], prometheus_summary:declare(ScrapeDuration), - prometheus_summary:declare(ScrapeSize), - prometheus_summary:declare(ScrapeEncodedSize). + prometheus_summary:declare(ScrapeSize). %% =================================================================== %% Private functions @@ -100,47 +94,23 @@ format_metrics(Request, Registry) -> %% Formatting registries produces large binaries. Fullsweep eagerly to %% evict the large binaries faster and make GC cheaper. process_flag(fullsweep_after, 0), - AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Request, undefined), ContentType = prometheus_text_format:content_type(), - Encoding = accept_encoding_header:negotiate(AcceptEncoding, [<<"identity">>, - <<"gzip">>]), - Headers = #{<<"content-type">> => ContentType, - <<"content-encoding">> => Encoding}, - case Encoding of - <<"gzip">> -> - Scrape = prometheus_summary:observe_duration( - Registry, - ?SCRAPE_DURATION, - [Registry, ContentType], - fun () -> prometheus_text_format:format(Registry) end), - prometheus_summary:observe(Registry, - ?SCRAPE_SIZE, - [Registry, ContentType], - iolist_size(Scrape)), - Encoded = zlib:gzip(Scrape), - prometheus_summary:observe(telemetry_registry(), - ?SCRAPE_ENCODED_SIZE, - [Registry, ContentType, Encoding], - iolist_size(Encoded)), - cowboy_req:reply(200, Headers, Encoded, Request); - <<"identity">> -> - Req = cowboy_req:stream_reply(200, Headers, Request), - Fmt = fun(Data, Size) -> - cowboy_req:stream_body(Data, nofin, Req), - Size + iolist_size(Data) - end, - Size = prometheus_summary:observe_duration( - Registry, - ?SCRAPE_DURATION, - [Registry, ContentType], - fun () -> prometheus_text_format:format_into(Registry, 0, Fmt) end), - cowboy_req:stream_body(<<>>, fin, Req), - prometheus_summary:observe(Registry, - ?SCRAPE_SIZE, - [Registry, ContentType], - Size), - Req - end. + Req = cowboy_req:stream_reply(200, #{<<"content-type">> => ContentType}, Request), + Fmt = fun(Data, Size) -> + cowboy_req:stream_body(Data, nofin, Req), + Size + iolist_size(Data) + end, + Size = prometheus_summary:observe_duration( + Registry, + ?SCRAPE_DURATION, + [Registry, ContentType], + fun () -> prometheus_text_format:format_into(Registry, 0, Fmt) end), + cowboy_req:stream_body(<<>>, fin, Req), + prometheus_summary:observe(Registry, + ?SCRAPE_SIZE, + [Registry, ContentType], + Size), + Req. validate_registry(undefined, auto) -> {true, default}; diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index bb74cdffd21a..52357fad52aa 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -373,7 +373,7 @@ content_type_test(Config) -> encoding_test(Config) -> {Headers, Body} = http_get(Config, [{"accept-encoding", "deflate"}], 200), - ?assertMatch("identity", proplists:get_value("content-encoding", Headers)), + ?assertMatch(undefined, proplists:get_value("content-encoding", Headers)), ?assertEqual(match, re:run(Body, "^# TYPE", [{capture, none}, multiline])). gzip_encoding_test(Config) -> diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 46882f90f0a2..4f2b9794ee14 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -39,7 +39,6 @@ endif # We do that in this file, which is included by all projects, to ensure # all projects use the same versions. It avoids conflicts. -dep_accept = hex 0.3.5 dep_cowboy = hex 2.14.1 dep_cowlib = hex 2.16.0 dep_credentials_obfuscation = hex 3.5.0