Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,20 @@ l0::gc::epoch_source::max_gc_eligible_epoch(seastar::abort_source* as) {
co_return std::unexpected(partitions.error());
}
if (partitions.value().partitions.empty()) {
co_return std::nullopt;
// No cloud topic partitions currently exist, so no partition can
// hold back the collectible epoch. Use the snapshot revision as
// the watermark so that we can still collect stranded L0 objects
// from previously deleted topics.
auto result = partitions.value().snap_revision;
vlog(
cd_log.debug,
"Empty partition snapshot, max GC eligible epoch is snapshot "
"epoch {}",
result);
if (probe_) {
probe_->set_min_partition_gc_epoch(result);
}
co_return result;
}

/*
Expand Down
33 changes: 28 additions & 5 deletions src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,20 @@ class LevelZeroGCMaxEpochTest : public testing::Test {
};

TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshot) {
    // An empty snapshot means no cloud topic partitions currently exist.
    // The watermark falls through to `snap_revision`: this is the
    // zero-iteration case of the min-reduce. Stranded L0 objects left over
    // from previously deleted topics remain epoch-eligible.
    snapshot().snap_revision = cloud_topics::cluster_epoch(42);
    ASSERT_TRUE(max_gc().has_value());
    ASSERT_TRUE(max_gc().value().has_value());
    ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(42));
}

namespace {
// Fixture topic namespaces shared by the max-epoch tests below. tpns1 is a
// second, distinct topic so tests can report epochs for a topic that has no
// matching entry in the partition snapshot.
model::topic_namespace tpns0(model::ns("ns0"), model::topic("t0"));
model::topic_namespace tpns1(model::ns("ns1"), model::topic("t1"));
} // namespace

TEST_F(LevelZeroGCMaxEpochTest, EmptyGcEpochReport) {
// here we have a non-empty snapshot, but no reported epochs from individual
Expand Down Expand Up @@ -572,10 +577,28 @@ TEST_F(LevelZeroGCMaxEpochTest, MinReduce) {
}

TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshotNonEmptyReport) {
    // With an empty snapshot, no partition can hold the watermark back:
    // the topic table is the source of truth for liveness, so report
    // entries without a matching snapshot entry are ignored regardless
    // of value. This handles two races between the health report and
    // snapshot collection points:
    //   (1) topic created after the snapshot - reported epoch is
    //       > snap_revision, so its L0 data is out of range for this
    //       pass anyway.
    //   (2) topic deleted before the snapshot - reported epoch is
    //       <= snap_revision; ignoring it prevents regressing the
    //       watermark below the snapshot and preserving the objects
    //       this pass is meant to reap.
    snapshot().snap_revision = cloud_topics::cluster_epoch(100);
    // case (1): report entry from a topic created after the snapshot
    get_partitions_max_gc_epoch_value[tpns0][model::partition_id(0)]
      = cloud_topics::cluster_epoch(200);
    // case (2): stale report entry from a topic deleted before the snapshot
    get_partitions_max_gc_epoch_value[tpns1][model::partition_id(0)]
      = cloud_topics::cluster_epoch(50);
    ASSERT_TRUE(max_gc().has_value());
    // The watermark equals the snapshot revision in both cases.
    ASSERT_TRUE(max_gc().value().has_value());
    ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(100));
}

class LevelZeroGCScaleOutTest : public LevelZeroGCTest {
Expand Down
81 changes: 81 additions & 0 deletions tests/rptest/tests/cloud_topics/l0_gc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,3 +1195,84 @@ def _blocked_rounds_stable():
backoff_sec=2,
retry_on_exc=True,
)


class CloudTopicsL0GCAllTopicsDeletedTest(CloudTopicsL0GCAdminBase):
    """
    Integration: when every cloud topic is deleted while L0 objects still
    exist in the bucket, GC must still clean them up.

    Regression test for: `l0::gc::epoch_source::max_gc_eligible_epoch`
    returned `std::nullopt` when the partition snapshot was empty (no
    cloud topic partitions existed). `try_to_collect` then listed L0
    objects but refused to delete any of them, returning
    `no_collectible_epoch`. The worker loop backed off and retried
    forever without making progress, so objects remained in the bucket
    indefinitely.
    """

    # Bucket key prefix under which L0 data objects are written.
    L0_PREFIX = "level_zero/data/"

    def _count_l0_objects(self) -> int:
        # Count bucket objects under the L0 data prefix; used both to wait
        # for objects to accumulate and to confirm GC removed them all.
        objects = self.redpanda.get_objects_from_si()
        keys = [o.key for o in objects if o.key.startswith(self.L0_PREFIX)]
        self.logger.debug(f"Found {len(keys)} L0 objects")
        return len(keys)

    @cluster(num_nodes=4)
    @matrix(
        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
    )
    def test_gc_completes_after_all_topics_deleted(
        self, cloud_storage_type: CloudStorageType
    ):
        # One cloud topic is enough: deleting it empties the partition
        # snapshot, which is the condition under regression test.
        topic = TopicSpec(partition_count=2, replication_factor=3)
        self.topics = [topic]
        self.create_topics(self.topics)

        # Pause GC cluster-wide so L0 objects accumulate in the bucket
        # while we produce.
        self.gc_pause()
        self.wait_for_status(status=GcStatus.L0_GC_STATUS_PAUSED)

        # Produce long enough that reconciliation advances the epoch and
        # L0 objects exist to be collected.
        self.produce_some(topics=[topic.name])

        wait_until(
            lambda: self._count_l0_objects() > 0,
            timeout_sec=60,
            backoff_sec=2,
            err_msg="Expected L0 objects to exist in the bucket after produce",
        )
        l0_before = self._count_l0_objects()
        self.logger.info(f"Accumulated {l0_before} L0 objects with GC paused")

        # Delete the only cloud topic and wait for the deletion to be
        # applied cluster-wide.
        rpk = RpkTool(self.redpanda)
        self.logger.info(f"Deleting topic {topic.name}")
        rpk.delete_topic(topic.name)
        wait_until(
            lambda: topic.name not in rpk.list_topics(),
            timeout_sec=30,
            backoff_sec=1,
            err_msg=f"Topic {topic.name} still visible after delete",
        )

        # Resume GC and allow plenty of time for multiple GC rounds to finish.
        # NOTE: Grace period is 10s.
        # (see cloud_topics_short_term_gc_minimum_object_age in the base class).
        self.gc_start()
        self.wait_all_running()

        # Before the fix this never converged: the empty snapshot yielded
        # no collectible epoch and the bucket was never drained.
        wait_until(
            lambda: self._count_l0_objects() == 0,
            timeout_sec=120,
            backoff_sec=5,
            err_msg=lambda: (
                f"L0 objects not cleaned up after deleting all cloud topics: "
                f"{self._count_l0_objects()} remain (started with {l0_before})"
            ),
        )
        self.logger.info("All L0 objects cleaned up after topic deletion")
Loading