Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/cloud_topics/reconciler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ redpanda_cc_library(
srcs = [
"reconciler.cc",
"reconciliation_source.cc",
"source_probe.cc",
],
hdrs = [
"reconciler.h",
"reconciliation_source.h",
"source_probe.h",
],
implementation_deps = [
"//src/v/cloud_topics:logger",
Expand All @@ -69,6 +71,7 @@ redpanda_cc_library(
"//src/v/cloud_topics/level_zero/stm:ctp_stm_api",
"//src/v/config",
"//src/v/kafka/utils:txn_reader",
"//src/v/metrics",
"//src/v/utils:retry_chain_node",
],
visibility = ["//visibility:public"],
Expand Down
5 changes: 5 additions & 0 deletions src/v/cloud_topics/reconciler/reconciler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ void reconciler<Clock>::detach(const model::ntp& ntp) {
* which means that once a reference to a source is held,
* it shouldn't be assumed that the source remains in the
* _sources collection.
*
* Eagerly deregister metrics before erasing so that a new source
* for the same partition can be created while the old shared_ptr
* is still alive (held by an in-flight reconciliation pass).
*/
it->second->deregister_metrics();
_sources.erase(it);

// Clean up topic scheduler if no partitions remain.
Expand Down
13 changes: 13 additions & 0 deletions src/v/cloud_topics/reconciler/reconciliation_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ class l0_source : public source {
return lso.value() > kafka::next_offset(lro);
}

int64_t pending_offset_lag() override {
auto lro = last_reconciled_offset();
auto lso = _fe->last_stable_offset();
if (!lso.has_value()) {
return 0;
}
auto next = kafka::next_offset(lro);
if (lso.value() <= next) {
return 0;
}
return (lso.value() - next)();
}

kafka::offset last_reconciled_offset() override {
ctp_stm_api api(_partition->raft()->stm_manager()->get<ctp_stm>());
return api.get_last_reconciled_offset();
Expand Down
14 changes: 13 additions & 1 deletion src/v/cloud_topics/reconciler/reconciliation_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once

#include "cloud_topics/data_plane_api.h"
#include "cloud_topics/reconciler/source_probe.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"

Expand Down Expand Up @@ -38,13 +39,18 @@ class source {

source(model::ntp ntp, model::topic_id_partition tidp)
: _ntp(std::move(ntp))
, _tidp(tidp) {}
, _tidp(tidp)
, _probe(_ntp, *this) {}
source(const source&) = delete;
source(source&&) = delete;
source& operator=(const source&) = delete;
source& operator=(source&&) = delete;
virtual ~source() = default;

// Deregister metrics so a new source for the same partition can be
// created while this shared_ptr is still alive.
void deregister_metrics() { _probe.clear(); }

// The NTP for this source
const model::ntp& ntp() const { return _ntp; }
// The topic ID + partition for the source
Expand All @@ -55,6 +61,11 @@ class source {
// Returns true if there may be new data to reconcile (LSO > LRO).
virtual bool has_pending_data() = 0;

// Returns the number of offsets pending reconciliation (LSO -
// next_offset(LRO)), or 0 if there is no pending data or LSO is
// unavailable.
virtual int64_t pending_offset_lag() = 0;

// Get the last reconciled offset for this source, or kafka::offset::min()
// if none.
virtual kafka::offset last_reconciled_offset() = 0;
Expand Down Expand Up @@ -86,6 +97,7 @@ class source {
private:
model::ntp _ntp;
model::topic_id_partition _tidp;
source_probe _probe;
};

// Make a reconciliation source from L0 components (data plane) and the cluster
Expand Down
47 changes: 47 additions & 0 deletions src/v/cloud_topics/reconciler/source_probe.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_topics/reconciler/source_probe.h"

#include "cloud_topics/reconciler/reconciliation_source.h"
#include "config/configuration.h"
#include "metrics/metrics.h"
#include "metrics/prometheus_sanitize.h"

#include <seastar/core/metrics.hh>

namespace cloud_topics::reconciler {

source_probe::source_probe(const model::ntp& ntp, source& src)
: _source(src) {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

namespace sm = ss::metrics;
const std::vector<sm::label_instance> labels = {
metrics::namespace_label(ntp.ns()),
metrics::topic_label(ntp.tp.topic()),
metrics::partition_label(ntp.tp.partition()),
};

_metrics.add_group(
prometheus_sanitize::metrics_name("cloud_topics:reconciler"),
{sm::make_gauge(
"pending_offset_lag",
[this] { return _source.pending_offset_lag(); },
sm::description(
"Number of offsets pending reconciliation from L0 to L1"),
labels)},
{},
{sm::shard_label, metrics::partition_label});
}

} // namespace cloud_topics::reconciler
33 changes: 33 additions & 0 deletions src/v/cloud_topics/reconciler/source_probe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "metrics/metrics.h"
#include "model/fundamental.h"

#include <cstdint>

namespace cloud_topics::reconciler {

class source;

class source_probe {
public:
source_probe(const model::ntp& ntp, source& src);

void clear() { _metrics.clear(); }

private:
source& _source;
metrics::internal_metric_groups _metrics;
};

} // namespace cloud_topics::reconciler
54 changes: 54 additions & 0 deletions src/v/cloud_topics/reconciler/tests/reconciler_metrics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,62 @@ std::optional<uint64_t> get_offset_corrections() {
"cloud_topics_reconciler_offset_corrections");
}

std::optional<double> get_pending_offset_lag(const model::ntp& ntp) {
return test_utils::find_metric_value<double>(
"cloud_topics_reconciler_pending_offset_lag",
ss::metrics::default_handle(),
{
{ss::sstring("namespace"), ss::sstring(ntp.ns())},
{ss::sstring("topic"), ss::sstring(ntp.tp.topic())},
{ss::sstring("partition"),
ss::sstring(fmt::format("{}", ntp.tp.partition()))},
});
}

} // namespace

TEST_F(ReconcilerMetricsTest, PendingOffsetLag) {
const model::topic tp{"tapioca"};
const model::topic_id tid = model::topic_id::create();

auto src = add_source(tp, tid);
const auto& ntp = src->ntp();

// Before any data, lag should be 0.
EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));

// Add 10 records (offsets 0-9). Lag is visible immediately.
src->add_batch({.count = 10});
EXPECT_THAT(get_pending_offset_lag(ntp), Optional(10.0));

// Reconcile to advance LRO, lag should drop.
reconcile();
EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));

// Add more data and reconcile.
src->add_batch({.count = 5});
EXPECT_THAT(get_pending_offset_lag(ntp), Optional(5.0));
reconcile();
EXPECT_THAT(get_pending_offset_lag(ntp), Optional(0.0));
}

