diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index a16e65fe5f076..a91c67c59541c 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -57,10 +57,12 @@ redpanda_cc_library( srcs = [ "reconciler.cc", "reconciliation_source.cc", + "source_probe.cc", ], hdrs = [ "reconciler.h", "reconciliation_source.h", + "source_probe.h", ], implementation_deps = [ "//src/v/cloud_topics:logger", @@ -69,6 +71,7 @@ redpanda_cc_library( "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/config", "//src/v/kafka/utils:txn_reader", + "//src/v/metrics", "//src/v/utils:retry_chain_node", ], visibility = ["//visibility:public"], diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 49e4e4f9fc882..64c1ac48be145 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -208,7 +208,12 @@ void reconciler::detach(const model::ntp& ntp) { * which means that once a reference to a source is held, * it shouldn't be assumed that the source remains in the * _sources collection. + * + * Eagerly deregister metrics before erasing so that a new source + * for the same partition can be created while the old shared_ptr + * is still alive (held by an in-flight reconciliation pass). */ + it->second->deregister_metrics(); _sources.erase(it); // Clean up topic scheduler if no partitions remain. 
diff --git a/src/v/cloud_topics/reconciler/reconciliation_source.cc b/src/v/cloud_topics/reconciler/reconciliation_source.cc
index 9e590d00085be..902e45b8cea46 100644
--- a/src/v/cloud_topics/reconciler/reconciliation_source.cc
+++ b/src/v/cloud_topics/reconciler/reconciliation_source.cc
@@ -72,6 +72,19 @@ class l0_source : public source {
         return lso.value() > kafka::next_offset(lro);
     }
 
+    // Offsets still awaiting reconciliation: LSO - next_offset(LRO).
+    // Yields 0 when the LSO is unknown or is not past that next offset.
+    int64_t pending_offset_lag() override {
+        const auto reconciled = last_reconciled_offset();
+        const auto stable = _fe->last_stable_offset();
+        if (!stable.has_value()) {
+            return 0;
+        }
+        const auto resume_at = kafka::next_offset(reconciled);
+        const bool behind = stable.value() > resume_at;
+        return behind ? (stable.value() - resume_at)() : 0;
+    }
+
     kafka::offset last_reconciled_offset() override {
         ctp_stm_api api(_partition->raft()->stm_manager()->get());
         return api.get_last_reconciled_offset();
diff --git a/src/v/cloud_topics/reconciler/reconciliation_source.h b/src/v/cloud_topics/reconciler/reconciliation_source.h
index 4b571164afb1e..3518143afd59a 100644
--- a/src/v/cloud_topics/reconciler/reconciliation_source.h
+++ b/src/v/cloud_topics/reconciler/reconciliation_source.h
@@ -11,6 +11,7 @@
 #pragma once
 
 #include "cloud_topics/data_plane_api.h"
+#include "cloud_topics/reconciler/source_probe.h"
 #include "model/fundamental.h"
 #include "model/record_batch_reader.h"
 
@@ -38,13 +39,18 @@ class source {
     source(model::ntp ntp, model::topic_id_partition tidp)
       : _ntp(std::move(ntp))
-      , _tidp(tidp) {}
+      , _tidp(tidp)
+      , _probe(_ntp, *this) {}
     source(const source&) = delete;
     source(source&&) = delete;
     source& operator=(const source&) = delete;
     source& operator=(source&&) = delete;
     virtual ~source() = default;
 
+    // Deregister metrics so a new source for the same partition can be
+    // created while this shared_ptr is still alive.
+    void deregister_metrics() { _probe.clear(); }
+
     // The NTP for this source
     const model::ntp& ntp() const { return _ntp; }
 
     // The topic ID + partition for the source
@@ -55,6 +61,11 @@ class source {
     // Returns true if there may be new data to reconcile (LSO > LRO).
     virtual bool has_pending_data() = 0;
 
+    // Returns the number of offsets pending reconciliation (LSO -
+    // next_offset(LRO)), or 0 if there is no pending data or LSO is
+    // unavailable. The source's metrics probe reports this as a gauge.
+    virtual int64_t pending_offset_lag() = 0;
+
     // Get the last reconciled offset for this source, or kafka::offset::min()
     // if none.
     virtual kafka::offset last_reconciled_offset() = 0;
@@ -86,6 +97,7 @@ class source {
 private:
     model::ntp _ntp;
     model::topic_id_partition _tidp;
+    source_probe _probe; // initialized from _ntp, so it must follow it
 };
 
 // Make a reconciliation source from L0 components (data plane) and the cluster
diff --git a/src/v/cloud_topics/reconciler/source_probe.cc b/src/v/cloud_topics/reconciler/source_probe.cc
new file mode 100644
index 0000000000000..9319087f00c61
--- /dev/null
+++ b/src/v/cloud_topics/reconciler/source_probe.cc
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "cloud_topics/reconciler/source_probe.h"
+
+#include "cloud_topics/reconciler/reconciliation_source.h"
+#include "config/configuration.h"
+#include "metrics/metrics.h"
+#include "metrics/prometheus_sanitize.h"
+
+#include 
+
+namespace cloud_topics::reconciler {
+
+source_probe::source_probe(const model::ntp& ntp, source& src)
+  : _source(src) {
+    if (config::shard_local_cfg().disable_metrics()) {
+        return;
+    }
+
+    namespace sm = ss::metrics;
+    const std::vector labels = {
+      metrics::namespace_label(ntp.ns()),
+      metrics::topic_label(ntp.tp.topic()),
+      metrics::partition_label(ntp.tp.partition()),
+    };
+    // The gauge reads the lag from the source every time it is sampled.
+    _metrics.add_group(
+      prometheus_sanitize::metrics_name("cloud_topics:reconciler"),
+      {sm::make_gauge(
+        "pending_offset_lag",
+        [this] { return _source.pending_offset_lag(); },
+        sm::description(
+          "Number of offsets pending reconciliation from L0 to L1"),
+        labels)},
+      {},
+      {sm::shard_label, metrics::partition_label});
+}
+
+} // namespace cloud_topics::reconciler
diff --git a/src/v/cloud_topics/reconciler/source_probe.h b/src/v/cloud_topics/reconciler/source_probe.h
new file mode 100644
index 0000000000000..149aecf8b5706
--- /dev/null
+++ b/src/v/cloud_topics/reconciler/source_probe.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#pragma once
+
+#include "metrics/metrics.h"
+#include "model/fundamental.h"
+
+#include 
+
+namespace cloud_topics::reconciler {
+
+class source;
+
+// Metrics probe owned by a reconciliation source. It exports the source's
+// pending offset lag as a gauge labelled with namespace, topic and partition.
+class source_probe {
+public:
+    // Registers the gauge for `ntp` unless metrics are disabled. `src` is
+    // stored by reference and read by the gauge callback, so it must stay
+    // valid for as long as the gauge is registered.
+    source_probe(const model::ntp& ntp, source& src);
+
+    // The gauge callback captures `this`, so a probe must never be copied
+    // or relocated once constructed.
+    source_probe(const source_probe&) = delete;
+    source_probe(source_probe&&) = delete;
+    source_probe& operator=(const source_probe&) = delete;
+    source_probe& operator=(source_probe&&) = delete;
+    ~source_probe() = default;
+
+    // Deregisters every metric this probe registered.
+    void clear() { _metrics.clear(); }
+
+private:
+    source& _source;
+    metrics::internal_metric_groups _metrics;
+};
+
+} // namespace cloud_topics::reconciler
diff --git a/src/v/cloud_topics/reconciler/tests/reconciler_metrics_test.cc b/src/v/cloud_topics/reconciler/tests/reconciler_metrics_test.cc
index 0233647be76dd..a740bc8a97afc 100644
--- a/src/v/cloud_topics/reconciler/tests/reconciler_metrics_test.cc
+++ b/src/v/cloud_topics/reconciler/tests/reconciler_metrics_test.cc
@@ -108,8 +108,62 @@ std::optional get_offset_corrections() {
       "cloud_topics_reconciler_offset_corrections");
 }
 
+std::optional get_pending_offset_lag(const model::ntp& ntp) {
+    return test_utils::find_metric_value(
+      "cloud_topics_reconciler_pending_offset_lag",
+      ss::metrics::default_handle(),
+      {
+        {ss::sstring("namespace"), ss::sstring(ntp.ns())},
+        {ss::sstring("topic"), ss::sstring(ntp.tp.topic())},
+        {ss::sstring("partition"),
+         ss::sstring(fmt::format("{}", ntp.tp.partition()))},
+      });
+}
+
 } // namespace
 
+TEST_F(ReconcilerMetricsTest, PendingOffsetLag) {
+    const model::topic tp{"tapioca"};
+    const model::topic_id tid = model::topic_id::create();
+
+    auto src = add_source(tp, tid);
+    const auto& ntp = src->ntp();
+
+    // Before any data, lag should be 0.
+    EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));
+
+    // Add 10 records (offsets 0-9). Lag is visible immediately.
+    src->add_batch({.count = 10});
+    EXPECT_THAT(get_pending_offset_lag(ntp), Optional(10.0));
+
+    // Reconcile to advance LRO, lag should drop.
+    reconcile();
+    EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));
+
+    // Add more data and reconcile.
+    src->add_batch({.count = 5});
+    EXPECT_THAT(get_pending_offset_lag(ntp), Optional(5.0));
+    reconcile();
+    EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));
+}
+
+TEST_F(ReconcilerMetricsTest, PendingOffsetLagDetach) {
+    model::ntp ntp;
+    {
+        auto src = add_source();
+        ntp = src->ntp();
+
+        src->add_batch({.count = 10});
+        EXPECT_TRUE(get_pending_offset_lag(ntp).has_value());
+
+        // detach() deregisters the metrics eagerly: the gauge must be gone
+        // even though src still keeps the source (and its probe) alive.
+        reconciler().detach(ntp);
+        EXPECT_FALSE(get_pending_offset_lag(ntp).has_value());
+    }
+    EXPECT_FALSE(get_pending_offset_lag(ntp).has_value());
+}
+
 TEST_F(ReconcilerMetricsTest, ThroughputCounters) {
     EXPECT_THAT(get_objects_uploaded(), Optional(0));
     EXPECT_THAT(get_bytes_reconciled(), Optional(0));
diff --git a/src/v/cloud_topics/reconciler/tests/test_utils.h b/src/v/cloud_topics/reconciler/tests/test_utils.h
index 3be3334755ab7..3012474eadb28 100644
--- a/src/v/cloud_topics/reconciler/tests/test_utils.h
+++ b/src/v/cloud_topics/reconciler/tests/test_utils.h
@@ -51,6 +51,18 @@ class fake_source : public source {
 
     bool has_pending_data() override { return true; }
 
+    int64_t pending_offset_lag() override {
+        if (_source_log.empty()) {
+            return 0;
+        }
+        auto lso = kafka::offset(_source_log.back().last_offset()() + 1);
+        auto next = kafka::next_offset(_lro);
+        if (lso <= next) {
+            return 0;
+        }
+        return (lso - next)();
+    }
+
     kafka::offset last_reconciled_offset() override { return _lro; }
 
     ss::future>