TEST_F(ReconcilerMetricsTest, PendingOffsetLagDetach) {
model::ntp ntp;
{
auto src = add_source();
ntp = src->ntp();

src->add_batch({.count = 10});
EXPECT_TRUE(get_pending_offset_lag(ntp).has_value());

// Detach drops the reconciler's reference. The test's local src
// is the last shared_ptr, so the source (and its probe) are
// destroyed when src goes out of scope.
reconciler().detach(ntp);
}
EXPECT_FALSE(get_pending_offset_lag(ntp).has_value());
}

TEST_F(ReconcilerMetricsTest, ThroughputCounters) {
EXPECT_THAT(get_objects_uploaded(), Optional(0));
EXPECT_THAT(get_bytes_reconciled(), Optional(0));
Expand Down
12 changes: 12 additions & 0 deletions src/v/cloud_topics/reconciler/tests/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ class fake_source : public source {

bool has_pending_data() override { return true; }

int64_t pending_offset_lag() override {
if (_source_log.empty()) {
return 0;
}
auto lso = kafka::offset(_source_log.back().last_offset()() + 1);
auto next = kafka::next_offset(_lro);
if (lso <= next) {
return 0;
}
return (lso - next)();
}

kafka::offset last_reconciled_offset() override { return _lro; }

ss::future<std::expected<void, errc>>
Expand Down
Loading